bug fix in the BigQuery writer

Steve Nyemba 1 year ago
commit 677239585c
1 changed file with 7 additions and 4 deletions:
    transport/cloud/bigquery.py

transport/cloud/bigquery.py  +7 -4

@@ -104,12 +104,12 @@ class Writer (BigQuery):
         """
         try:
             if self.parallel or 'lock' in _args :
-                Write.lock.acquire()
+                Writer.lock.acquire()
             _args['table'] = self.table if 'table' not in _args else _args['table']
             self._write(_data,**_args)
         finally:
             if self.parallel:
-                Write.lock.release()
+                Writer.lock.release()
     def submit(self,_sql):
         """
        Write the output of a massive query to a given table; BigQuery will handle this as a job
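
The first hunk fixes a NameError: the lock is a class attribute of Writer, but the old code referenced a nonexistent Write class. A minimal sketch of the pattern, assuming a threading.Lock shared across all Writer instances (the _write helper below stands in for the actual BigQuery call):

    import threading

    class Writer:
        lock = threading.Lock()          # one lock shared by every Writer instance

        def __init__(self, parallel=False):
            self.parallel = parallel

        def write(self, _data, **_args):
            try:
                if self.parallel or 'lock' in _args:
                    Writer.lock.acquire()    # 'Write.lock' here raised NameError
                self._write(_data, **_args)
            finally:
                if self.parallel:
                    Writer.lock.release()

        def _write(self, _data, **_args):
            ...                              # actual BigQuery write happens here

Because acquire and release go through the class attribute, every instance serializes on the same lock, which is what makes parallel writes safe.
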
@@ -144,13 +144,16 @@ class Writer (BigQuery):
                # Let us ensure that the types are somewhat compatible ...
                 # _map = {'INTEGER':np.int64,'DATETIME':'datetime64[ns]','TIMESTAMP':'datetime64[ns]','FLOAT':np.float64,'DOUBLE':np.float64,'STRING':str}
             # _mode = copy.deepcopy(self.mode)
-            _mode = self.mode
+            # _mode = self.mode
             # _df.to_gbq(**self.mode) #if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)	
             #
             # Let us adjust the chunking here 
+            if 'if_exists' in _args :
+                self.mode['if_exists'] = _args['if_exists']
             self._chunks = 10 if _df.shape[0] > MAX_CHUNK and self._chunks == 1 else self._chunks 
             _indexes = np.array_split(np.arange(_df.shape[0]),self._chunks) 
             for i in _indexes :
-                _df.iloc[i].to_gbq(**self.mode)
+                # _df.iloc[i].to_gbq(**self.mode)
+                pd_gbq.to_gbq(_df.iloc[i],**self.mode)
                 time.sleep(1)
         pass
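
The second hunk replaces the DataFrame.to_gbq method (deprecated in recent pandas releases) with the module-level pandas_gbq.to_gbq function, and lets callers override the write disposition. A minimal sketch of the chunked-write logic, assuming pd_gbq is pandas_gbq imported under that alias, MAX_CHUNK is a module-level row threshold, and the mode dict carries the destination_table / project_id / if_exists keyword arguments that pandas_gbq.to_gbq accepts:

    import time
    import numpy as np
    import pandas_gbq as pd_gbq       # assumed import behind the pd_gbq name in the diff

    MAX_CHUNK = 2000000               # hypothetical threshold; the real value is not in the diff

    def write_frame(_df, mode, chunks=1, **_args):
        # honor a caller-supplied write disposition: 'fail', 'replace', or 'append'
        if 'if_exists' in _args:
            mode['if_exists'] = _args['if_exists']
        # split oversized frames into 10 chunks unless a chunk count was already set
        chunks = 10 if _df.shape[0] > MAX_CHUNK and chunks == 1 else chunks
        for i in np.array_split(np.arange(_df.shape[0]), chunks):
            # module-level call replaces the deprecated _df.iloc[i].to_gbq(**mode)
            pd_gbq.to_gbq(_df.iloc[i], **mode)
            time.sleep(1)             # brief pause between chunked loads

A hypothetical call, with placeholder table and project names:

    write_frame(df, {'destination_table': 'dataset.table', 'project_id': 'my-project'}, if_exists='append')
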