
bq support

Steve Nyemba, 4 years ago
commit 687ffec215
2 changed files with 61 additions and 4 deletions
  1. setup.py (+2, −2)
  2. transport/sql.py (+59, −2)

setup.py (+2, −2)

@@ -8,12 +8,12 @@ def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read() 
 args    = {
     "name":"data-transport",
-    "version":"1.3.6",
+    "version":"1.3.8",
     "author":"The Phi Technology LLC","author_email":"info@the-phi.com",
     "license":"MIT",
     "packages":["transport"]}
 args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite']
-args["install_requires"] = ['pymongo','numpy','cloudant','pika','boto','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python']
+args["install_requires"] = ['pymongo','numpy','cloudant','pika','boto','google-cloud-bigquery','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python']
 args["url"] =   "https://healthcareio.the-phi.com/git/code/transport.git"
 
 if sys.version_info[0] == 2 :

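A note on the dependency bump: the BigQuery reader and writer added to transport/sql.py below go through pandas' read_gbq and DataFrame.to_gbq, which rely on the pandas-gbq package in addition to the new google-cloud-bigquery requirement. A minimal sketch of that underlying call path, assuming pandas-gbq is installed; the service-key path and table names are placeholders, not values from this commit:

# sketch of the pandas-gbq call path used by BQReader/BQWriter (not part of the commit)
from google.oauth2 import service_account
import pandas as pd

# placeholder service-account key file for the target GCP project
credentials = service_account.Credentials.from_service_account_file("/path/to/service-key.json")
# read_gbq delegates to the pandas-gbq package under the hood
df = pd.read_gbq("SELECT * FROM `my_dataset.my_table` LIMIT 10",
                 credentials=credentials, dialect="standard")
# to_gbq appends the frame to a destination table in the same project
df.to_gbq(destination_table="my_dataset.my_copy", credentials=credentials, if_exists="append")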
transport/sql.py (+59, −2)

@@ -17,8 +17,8 @@ if sys.version_info[0] > 2 :
 else:
 	from common import Reader,Writer
 import json
-# from threading import Lock
-
+from google.oauth2 import service_account
+from multiprocessing import Lock
 import pandas as pd
 import numpy as np
 class SQLRW :
@@ -175,7 +175,64 @@ class SQLWriter(SQLRW,Writer):
             self.conn.close()
         finally:
             pass
+class BigQuery:
+    def __init__(self,**_args):
+        path = _args['service_key']
+        self.credentials = service_account.Credentials.from_service_account_file(path)
+        
+class BQReader(BigQuery,Reader) :
+    def __init__(self,**_args):
+        
+        super().__init__(**_args)    
+
+        pass
+    def read(self,**_args):
+        SQL = None
+        if 'sql' in _args :
+            SQL = _args['sql']
+        elif 'table' in _args:
+
+            table = "".join(["`",_args['table'],"`"]) 
+            SQL = "SELECT  * FROM :table ".replace(":table",table)
+        if SQL and 'limit' in _args:
+            SQL += " LIMIT "+str(_args['limit'])
 
+        return pd.read_gbq(SQL,credentials=self.credentials,dialect='standard') if SQL else None
+class BQWriter(BigQuery,Writer):
+    lock = Lock()    # shared class-level lock; referenced as self.lock in write()
+    def __init__(self,**_args):
+        super().__init__(**_args)    
+        
+        self.parallel = False if 'lock' not in _args else _args['lock']    # serialize writes when a truthy 'lock' flag is passed
+        self.table = _args['table'] if 'table' in _args else None
+        self.mode = {'if_exists':'append','chunksize':900000,'destination_table':self.table,'credentials':self.credentials}
+
+    def write(self,_info,**_args) :
+        try:
+            if self.parallel :
+                self.lock.acquire()
+            self._write(_info,**_args)
+        finally:
+            if self.parallel:
+                self.lock.release()
+    def _write(self,_info,**_args) :
+        _df = None
+        if type(_info) in [list,pd.DataFrame] :
+            if type(_info) == list :
+                _df = pd.DataFrame(_info)
+            elif type(_info) == pd.DataFrame :
+                _df = _info 
+            
+            self.mode['destination_table'] = _args['table'].strip() if 'table' in _args else self.table
+            _df.to_gbq(**self.mode)    # append to the destination table via pandas-gbq
+            
+        pass
+# import transport    
+# reader = transport.factory.instance(type="sql.BQReader",args={"service_key":"/home/steve/dev/google-cloud-sdk/accounts/curation-prod.json"})
+# _df = reader.read(sql="select  * from `2019q1r4_combined.person` limit 10")
+# writer = transport.factory.instance(type="sql.BQWriter",args={"service_key":"/home/steve/dev/google-cloud-sdk/accounts/curation-prod.json"})
+# writer.write(_df,table='2019q1r4_combined.foo')
 # _args = {"db":"sample","table":"foo","provider":"postgresql"}
 # # # w = SQLWriter(**_args)
 # # # w.write({"name":"kalara.io","email":"ceo@kalara.io","age":10})
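Putting the new pieces together, a usage sketch through the existing factory interface, mirroring the commented example in the hunk above; the service-key path and dataset/table names here are placeholders:

import transport

# placeholder service-account key for the target project
_key = "/path/to/service-key.json"

reader = transport.factory.instance(type="sql.BQReader", args={"service_key": _key})
_df = reader.read(table="my_dataset.person", limit=10)    # or: reader.read(sql="SELECT ...")

writer = transport.factory.instance(type="sql.BQWriter", args={"service_key": _key})
writer.write(_df, table="my_dataset.person_copy")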