"""
Implements support for Google BigQuery
- cloud.bigquery.Read
- cloud.bigquery.Write
"""
import json
from google.oauth2 import service_account
from google.cloud import bigquery as bq
from multiprocessing import Lock, RLock
import pandas as pd
import pandas_gbq as pd_gbq
import numpy as np
import time

MAX_CHUNK = 2000000

def template():
    return {'provider':'bigquery','private_key':'path-to-key','dataset':'name-of-dataset','table':'table'}
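#
# Example (sketch, not part of the API): wiring the template above into a reader.
# The key path, dataset and table names below are placeholders, not real resources.
#
#   _config = template()
#   _config['private_key'] = '/path/to/service-account.json'
#   _config['dataset'] = 'my_dataset'
#   _config['table'] = 'my_table'
#   _reader = Reader(**_config)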
class BigQuery:
    def __init__(self,**_args):
        # The service account key file can be passed as 'service_key' or 'private_key'
        path = _args['service_key'] if 'service_key' in _args else _args['private_key']
        self.credentials = service_account.Credentials.from_service_account_file(path)
        self.dataset = _args['dataset'] if 'dataset' in _args else None
        self.path = path
        self.dtypes = _args['dtypes'] if 'dtypes' in _args else None
        self.table = _args['table'] if 'table' in _args else None
        self.client = bq.Client.from_service_account_json(self.path)
    def meta(self,**_args):
        """
        This function returns meta data for a given table or query with dataset/table properly formatted
        :param table    name of the table WITHOUT the dataset prefix
        :param sql      sql query to be pulled
        """
        table = _args['table'] if 'table' in _args else self.table
        try:
            if table :
                _dataset = self.dataset if 'dataset' not in _args else _args['dataset']
                sql = f"""SELECT column_name as name, data_type as type FROM {_dataset}.INFORMATION_SCHEMA.COLUMNS WHERE table_name = '{table}' """
                _info = {'credentials':self.credentials,'dialect':'standard'}
                return pd_gbq.read_gbq(sql,**_info).to_dict(orient='records')
                # return self.read(sql=sql).to_dict(orient='records')
                # ref = self.client.dataset(self.dataset).table(table)
                # _schema = self.client.get_table(ref).schema
                # return [{"name":_item.name,"type":_item.field_type,"description":( "" if not hasattr(_item,"description") else _item.description )} for _item in _schema]
            else :
                return []
        except Exception as e:
            return []
    def has(self,**_args):
        found = False
        try:
            _has = self.meta(**_args)
            found = _has is not None and len(_has) > 0
        except Exception as e:
            pass
        return found
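#
# Example (sketch): checking for a table before reading its schema; 'my_table'
# is a placeholder and _reader is the instance sketched above.
#
#   if _reader.has(table='my_table'):
#       _schema = _reader.meta(table='my_table')    # [{'name': ..., 'type': ...}, ...]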
class Reader (BigQuery):
    """
    Implements support for reading from BigQuery. This class acts as a wrapper around Google's API
    """
    def __init__(self,**_args):
        super().__init__(**_args)
    def apply(self,sql):
        return self.read(sql=sql)
    def read(self,**_args):
        SQL = None
        table = self.table if 'table' not in _args else _args['table']
        if 'sql' in _args :
            SQL = _args['sql']
        elif table:
            table = "".join(["`",table,"`"]) if '.' in table else "".join(["`:dataset.",table,"`"])
            SQL = "SELECT * FROM :table ".replace(":table",table)
        if not SQL :
            return None
        if SQL and 'limit' in _args:
            SQL += " LIMIT "+str(_args['limit'])
        if (':dataset' in SQL or ':DATASET' in SQL) and self.dataset:
            SQL = SQL.replace(':dataset',self.dataset).replace(':DATASET',self.dataset)
        _info = {'credentials':self.credentials,'dialect':'standard'}
        return pd_gbq.read_gbq(SQL,**_info) if SQL else None
        # return self.client.query(SQL).to_dataframe() if SQL else None
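#
# Example (sketch): the :dataset placeholder in a query is resolved against the
# configured dataset; table and column names below are placeholders.
#
#   _df = _reader.read(table='my_table', limit=10)
#   _df = _reader.read(sql="SELECT COUNT(*) AS row_count FROM :dataset.my_table")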
class Writer (BigQuery):
    """
    This class implements support for writing against BigQuery
    """
    lock = RLock()
    def __init__(self,**_args):
        super().__init__(**_args)
        self.parallel = False if 'lock' not in _args else _args['lock']
        self.table = _args['table'] if 'table' in _args else None
        self.mode = {'if_exists':'append','chunksize':900000,'destination_table':self.table,'credentials':self.credentials}
        self._chunks = 1 if 'chunks' not in _args else int(_args['chunks'])
        self._location = 'US' if 'location' not in _args else _args['location']
    def write(self,_data,**_args) :
        """
        This function will perform a write to BigQuery
        :param _data    data-frame to be written to BigQuery
        """
        _locked = self.parallel or 'lock' in _args
        try:
            if _locked :
                Writer.lock.acquire()
            _args['table'] = self.table if 'table' not in _args else _args['table']
            self._write(_data,**_args)
        finally:
            if _locked :
                Writer.lock.release()
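    #
    # Example (sketch): writing a small frame; the key path and names are
    # placeholders, and if_exists='replace' overrides the default append mode.
    #
    #   _writer = Writer(private_key='/path/to/service-account.json',
    #                    dataset='my_dataset', table='my_table')
    #   _writer.write(pd.DataFrame([{'id': 1, 'name': 'alice'}]), if_exists='replace')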
    def submit(self,_sql):
        """
        Write the output of a massive query to a given table, BigQuery will handle this as a job
        This function will return the job identifier
        """
        _config = bq.QueryJobConfig()
        _config.destination = self.client.dataset(self.dataset).table(self.table)
        _config.allow_large_results = True
        # _config.write_disposition = bq.bq_consts.WRITE_APPEND
        _config.dry_run = False
        # _config.priority = 'BATCH'
        _resp = self.client.query(_sql,location=self._location,job_config=_config)
        return _resp.job_id
    def status (self,_id):
        return self.client.get_job(_id,location=self._location)
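    #
    # Example (sketch): submitting a large query as a server-side job and polling it;
    # the query text is a placeholder.
    #
    #   _sql    = "SELECT * FROM " + _writer.dataset + ".huge_table"
    #   _job_id = _writer.submit(_sql)
    #   _writer.status(_job_id).state    # e.g. 'RUNNING' or 'DONE'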
    def _write(self,_info,**_args) :
        _df = None
        if type(_info) in [list,pd.DataFrame] :
            if type(_info) == list :
                _df = pd.DataFrame(_info)
            elif type(_info) == pd.DataFrame :
                _df = _info

            if '.' not in _args['table'] :
                self.mode['destination_table'] = '.'.join([self.dataset,_args['table']])
            else:
                self.mode['destination_table'] = _args['table'].strip()
            if 'schema' in _args :
                self.mode['table_schema'] = _args['schema']
            #
            # Let us ensure that the types are somewhat compatible ...
            # _map = {'INTEGER':np.int64,'DATETIME':'datetime64[ns]','TIMESTAMP':'datetime64[ns]','FLOAT':np.float64,'DOUBLE':np.float64,'STRING':str}
            # _mode = copy.deepcopy(self.mode)
            # _mode = self.mode
            # _df.to_gbq(**self.mode) #if_exists='append',destination_table=partial,credentials=credentials,chunksize=90000)
            #
            # Let us adjust the chunking here
            if 'if_exists' in _args :
                self.mode['if_exists'] = _args['if_exists']
            self._chunks = 10 if _df.shape[0] > MAX_CHUNK and self._chunks == 1 else self._chunks
            _indexes = np.array_split(np.arange(_df.shape[0]),self._chunks)
            for i in _indexes :
                # _df.iloc[i].to_gbq(**self.mode)
                pd_gbq.to_gbq(_df.iloc[i],**self.mode)
                time.sleep(1)
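#
# Note (sketch): _write chunks rows with numpy's array_split, e.g.
#
#   np.array_split(np.arange(10), 3)    # -> [0..3], [4..6], [7..9]
#
# With the defaults above, a frame larger than MAX_CHUNK (2,000,000 rows) is
# split into 10 chunks unless 'chunks' was passed to the constructor.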