workers.py

  1. """
  2. HealthcareIO - The Phi Technology LLC 2020
  3. This file contains functionalities that implement elements of an ETL pipeline that will consist of various workers.
  4. The pipeline is built around an observer design pattern.
  5. @TODO: Integrate with airflow and other process monitoring tools
  6. """
  7. import transport
  8. import os
  9. from multiprocessing import Process, Lock
  10. import numpy as np
  11. import json
  12. import pandas as pd
  13. from zmq import has
class Subject(Process):
    """
    A Subject drives an ordered list of workers (observers): each call to notify()
    initializes the next worker with the previous worker's output and executes it.
    """
    cache = pd.DataFrame()
    lock = Lock()

    @staticmethod
    def log(_args):
        Subject.lock.acquire()
        try:
            # pd.concat replaces the deprecated DataFrame.append
            Subject.cache = pd.concat([Subject.cache, pd.DataFrame([_args])])
        except Exception as e:
            print(e)
        finally:
            Subject.lock.release()

    def __init__(self, **_args):
        super().__init__()
        self.observers = _args['observers']
        self.index = 0
        self.name = _args['name']
        self.table = self.observers[1].table   # observers[1] is expected to be the Reader
        self.m = {}

    def run(self):
        self.notify()

    def notify(self):
        if self.index < len(self.observers):
            observer = self.observers[self.index]
            _observer = None if self.index == 0 else self.observers[self.index - 1]
            _invalues = None if not _observer else _observer.get()
            if _observer is None:
                self.m['table'] = self.name

            observer.init(caller=self, invalues=_invalues)
            self.index += 1
            observer.execute()
            print({"table": self.table, "module": observer.name(), "status": observer.status})
            # self.m[observer.name()] = observer.status
        else:
            pass
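
#
# A minimal, commented sketch of how a Subject is assumed to be wired: the observers
# list is ordered (CreateSQL -> Reader -> Writer here), observers[1] must expose a
# .table attribute, and start() launches run()/notify(), handing each worker's get()
# output to the next one's init(invalues=...). Store/schema/prefix/sql values below
# are hypothetical placeholders, not the project's actual configuration.
#
#   _mongo = {"provider":"mongodb","db":"healthcareio","doc":"claims"}              # source store (hypothetical)
#   _sql   = {"provider":"postgresql","database":"healthcareio","table":"claims"}   # target store (hypothetical)
#   _workers = [
#       CreateSQL(store=_sql,   schema='public', prefix='healthcareio', sql="CREATE TABLE IF NOT EXISTS :table (claim_id TEXT)"),
#       Reader(   store=_mongo, schema='',       prefix='healthcareio', read={"find":"claims"}, max_rows=50000, table='claims'),
#       Writer(   store=_sql,   schema='public', prefix='healthcareio')
#   ]
#   Subject(name='claims', observers=_workers).start()
#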
class Worker:
    def __init__(self, **_args):
        # PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
        # CONFIG = json.loads((open(PATH)).read())
        self._info = _args['store']
        self.logs = []
        self.schema = _args['schema']
        self.prefix = _args['prefix']
        self.status = 0

    def name(self):
        return self.__class__.__name__

    def log(self, **_args):
        """
        This function is designed to log to either the console or a data-store
        """
        # print (_args)
        pass

    def init(self, **_args):
        """
        Initialize a worker with the arguments it needs to perform its task. The basic information needed is:
        :param caller   caller to be notified
        :param store    data-store information i.e (pgsql, couchdb, mongo ...)
        """
        self.caller = _args['caller']
        # self._info = _args['store']
        self._invalues = _args['invalues'] if 'invalues' in _args else None

    def execute(self):
        try:
            self._apply()
        except Exception as error:
            pass
        finally:
            if hasattr(self, 'caller'):
                self.caller.notify()

    def _apply(self):
        pass

    def get(self):
        pass

    def notify(self):
        self.caller.notify()

    def tablename(self, name):
        PREFIX_SEPARATOR = '_' if '_' not in self.prefix else ''
        SCHEMA_SEPARATOR = '' if self.schema.strip() == '' else '.'
        TABLE_NAME = PREFIX_SEPARATOR.join([self.prefix, name])
        return SCHEMA_SEPARATOR.join([self.schema, TABLE_NAME])
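
    #
    # tablename() composition, for reference (prefix/schema values here are hypothetical):
    #   prefix='claims',  schema='public' -> tablename('foo') == 'public.claims_foo'
    #   prefix='claims_', schema=''       -> tablename('foo') == 'claims_foo'
    #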
class CreateSQL(Worker):
    """
    This class is intended to create an SQL table given the DDL statement provided at construction time
    """
    def __init__(self, **_args):
        super().__init__(**_args)
        self._sql = _args['sql']

    def init(self, **_args):
        super().init(**_args)

    def _apply(self):
        sqltable = self._info['table'] if 'provider' in self._info else self._info['args']['table']
        sqltable = self.tablename(sqltable)
        # log = {"context":self.name(),"args":{"table":self._info['args']['table'],"sql":self._sql}}
        log = {"context": self.name(), "args": {"table": sqltable, "sql": self._sql.replace(":table", sqltable)}}
        try:
            writer = transport.factory.instance(**self._info)
            writer.apply(self._sql.replace(":table", sqltable))
            writer.close()
            log['status'] = 1
            self.status = 1
        except Exception as e:
            log['status'] = 0
            log['info'] = {"error": e.args[0]}
            # print (e)
        finally:
            self.log(**log)
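
#
# The DDL handed to CreateSQL is expected to carry a ":table" placeholder which _apply()
# replaces with the fully qualified table name, e.g. (hypothetical DDL/store values):
#
#   CreateSQL(store={"provider":"postgresql","database":"healthcareio","table":"claims"},
#             schema='public', prefix='healthcareio',
#             sql="CREATE TABLE IF NOT EXISTS :table (claim_id VARCHAR(128))")
#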
class Reader(Worker):
    """
    Read from mongodb and make the data available to a third party
    :param read      mongodb command (query passed to the underlying reader)
    :param max_rows  maximum rows to be written in a single insert
    """
    def __init__(self, **_args):
        super().__init__(**_args)
        # self.pipeline = _args['mongo'] #-- pipeline in the context of mongodb NOT ETL
        # self.pipeline = _args['mongo'] if 'mongo' in _args else _args['sql']
        self.pipeline = _args['read']
        self.MAX_ROWS = _args['max_rows']
        self.table = _args['table']  #-- target table
        # is_demo = 'features' not in _args or ('features' in _args and ('export_etl' not in _args['features'] or _args['features']['export_etl'] == 0))
        #
        # @TODO: Bundle the limits with the features so as to ensure that it doesn't come across as a magic number
        #
        # LIMIT = -1
        # if is_demo :
        #     LIMIT = 10000
        #     if set(['find','distinct']) & set(self.pipeline.keys()) :
        #         self.pipeline['limit'] = LIMIT
        #     elif 'aggregate' in self.pipeline :
        #         self.pipeline['pipeline'] = [{"$limit":LIMIT}] + self.pipeline['pipeline']
        #     self.log(**{"context":self.name(),"demo":is_demo,"args":{"limit":LIMIT}})

    def init(self, **_args):
        super().init(**_args)
        self.rows = []

    def _apply(self):
        # pre-populate the log so the except path never sees an undefined _log
        _log = {"context": self.name(), "status": 0, "info": {}}
        try:
            if 'type' in self._info:
                self._info['type'] = self._info['type'].replace('Writer', 'Reader')
                if 'fields' in self._info['args']:
                    del self._info['args']['fields']
            else:
                self._info['context'] = 'read'
            self.reader = transport.factory.instance(**self._info)
            # self.rows = self.reader.read(mongo=self.pipeline)
            self.rows = self.reader.read(**self.pipeline)
            if type(self.rows) == pd.DataFrame:
                self.rows = self.rows.to_dict(orient='records')
            # if 'provider' in self._info and self._info['provider'] == 'sqlite' :
            #     self.rows = self.rows.apply(lambda row: json.loads(row.data),axis=1).tolist()

            N = len(self.rows) / self.MAX_ROWS if len(self.rows) > self.MAX_ROWS else 1
            N = int(N)
            # self.rows = rows
            _log = {"context": self.name(), "status": 1, "info": {"rows": len(self.rows), "table": self.table, "segments": N}}
            self.rows = np.array_split(self.rows, N)
            # self.get = lambda : rows #np.array_split(rows,N)
            self.reader.close()
            self.status = 1

        except Exception as e:
            _log['status'] = 0
            _log['info'] = {"error": e.args[0]}
            print([e])

        self.log(**_log)
        # @TODO: Call the caller and notify it that this here is done

    def get(self):
        return self.rows
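
#
# The "read" argument is passed straight to the underlying reader's read(**pipeline);
# the commented demo-limit logic above implies shapes such as (hypothetical values):
#
#   {"find":"claims"}                                   # mongodb find
#   {"distinct":"claims","key":"name"}                  # mongodb distinct
#   {"aggregate":"claims","pipeline":[{"$limit":10}]}   # mongodb aggregation
#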
class Writer(Worker):
    def __init__(self, **_args):
        super().__init__(**_args)
        if 'provider' in self._info:
            self._info['context'] = 'write'

    def init(self, **_args):
        """
        :param store     output data-store needed for writing
        :param invalues  input values to be written somewhere
        """
        # note: this override does not record a caller, so execute() will not trigger a further notify()
        self._invalues = _args['invalues']

    def _apply(self):
        # table = self._info['args']['table'] if 'table' in self._info['args'] else 'N/A'
        # table = self.tablename(self._info['args']['table'])
        if 'provider' in self._info:
            table = self.tablename(self._info['table'])
            self._info['table'] = table
        else:
            table = self.tablename(self._info['args']['table'])
            self._info['args']['table'] = table
        writer = transport.factory.instance(**self._info)
        index = 0
        if self._invalues:
            for rows in self._invalues:
                # print (['segment # ',index,len(rows)])
                # self.log(**{"context":self.name(),"segment":(index+1),"args":{"rows":len(rows),"table":table}})
                if len(rows) > 0:
                    #
                    # @TODO: Upgrade to mongodb 4.0+ and remove the line below
                    # Upon upgrade use the operator "$toString" in export.init function
                    #
                    rows = [dict(item, **{"_id": str(item["_id"])}) for item in rows]
                    _df = pd.DataFrame(rows)
                    writer.write(_df)
                index += 1
                # for _e in rows :
                #     writer.write(_e)
            self.status = 1
        else:
            print("No data was passed")
        writer.close()
# _args = {"type":"mongo.MongoReader","args":{"db":"parserio","doc":"logs"}}
# reader = Reader()
# reader.init(store = _args,pipeline={"distinct":"claims","key":"name"})
# reader._apply()
# print (reader.get())
# for row in reader.get() :
#     print (row)
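
#
# For reference, the workers accept two store layouts for transport.factory.instance
# (exact keys depend on the data-transport version; values below are hypothetical):
#
#   - provider style : {"provider":"postgresql","database":"healthcareio","table":"claims"}
#   - type/args style: {"type":"mongo.MongoReader","args":{"db":"parserio","doc":"logs"}}
#
# Reader/Writer/CreateSQL branch on the presence of the "provider" key to decide whether
# the table name lives at the top level ("table") or under "args".
#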