ソースを参照

bug fix: close & etl

Steve Nyemba 5 ヶ月 前
コミット
49ebd4a432
2 ファイル変更32 行追加17 行削除
  1. 7 2
      transport/iowrapper.py
  2. 25 15
      transport/sql/common.py

+ 7 - 2
transport/iowrapper.py

@@ -109,8 +109,10 @@ class IETL(IReader) :
         self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
     def read(self,**_args):
         _data = super().read(**_args)
-
+        _schema = super().meta()
         for _kwargs in self._targets :
+            if _schema :
+                _kwargs['schema'] = _schema
             self.post(_data,**_kwargs)
 
         return _data
@@ -122,5 +124,8 @@ class IETL(IReader) :
         :_args  parameters associated with writer object
         """
         writer = transport.get.writer(**_args)
-        writer.write(_data)
+        if 'schema' in _args :
+            writer.write(_data,schema=_args['schema'])
+        else:
+            writer.write(_data)
         writer.close()

+ 25 - 15
transport/sql/common.py

@@ -3,7 +3,7 @@ This file encapsulates common operations associated with SQL databases via SQLAl
 
 """
 import sqlalchemy as sqa
-from sqlalchemy import text 
+from sqlalchemy import text , MetaData, inspect
 
 import pandas as pd
 
@@ -34,20 +34,26 @@ class Base:
         :table  optional name of the table (can be fully qualified)
         """
         _table = self._table if 'table' not in _args else _args['table']
+        _map = {'TINYINT':'INTEGER','BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
         _schema = []
-        if _table :
-            if sqa.__version__.startswith('1.') :
-                _handler = sqa.MetaData(bind=self._engine)
-                _handler.reflect()
-            else:
-                #
-                # sqlalchemy's version 2.+
-                _handler = sqa.MetaData()
-                _handler.reflect(bind=self._engine)
-            #
-            # Let us extract the schema with the native types
-            _map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
-            _schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns]
+        # if _table :
+        #     if sqa.__version__.startswith('1.') :
+        #         _handler = sqa.MetaData(bind=self._engine)
+        #         _handler.reflect()
+        #     else:
+        #         #
+        #         # sqlalchemy's version 2.+
+        #         _handler = sqa.MetaData()
+        #         _handler.reflect(bind=self._engine)
+        #     #
+        #     # Let us extract the schema with the native types
+        #     _map = {'BIGINT':'INTEGER','TEXT':'STRING','DOUBLE_PRECISION':'FLOAT','NUMERIC':'FLOAT','DECIMAL':'FLOAT','REAL':'FLOAT'}
+        #     _schema = [{"name":_attr.name,"type":_map.get(str(_attr.type),str(_attr.type))} for _attr in _handler.tables[_table].columns]
+        #
+
+        _inspector = inspect(self._engine)
+        _columns = _inspector.get_columns(_table)
+        _schema = [{'name':column['name'],'type':_map.get(str(column['type']),str(column['type'])) } for column in _columns]
         return _schema
     def  has(self,**_args):
         return self.meta(**_args)
@@ -94,7 +100,11 @@ class SQLBase(Base):
         # _uri = [_item.strip() for _item in _uri if _item.strip()]
         # return '/'.join(_uri)
         return f'{_provider}://{_host}/{_database}' if _account == '' else f'{_provider}://{_account}{_host}/{_database}'
-
+    def close(self,) :
+        try:
+            self._engine.dispose()
+        except :
+            pass
 class BaseReader(SQLBase):
     def __init__(self,**_args):
         super().__init__(**_args)