Kaynağa Gözat

bug fix: schema (postgresql) construct

Steve Nyemba 4 ay önce
ebeveyn
işleme
eaa2b99a2d
2 değiştirilmiş dosya ile 15 ekleme ve 8 silme
  1. 6 6
      transport/iowrapper.py
  2. 9 2
      transport/sql/common.py

+ 6 - 6
transport/iowrapper.py

@@ -45,12 +45,12 @@ class IO:
     def close(self):
         if hasattr(self._agent,'close') :
             self._agent.close()
-    def apply(self):
-        """
-        applying pre/post conditions given a pipeline expression
-        """
-        for _pointer in self._plugins :
-            _data = _pointer(_data)
+    # def apply(self):
+    #     """
+    #     applying pre/post conditions given a pipeline expression
+    #     """
+    #     for _pointer in self._plugins :
+    #         _data = _pointer(_data)
     def apply(self,_query):
         if hasattr(self._agent,'apply') :
             return self._agent.apply(_query)

+ 9 - 2
transport/sql/common.py

@@ -71,7 +71,7 @@ class Base:
         @TODO: Execution of stored procedures
         """
         if sql.strip().lower().startswith('select') or sql.strip().lower().startswith('with') or sql.strip().startswith('show'):
-            print (self._engine)
+            
             return pd.read_sql(sql,self._engine) 
         else:
             _handler = self._engine.connect()
@@ -83,6 +83,7 @@ class Base:
 class SQLBase(Base):
     def __init__(self,**_args):
         super().__init__(**_args)
+        self._schema = _args.get('schema',None)
     def get_provider(self):
         raise Exception ("Provider Needs to be set ...")
     def get_default_port(self) :
@@ -122,6 +123,8 @@ class BaseReader(SQLBase):
             sql = _args['sql']
         else:
             _table = _args['table'] if 'table' in _args else self._table
+            if self._schema and type(self._schema) == str :
+                _table = f'{self._schema}.{_table}'
             sql = f'SELECT * FROM {_table}'
         return self.apply(sql)
     
@@ -132,6 +135,7 @@ class BaseWriter (SQLBase):
     """
     def __init__(self,**_args):
         super().__init__(**_args)
+        
     def write(self,_data,**_args):
         if type(_data) == dict :
             _df = pd.DataFrame(_data)
@@ -151,5 +155,8 @@ class BaseWriter (SQLBase):
         #     _mode['schema'] = _args['schema']
         # if 'if_exists' in _args :
         #     _mode['if_exists'] = _args['if_exists']
-
+        if 'schema' in _args and type(_args['schema']) == str:
+            self._schema = _args.get('schema',None)
+        if self._schema :
+           _mode['schema'] = self._schema
         _df.to_sql(_table,self._engine,**_mode)