Browse Source

bug fix: fix with design pattern of the last observer

Steve Nyemba 2 years ago
parent
commit
41c6f72cb3
1 changed files with 15 additions and 9 deletions
  1. 15 9
      healthcareio/export/workers.py

+ 15 - 9
healthcareio/export/workers.py

@@ -12,6 +12,7 @@ from multiprocessing import Process, Lock
 import numpy as np
 import json
 import pandas as pd
+from zmq import has
 
 class Subject (Process):
     cache = pd.DataFrame()
@@ -94,8 +95,8 @@ class Worker :
         except Exception as error:
             pass
         finally:
-            
-            self.caller.notify() 
+            if hasattr(self,'caller') :
+                self.caller.notify() 
     def _apply(self):        
         pass
     def get(self):
@@ -176,11 +177,16 @@ class Reader(Worker):
     def init(self,**_args):
         super().init(**_args)
         self.rows = []
-        
+              
   
     def _apply(self):
         try:
-            
+            if 'type' in self._info :
+                self._info['type'] = self._info['type'].replace('Writer','Reader')
+                if 'fields' in self._info['args'] :
+                    del self._info['args']['fields']
+            else:
+                self._info['context'] = 'read'
             self.reader = transport.factory.instance(**self._info) ; 
             
             # self.rows = self.reader.read(mongo=self.pipeline)
@@ -206,7 +212,7 @@ class Reader(Worker):
         except Exception as e :
             _log['status'] = 0
             _log['info']  = {"error":e.args[0]}
-            print (e)
+            print ([e])
             
 
         self.log(**_log)
@@ -221,13 +227,13 @@ class Writer(Worker):
         super().__init__(**_args)
         if 'provider' in self._info :
             self._info['context'] = 'write'
-
+        
     def init(self,**_args):
         """
         :param  store   output data-store needed for writing
         :param invalues input values with to be written somewhere
         """
-        super().init(**_args)
+        
         
         self._invalues = _args['invalues']
         
@@ -259,8 +265,8 @@ class Writer(Worker):
                     # Upon upgrade use the operator "$toString" in export.init function
                     #
                     rows = [dict(item,**{"_id":str(item["_id"])}) for item in rows]
-                    
-                    writer.write(rows)
+                    _df = pd.DataFrame(rows)                    
+                    writer.write(_df)
                 index += 1
                 # for _e in rows :
                     # writer.write(_e)