- """
- HealthcareIO - The Phi Technology LLC 2020
- This file contains functionalities that implement elements of an ETL pipeline that will consist of various workers.
- The pipeline is built around an observer design pattern.
-
- @TODO: Integrate with airflow and other process monitoring tools
- """
- import transport
- import os
- from multiprocessing import Process, Lock
- import numpy as np
- import json
- import pandas as pd
- class Subject (Process):
- cache = pd.DataFrame()
- lock = Lock()
- @staticmethod
- def log(_args):
- Subject.lock.acquire()
- try:
- # pandas.DataFrame.append was removed in pandas 2.x, use concat instead
- Subject.cache = pd.concat([Subject.cache,pd.DataFrame([_args])],ignore_index=True)
- except Exception as e :
- print (e)
- finally:
- Subject.lock.release()
- def __init__(self,**_args):
- super().__init__()
- self.observers = _args['observers']
- self.index = 0
- self.name = _args['name']
- self.table = self.observers[1].table #-- the second observer is expected to expose the target table
- self.m = {}
-
- def run(self):
- self.notify()
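- # notify() walks the observer chain: each observer is initialized with the previous observer's
- # output (get()), then executed; the worker's execute() calls caller.notify() when it finishes,
- # which brings control back here for the next observer in line.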
- def notify(self):
- if self.index < len(self.observers) :
-
- observer = self.observers[self.index]
- _observer = None if self.index == 0 else self.observers[self.index -1]
- _invalues = None if not _observer else _observer.get()
- if _observer is None :
- self.m['table'] = self.name
-
- observer.init(caller=self,invalues = _invalues)
- self.index += 1
- observer.execute()
- print ({"table":self.table,"module":observer.name(),"status":observer.status})
- # self.m[observer.name()] = observer.status
-
- else:
- pass
-
-
-
-
- class Worker :
- def __init__(self,**_args):
- #PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
- #CONFIG = json.loads((open(PATH)).read())
- self._info = _args['store']
- self.logs = []
- self.schema = _args['schema']
- self.prefix = _args['prefix']
- self.status = 0
-
- def name(self):
- return self.__class__.__name__
- def log (self,**_args):
- """
- This function is designed to log to either the console or a data-store
- """
- # print (_args)
- pass
- def init(self,**_args):
- """
- Initialize a worker with the arguments it needs to perform its task. The basic information needed is:
- :param caller caller to be notified
- :param store data-store information i.e. (pgsql, couchdb, mongo ...)
- """
- self.caller = _args['caller']
- #self._info = _args['store']
- self._invalues = _args['invalues'] if 'invalues' in _args else None
- def execute(self):
- try:
- self._apply()
- except Exception as error:
- # the exception is swallowed so that the finally block can still notify the caller
- pass
- finally:
- if hasattr(self,'caller') :
- self.caller.notify()
- def _apply(self):
- pass
- def get(self):
- pass
- def notify(self):
- self.caller.notify()
- def tablename(self,name) :
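- """
- Compose the fully qualified table name, e.g. with prefix='foo' and schema='bar' (illustrative values)
- tablename('claims') returns 'bar.foo_claims'; with an empty schema it returns 'foo_claims'
- """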
- PREFIX_SEPARATOR = '_' if '_' not in self.prefix else ''
- SCHEMA_SEPARATOR = '' if self.schema.strip() =='' else '.'
- TABLE_NAME = PREFIX_SEPARATOR.join([self.prefix,name])
- return SCHEMA_SEPARATOR.join([self.schema,TABLE_NAME])
-
- class CreateSQL(Worker) :
- """
- This class is intended to create an SQL table from the DDL statement it is given (see :param sql)
- """
- def __init__(self,**_args):
- super().__init__(**_args)
- self._sql = _args['sql']
-
- def init(self,**_args):
- super().init(**_args)
-
- def _apply(self) :
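- """
- Run the DDL statement against the target data-store, substituting the ":table" placeholder with the
- fully qualified (schema/prefix aware) table name.
- """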
- sqltable = self._info['table'] if 'provider' in self._info else self._info['args']['table']
- sqltable = self.tablename(sqltable)
- # log = {"context":self.name(),"args":{"table":self._info['args']['table'],"sql":self._sql}}
- log = {"context":self.name(),"args":{"table":sqltable,"sql":self._sql.replace(":table",sqltable)}}
- try:
- writer = transport.factory.instance(**self._info)
- writer.apply(self._sql.replace(":table",sqltable))
- writer.close()
- log['status'] = 1
- self.status = 1
- except Exception as e:
- log['status'] = 0
- log['info'] = {"error":e.args[0]}
-
- # print (e)
- finally:
- self.log(**log)
- class Reader(Worker):
- """
- Read from the source data-store (e.g. mongodb) and make the data available to the next worker
- :param read read request, passed as keyword arguments to the reader's read function
- :param max_rows maximum number of rows per segment (i.e. per insert)
- :param table target table name
- """
- def __init__(self,**_args):
- super().__init__(**_args)
-
-
- # self.pipeline = _args['mongo'] #-- pipeline in the context of mongodb NOT ETL
-
- # self.pipeline = _args['mongo'] if 'mongo' in _args else _args['sql']
- self.pipeline = _args['read']
- self.MAX_ROWS = _args['max_rows']
- self.table = _args['table'] #-- target table
-
-
- # is_demo = 'features' not in _args or ('features' in _args and ('export_etl' not in _args['features'] or _args['features']['export_etl'] == 0))
- #
- # @TODO: Bundle the limits with the features so as to insure that it doesn't come across as a magic number
- #
- # LIMIT = -1
- # if is_demo :
- # LIMIT = 10000
- # if set(['find','distinct']) & set(self.pipeline.keys()) :
- # self.pipeline['limit'] = LIMIT
- # elif 'aggregate' in self.pipeline :
-
- # self.pipeline['pipeline'] = [{"$limit":LIMIT}] + self.pipeline['pipeline']
- # self.log(**{"context":self.name(),"demo":is_demo,"args":{"limit":LIMIT}})
-
- def init(self,**_args):
- super().init(**_args)
- self.rows = []
-
-
- def _apply(self):
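- """
- Read all rows from the source data-store and split them into segments of roughly MAX_ROWS rows;
- get() hands the resulting segments to the next worker.
- """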
- _log = {"context":self.name(),"status":0,"info":{}} #-- defined before the try so the except branch can always reference it
- try:
- if 'type' in self._info :
- self._info['type'] = self._info['type'].replace('Writer','Reader')
- if 'fields' in self._info['args'] :
- del self._info['args']['fields']
- else:
- self._info['context'] = 'read'
- self.reader = transport.factory.instance(**self._info)
-
- # self.rows = self.reader.read(mongo=self.pipeline)
- self.rows = self.reader.read(**self.pipeline)
-
- if type(self.rows) == pd.DataFrame :
- self.rows = self.rows.to_dict(orient='records')
- # if 'provider' in self._info and self._info['provider'] == 'sqlite' :
- # self.rows = self.rows.apply(lambda row: json.loads(row.data),axis=1).tolist()
-
- # number of segments (each roughly MAX_ROWS rows) the result set will be split into
- N = int(len(self.rows) / self.MAX_ROWS) if len(self.rows) > self.MAX_ROWS else 1
- # self.rows = rows
-
- _log = {"context":self.name(), "status":1,"info":{"rows":len(self.rows),"table":self.table,"segments":N}}
- self.rows = np.array_split(self.rows,N)
-
-
- # self.get = lambda : rows #np.array_split(rows,N)
- self.reader.close()
- self.status = 1
- #
- except Exception as e :
- _log['status'] = 0
- _log['info'] = {"error":e.args[0]}
- print ([e])
-
- self.log(**_log)
-
- # @TODO: Notify the caller that this step is done
- def get(self):
- return self.rows
-
- class Writer(Worker):
- def __init__(self,**_args):
- super().__init__(**_args)
- if 'provider' in self._info :
- self._info['context'] = 'write'
-
- def init(self,**_args):
- """
- :param store output data-store needed for writing
- :param invalues input values to be written
- """
- super().init(**_args) #-- sets self.caller and self._invalues (see Worker.init)
-
- def _apply(self):
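- """
- Write every segment handed over by the previous worker (self._invalues) to the target data-store;
- the table name is qualified through tablename() before writing.
- """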
-
- # table = self._info['args']['table'] if 'table' in self._info['args'] else 'N/A'
- # table = self.tablename(self._info['args']['table'])
- if 'provider' in self._info :
- table = self.tablename(self._info['table'])
- self._info['table'] = table
- else:
- table = self.tablename(self._info['args']['table'])
- self._info['args']['table'] = table
-
- writer = transport.factory.instance(**self._info)
-
- index = 0
-
- if self._invalues :
- for rows in self._invalues :
- # print (['segment # ',index,len(rows)])
-
- # self.log(**{"context":self.name(),"segment":(index+1),"args":{"rows":len(rows),"table":table}})
-
- if len(rows) > 0:
- #
- # @TODO: Upgrade to mongodb 4.0+ and remove the line below
- # Upon upgrade use the operator "$toString" in export.init function
- #
- # only documents coming from mongodb carry an "_id"; other sources are passed through unchanged
- rows = [dict(item,**{"_id":str(item["_id"])}) if "_id" in item else item for item in rows]
- _df = pd.DataFrame(rows)
- writer.write(_df)
- index += 1
- # for _e in rows :
- # writer.write(_e)
-
- self.status = 1
- else:
- print ("No data was passed")
-
-
- writer.close()
- # A quick, standalone way to exercise a Reader outside of a Subject (illustrative values, kept commented out):
- #_store = {"type":"mongo.MongoReader","args":{"db":"parserio","doc":"logs"}}
- #reader = Reader(store=_store,schema='',prefix='etl',read={"distinct":"claims","key":"name"},max_rows=100000,table='claims')
- #reader.init(caller=None)
- #reader._apply()
- #for _segment in reader.get() :
- # print (_segment)
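-
- # A minimal end-to-end wiring sketch (an assumption about intended usage, not taken from a live
- # configuration). The Subject drives its observers in order; note that Subject.notify reads
- # observers[1].table, so the second observer is expected to expose a `table` attribute (the Reader here).
- # All names and values below are illustrative only.
- #
- # _source = {"type":"mongo.MongoReader","args":{"db":"parserio","doc":"claims"}} #-- where the Reader reads from
- # _target = {"provider":"postgresql","table":"claims"} #-- where CreateSQL/Writer write to (illustrative)
- # _chain = [
- # CreateSQL(store=_target,schema='',prefix='etl',sql="CREATE TABLE :table (claim_id VARCHAR)"),
- # Reader(store=_source,schema='',prefix='etl',read={"distinct":"claims","key":"name"},max_rows=100000,table='claims'),
- # Writer(store=_target,schema='',prefix='etl')
- # ]
- # Subject(name='claims',observers=_chain).start() #-- Subject is a Process; run() kicks off notify()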