common.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. """
  2. This file encapsulates common operations associated with SQL databases via SQLAlchemy
  3. """
  4. import sqlalchemy as sqa
  5. import pandas as pd
  6. class Base:
  7. def __init__(self,**_args):
  8. self._host = _args['host'] if 'host' in _args else 'localhost'
  9. self._port = None
  10. self._database = _args['database']
  11. self._table = _args['table'] if 'table' in _args else None
  12. self._engine= sqa.create_engine(self._get_uri(**_args),future=True)
  13. def _set_uri(self,**_args) :
  14. """
  15. :provider provider
  16. :host host and port
  17. :account account user/pwd
  18. """
  19. _account = _args['account'] if 'account' in _args else None
  20. _host = _args['host']
  21. _provider = _args['provider'].replace(':','').replace('/','').strip()
  22. def _get_uri(self,**_args):
  23. """
  24. This function will return the formatted uri for the sqlAlchemy engine
  25. """
  26. raise Exception ("Function Needs to be implemented ")
  27. def meta (self,**_args):
  28. """
  29. This function returns the schema (table definition) of a given table
  30. :table optional name of the table (can be fully qualified)
  31. """
  32. _table = self._table if 'table' not in _args else _args['table']
  33. _schema = []
  34. if _table :
  35. if sqa.__version__.startswith('1.') :
  36. _handler = sqa.MetaData(bind=self._engine)
  37. _handler.reflect()
  38. else:
  39. #
  40. # sqlalchemy's version 2.+
  41. _handler = sqa.MetaData()
  42. _handler.reflect(bind=self._engine)
  43. #
  44. # Let us extract the schema with the native types
  45. _map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
  46. _schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns]
  47. return _schema
  48. def has(self,**_args):
  49. return self.meta(**_args)
  50. def apply(self,sql):
  51. """
  52. Executing sql statement that returns query results (hence the restriction on sql and/or with)
  53. :sql SQL query to be exectued
  54. @TODO: Execution of stored procedures
  55. """
  56. return pd.read_sql(sql,self._engine) if sql.lower().startswith('select') or sql.lower().startswith('with') else None
  57. class SQLBase(Base):
  58. def __init__(self,**_args):
  59. super().__init__(**_args)
  60. def get_provider(self):
  61. raise Exception ("Provider Needs to be set ...")
  62. def get_default_port(self) :
  63. raise Exception ("default port needs to be set")
  64. def _get_uri(self,**_args):
  65. _host = self._host
  66. _account = ''
  67. if self._port :
  68. _port = self._port
  69. else:
  70. _port = self.get_default_port()
  71. _host = f'{_host}:{_port}'
  72. if 'username' in _args :
  73. _account = ''.join([_args['username'],':',_args['password'],'@'])
  74. _database = self._database
  75. _provider = self.get_provider().replace(':','').replace('/','')
  76. # _uri = [f'{_provider}:/',_account,_host,_database]
  77. # _uri = [_item.strip() for _item in _uri if _item.strip()]
  78. # return '/'.join(_uri)
  79. return f'{_provider}://{_host}/{_database}' if _account == '' else f'{_provider}://{_account}{_host}/{_database}'
  80. class BaseReader(SQLBase):
  81. def __init__(self,**_args):
  82. super().__init__(**_args)
  83. def read(self,**_args):
  84. """
  85. This function will read a query or table from the specific database
  86. """
  87. if 'sql' in _args :
  88. sql = _args['sql']
  89. else:
  90. _table = _args['table'] if 'table' in _args else self._table
  91. sql = f'SELECT * FROM {_table}'
  92. return self.apply(sql)
  93. class BaseWriter (SQLBase):
  94. """
  95. This class implements SQLAlchemy support for Writting to a data-store (RDBMS)
  96. """
  97. def __init__(self,**_args):
  98. super().__init__(**_args)
  99. def write(self,_data,**_args):
  100. if type(_data) == dict :
  101. _df = pd.DataFrame(_data)
  102. elif type(_data) == list :
  103. _df = pd.DataFrame(_data)
  104. else:
  105. _df = _data.copy()
  106. #
  107. # We are assuming we have a data-frame at this point
  108. #
  109. _table = _args['table'] if 'table' in _args else self._table
  110. _mode = {'chunksize':2000000,'if_exists':'append','index':False}
  111. if 'schema' in _args :
  112. _mode['schema'] = _args['schema']
  113. if 'if_exists' in _args :
  114. _mode['if_exists'] = _args['if_exists']
  115. _df.to_sql(_table,self._engine,**_args,index=False)