Browse Source

bug fix ...

Steve Nyemba 4 years ago
parent
commit
afcc5ed690
2 changed files with 42 additions and 7 deletions
  1. 2 2
      setup.py
  2. 40 5
      transport/sql.py

+ 2 - 2
setup.py

@@ -8,12 +8,12 @@ def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read() 
 args    = {
     "name":"data-transport",
-    "version":"1.3.8",
+    "version":"1.3.8.1",
     "author":"The Phi Technology LLC","author_email":"info@the-phi.com",
     "license":"MIT",
     "packages":["transport"]}
 args["keywords"]=['mongodb','couchdb','rabbitmq','file','read','write','s3','sqlite']
-args["install_requires"] = ['pymongo','numpy','cloudant','pika','boto','google-cloud-bigquery','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python']
+args["install_requires"] = ['pymongo','numpy','cloudant','pika','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python']
 args["url"] =   "https://healthcareio.the-phi.com/git/code/transport.git"
 
 if sys.version_info[0] == 2 :

+ 40 - 5
transport/sql.py

@@ -18,9 +18,13 @@ else:
 	from common import Reader,Writer
 import json
 from google.oauth2 import service_account
+from google.cloud import bigquery as bq
 from multiprocessing import Lock
 import pandas as pd
 import numpy as np
+import copy
+
+
 class SQLRW :
     PROVIDERS = {"postgresql":"5432","redshift":"5432","mysql":"3306","mariadb":"3306"}
     DRIVERS  = {"postgresql":pg,"redshift":pg,"mysql":my,"mariadb":my}
@@ -177,9 +181,32 @@ class SQLWriter(SQLRW,Writer):
             pass
 class BigQuery:
     def __init__(self,**_args):
-        path = _args['service_key']
+        path = _args['service_key'] if 'service_key' in _args else _args['private_key']
         self.credentials = service_account.Credentials.from_service_account_file(path)
-        
+        self.dataset = _args['dataset'] if 'dataset' in _args else None
+        self.path = path
+    def meta(self,**_args):
+        """
+        This function returns meta data for a given table or query with dataset/table properly formatted
+        :param table    name of the name WITHOUT including dataset
+        :param sql      sql query to be pulled,
+        """
+        #if 'table' in _args :
+        #    sql = "SELECT * from :dataset."+ _args['table']" limit 1"
+        #else:
+        #    sql = _args['sql'] 
+        #    if 'limit' not in sql.lower() :
+        #        sql = sql + ' limit 1'
+
+        #sql = sql.replace(':dataset',self.dataset) if ':dataset' in args else sql
+
+        #
+        # Let us return the schema information now for a given table 
+        #
+        table = _args['table']
+        client = bq.Client.from_service_account_json(self.path)
+        ref     = client.dataset(self.dataset).table(table)
+        return client.get_table(ref).schema
 class BQReader(BigQuery,Reader) :
     def __init__(self,**_args):
         
@@ -196,7 +223,8 @@ class BQReader(BigQuery,Reader) :
             SQL = "SELECT  * FROM :table ".replace(":table",table)
         if SQL and 'limit' in _args:
             SQL += " LIMIT "+str(_args['limit'])
-
+        if (':dataset' in SQL or ':DATASET' in SQL)  and self.dataset:
+            SQL = SQL.replace(':dataset',self.dataset).replace(':DATASET',self.dataset)
         return pd.read_gbq(SQL,credentials=self.credentials,dialect='standard') if SQL else None
 class BQWriter(BigQuery,Writer):
     Lock = Lock()
@@ -223,7 +251,14 @@ class BQWriter(BigQuery,Writer):
             elif type(_info) == pd.DataFrame :
                 _df = _info 
             
-            self.mode['destination_table'] = _args['table'].strip()
+            if '.'  not in _args['table'] :
+                self.mode['destination_table'] = '.'.join([self.dataset,_args['table']])
+            else:
+
+                self.mode['destination_table'] = _args['table'].strip()
+            if 'schema' in _args :
+                self.mode['table_schema'] = _args['schema']
+            _mode = copy.deepcopy(self.mode)
             _df.to_gbq(**self.mode) #if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)	
             
         pass
@@ -237,4 +272,4 @@ class BQWriter(BigQuery,Writer):
 # # # w = SQLWriter(**_args)
 # # # w.write({"name":"kalara.io","email":"ceo@kalara.io","age":10})
 # r = SQLReader(**_args)
-# print (r.read(filter='age > 0',limit = 20))
+# print (r.read(filter='age > 0',limit = 20))