- """
- Implementing support for google's bigquery
- - cloud.bigquery.Read
- - cloud.bigquery.Write
- """
from google.oauth2 import service_account
from google.cloud import bigquery as bq
from multiprocessing import RLock
import pandas as pd
import pandas_gbq as pd_gbq
import numpy as np
import time

MAX_CHUNK = 2000000
def template():
    return {'provider': 'bigquery', 'private_key': 'path-to-key', 'dataset': 'name-of-dataset', 'table': 'table'}
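#
# A minimal sketch of how the template above might be consumed; the key path,
# dataset, and table names below are placeholders, not real resources:
#
#   _config = template()
#   _config['private_key'] = '/path/to/service-account.json'
#   _config['dataset'] = 'my_dataset'           # hypothetical dataset
#   _config['table'] = 'users'                  # hypothetical table
#   _reader = Reader(**_config)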
class BigQuery:
    def __init__(self, **_args):
        path = _args['service_key'] if 'service_key' in _args else _args['private_key']
        self.credentials = service_account.Credentials.from_service_account_file(path)
        self.dataset = _args.get('dataset')
        self.path = path
        self.dtypes = _args.get('dtypes')
        self.table = _args.get('table')
        self.client = bq.Client.from_service_account_json(self.path)
    def meta(self, **_args):
        """
        Return metadata (column names and types) for a given table, with dataset/table properly resolved
        :param table    name of the table WITHOUT the dataset prefix
        :param dataset  optional dataset name; defaults to the one provided at initialization
        """
        table = _args['table'] if 'table' in _args else self.table
        try:
            if table:
                _dataset = self.dataset if 'dataset' not in _args else _args['dataset']
                sql = f"""SELECT column_name AS name, data_type AS type FROM {_dataset}.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{table}' """
                _info = {'credentials': self.credentials, 'dialect': 'standard'}
                return pd_gbq.read_gbq(sql, **_info).to_dict(orient='records')
                # Alternative: fetch the schema through the client API
                # _ref = self.client.dataset(_dataset).table(table)
                # _schema = self.client.get_table(_ref).schema
                # return [{"name": _item.name, "type": _item.field_type} for _item in _schema]
            else:
                return []
        except Exception:
            return []
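    #
    # Sketch of a meta() call, assuming a dataset/table that actually exist;
    # 'my_dataset' and 'users' are hypothetical names used for illustration:
    #
    #   _store = BigQuery(private_key='/path/to/key.json', dataset='my_dataset')
    #   _schema = _store.meta(table='users')    # e.g. [{'name': 'id', 'type': 'INT64'}, ...]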
    def has(self, **_args):
        found = False
        try:
            _has = self.meta(**_args)
            found = _has is not None and len(_has) > 0
        except Exception:
            pass
        return found
class Reader(BigQuery):
    """
    Implementing support for reading from BigQuery. This class acts as a wrapper around Google's API
    """
    def __init__(self, **_args):
        super().__init__(**_args)
    def apply(self, sql):
        return self.read(sql=sql)
    def read(self, **_args):
        SQL = None
        table = self.table if 'table' not in _args else _args['table']
        if 'sql' in _args:
            SQL = _args['sql']
        elif table:
            table = "".join(["`", table, "`"]) if '.' in table else "".join(["`:dataset.", table, "`"])
            SQL = "SELECT * FROM :table ".replace(":table", table)
        if not SQL:
            return None
        if 'limit' in _args:
            SQL += " LIMIT " + str(_args['limit'])
        if (':dataset' in SQL or ':DATASET' in SQL) and self.dataset:
            SQL = SQL.replace(':dataset', self.dataset).replace(':DATASET', self.dataset)
        _info = {'credentials': self.credentials, 'dialect': 'standard'}
        return pd_gbq.read_gbq(SQL, **_info)
        # Alternative: return self.client.query(SQL).to_dataframe()

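#
# Sketch of Reader usage under the assumptions above; the ':dataset' token in a
# custom query is substituted with the dataset supplied at initialization.
# 'my_dataset' and 'users' are hypothetical names:
#
#   _reader = Reader(private_key='/path/to/key.json', dataset='my_dataset', table='users')
#   _df = _reader.read(limit=10)        # SELECT * FROM `my_dataset.users` LIMIT 10
#   _df = _reader.read(sql="SELECT COUNT(*) AS n FROM :dataset.users")
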
class Writer(BigQuery):
    """
    This class implements support for writing against BigQuery
    """
    lock = RLock()
    def __init__(self, **_args):
        super().__init__(**_args)
        self.parallel = False if 'lock' not in _args else _args['lock']
        self.table = _args['table'] if 'table' in _args else None
        self.mode = {'if_exists': 'append', 'chunksize': 900000, 'destination_table': self.table, 'credentials': self.credentials}
        self._chunks = 1 if 'chunks' not in _args else int(_args['chunks'])
        self._location = 'US' if 'location' not in _args else _args['location']
    def write(self, _data, **_args):
        """
        Perform a write to BigQuery
        :param _data    data-frame (or list of records) to be written to BigQuery
        """
        _locked = self.parallel or 'lock' in _args
        if _locked:
            Writer.lock.acquire()
        try:
            _args['table'] = self.table if 'table' not in _args else _args['table']
            self._write(_data, **_args)
        finally:
            # release the lock only if it was acquired above
            if _locked:
                Writer.lock.release()
    def submit(self, _sql):
        """
        Write the output of a massive query to a given table; BigQuery handles this as a job.
        This function returns the job identifier
        """
        _config = bq.QueryJobConfig()
        _config.destination = self.client.dataset(self.dataset).table(self.table)
        _config.allow_large_results = True
        # _config.write_disposition = bq.WriteDisposition.WRITE_APPEND
        _config.dry_run = False
        # _config.priority = 'BATCH'
        _resp = self.client.query(_sql, location=self._location, job_config=_config)
        return _resp.job_id
    def status(self, _id):
        return self.client.get_job(_id, location=self._location)
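    #
    # Sketch of submitting a large query as a job and polling its state; the
    # query and resource names are hypothetical:
    #
    #   _writer = Writer(private_key='/path/to/key.json', dataset='my_dataset', table='users_copy')
    #   _id = _writer.submit("SELECT * FROM my_dataset.users")
    #   _job = _writer.status(_id)
    #   print(_job.state)    # e.g. 'PENDING', 'RUNNING' or 'DONE'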
    def _write(self, _info, **_args):
        _df = None
        if isinstance(_info, pd.DataFrame):
            _df = _info
        elif isinstance(_info, list):
            _df = pd.DataFrame(_info)
        if _df is None or _df.shape[0] == 0:
            return
        if '.' not in _args['table']:
            self.mode['destination_table'] = '.'.join([self.dataset, _args['table']])
        else:
            self.mode['destination_table'] = _args['table'].strip()
        if 'schema' in _args:
            self.mode['table_schema'] = _args['schema']
        if 'if_exists' in _args:
            self.mode['if_exists'] = _args['if_exists']
        #
        # NOTE: a dtype map (e.g. INTEGER -> np.int64, DATETIME -> 'datetime64[ns]') could be
        # applied here to ensure the frame's types are compatible with the target schema
        #
        # Adjust the chunking: very large frames are split into several chunks unless a chunk count was provided
        self._chunks = 10 if _df.shape[0] > MAX_CHUNK and self._chunks == 1 else self._chunks
        _indexes = np.array_split(np.arange(_df.shape[0]), self._chunks)
        for i in _indexes:
            pd_gbq.to_gbq(_df.iloc[i], **self.mode)
            time.sleep(1)    # brief pause between chunks to avoid hammering the API
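
if __name__ == '__main__':
    #
    # A minimal end-to-end sketch; the key path, dataset and table are
    # placeholders and must point to resources you actually own:
    _config = {'private_key': '/path/to/service-account.json', 'dataset': 'my_dataset', 'table': 'demo'}
    _writer = Writer(**_config)
    _writer.write(pd.DataFrame([{'id': 1, 'name': 'alice'}, {'id': 2, 'name': 'bob'}]))
    _reader = Reader(**_config)
    print(_reader.read(limit=2))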