Explorar o código

Merge pull request #37 from lnyemba/v2.2.0

etl bug fix
Steve L. Nyemba hai 4 semanas
pai
achega
c5c819b3f5
Modificáronse 2 ficheiros con 16 adicións e 12 borrados
  1. 7 4
      bin/transport
  2. 9 8
      transport/iowrapper.py

+ 7 - 4
bin/transport

@@ -56,6 +56,9 @@ def wait(jobs):
 # def wait (jobs):
 #     while jobs :
 #             jobs = [pthread for pthread in jobs if pthread.is_alive()]
+def launch_etl (_args):
+    _etlTask = IETL(**_args)
+    _etlTask.read()
 
 @app_e.command(name="run")
 def apply (path:Annotated[str,typer.Argument(help="path of the configuration file")],
@@ -75,10 +78,10 @@ def apply (path:Annotated[str,typer.Argument(help="path of the configuration fil
         jobs = []          
         for _args in _config :
             # pthread = etl.instance(**_args) #-- automatically starts the process
-            def bootup ():
-                _worker = IETL(**_args)
-                _worker.run()
-            pthread = Process(target=bootup)
+            # def bootup ():
+            #     _worker = IETL(**_args)
+            #     _worker.read()
+            pthread = Process(target=launch_etl,args=(_args,))
             pthread.start()
             jobs.append(pthread)
             if len(jobs) == batch :

+ 9 - 8
transport/iowrapper.py

@@ -99,6 +99,7 @@ class IETL(IReader) :
     """
     def __init__(self,**_args):
         super().__init__(agent=transport.get.reader(**_args['source']),plugins=None)
+        self._source = _args['source']
         if 'target' in _args:
             self._targets = _args['target'] if type(_args['target']) == list else [_args['target']]
         else:
@@ -108,11 +109,10 @@ class IETL(IReader) :
         # If the parent is already multiprocessing
         self._hasParentProcess = False if 'hasParentProcess' not in _args else _args['hasParentProcess']
     def read(self,**_args):
-        _data = super().read(**_args)
-        _schema = super().meta()
+        _key = 'cmd' if 'cmd' in self._source else 'query'
+        _kwargs = self._source[_key] if _key in self._source else None
+        _data = super().read(**_kwargs)
         for _kwargs in self._targets :
-            # if _schema :
-            #     _kwargs['schema'] = _schema
             self.post(_data,**_kwargs)
 
         return _data
@@ -124,8 +124,9 @@ class IETL(IReader) :
         :_args  parameters associated with writer object
         """
         writer = transport.get.writer(**_args)
-        if 'schema' in _args :
-            writer.write(_data,schema=_args['schema'])
-        else:
-            writer.write(_data)
+        # if 'schema' in _args :
+        #     writer.write(_data,schema=_args['schema'])
+        # else:
+        print ("writing .... ",_data.shape)
+        writer.write(_data)
         writer.close()