"""
This file serves as a proxy to healthcare-io ; it is meant to be embedded into the API
"""
import os
import time

import numpy as np
import pandas as pd

import smart
import transport
from healthcareio import x12
from healthcareio.analytics import Apex
class get :
    """
    Static helpers used by the API to report on files, logs and running processes
    """
    PROCS = []      #-- handles to the parser processes spawned by run()
    PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
    @staticmethod
    def resume (files,args):
        """
        Determine the files that still need to be processed by taking the set difference
        between the files found on disk and the files the logs list as completed
        @TODO: Support data-stores other than mongodb
        :param files    list of files within a folder
        :param args     configuration (the store section is used to reach the logs)
        """
        _args = args['store'].copy()
        if 'mongo' in _args['type'] :
            _args['type'] = 'mongo.MongoReader'
        reader = transport.factory.instance(**_args)
        _files = []
        try:
            #
            # Pull the names of the files that were already completed from the logs
            pipeline = [{"$match":{"completed":{"$eq":True}}},{"$group":{"_id":"$name"}},{"$project":{"name":"$_id","_id":0}}]
            _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
            _files = reader.read(mongo = _args)
            _files = [item['name'] for item in _files]
        except Exception as e :
            #
            # The logs may not exist yet ; in that case every file is considered pending
            pass
        _pending = list(set(files) - set(_files))
        print ([len(_pending),' files to be processed'])
        return _pending
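    #
    # Usage sketch (comments only, not executed) : the store section below is an assumed
    # example, the real values come from ~/.healthcareio/config.json
    #
    #   _config = {"store":{"type":"mongo.MongoReader","args":{"db":"healthcareio"}}}
    #   pending = get.resume(['claims/file-1.txt','claims/file-2.txt'],_config)
    #   #-- pending only holds the files the logs do not list as completed
    #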
    @staticmethod
    def processes(_args):
        """
        Build CPU/RAM usage charts for the running healthcare-io processes
        """
        APP_NAME = 'healthcare-io'
        _info = smart.top.read(name=APP_NAME)

        if _info.shape[0] == 0 :
            #
            # Nothing is running ; return a zeroed-out row so the charts still render
            _info = pd.DataFrame({"name":[APP_NAME],"cpu":[0],"mem":[0]})

        _info = _info.rename(columns={'cpu':'CPU','mem':'RAM','name':'name'})
        _info.index = np.arange(_info.shape[0])
        charts = []
        for label in ['CPU','RAM'] :
            value = _info[label].sum()
            df = pd.DataFrame({"name":[label],label:[value]})
            charts.append (
                Apex.apply(
                    {"data":df, "chart":{"type":"radial","axis":{"x":label,"y":"name"}}}
                )['apex']
            )

        return {"process":{"chart":charts,"counts":_info.shape[0]}}
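    #
    # The payload above has the following rough shape (the counts value is a placeholder,
    # each chart entry is an Apex chart configuration) :
    #
    #   {"process":{"chart":[<apex radial for CPU>,<apex radial for RAM>],"counts":1}}
    #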
    @staticmethod
    def files (_args):
        """
        Build processing-status charts : completion rate and claims/remits distribution
        """
        folder = _args['args']['folder']
        _info = smart.folder.read(path=folder)

        N = _info.files.tolist()[0]     #-- number of files found in the folder
        store_args = _args['store'].copy()
        store_args['context'] = 'read'

        if _args['store']['provider'] in ['mongo', 'mongodb']:
            #
            # completion count i.e number of distinct files that were fully processed
            pipeline = [{"$group":{"_id":"$name","count":{"$sum":{"$cond":[{"$eq":["$completed",True]},1,0]}} }},{"$group":{"_id":None,"count":{"$sum":"$count"}}},{"$project":{"_id":0,"status":"completed","count":1}}]
            query = {"mongo":{"aggregate":"logs","allowDiskUse":True,"cursor":{},"pipeline":pipeline}}
            #
            # distribution of claims vs remits
            pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}]
            _query = {"mongo":{"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}}
        else:
            store_args['table'] = 'logs'
            query = {"sql":"select count(distinct json_extract(data,'$.name')) as count, 'completed' status from logs where json_extract(data,'$.completed') = true"}
            _query = {"sql":"select json_extract(data,'$.parse') as type,count(distinct json_extract(data,'$.name')) as count from logs group by type"} #-- distribution claims/remits

        reader = transport.factory.instance(**store_args)
        _info = pd.DataFrame(reader.read(**query))

        if not _info.shape[0] :
            _info = pd.DataFrame({"status":["completed"],"count":[0]})
        #
        # express the completion count as a percentage of the files on disk (guard against an empty folder)
        _info['count'] = np.round((_info['count'] * 100)/N,2) if N else 0

        charts = [Apex.apply({"data":_info,"chart":{"type":"radial","axis":{"y":"status","x":"count"}}})['apex']]
        #
        # Classify the files i.e claims / remits
        #
        r = pd.DataFrame(reader.read(**_query))     #-- distribution claims/remits
        r = Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r})['apex']
        r['chart']['height'] = '100%'
        r['legend']['position'] = 'bottom'
        charts += [r]

        return {"files":{"counts":N,"chart":charts}}
    pass
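#
# Usage sketch (comments only, not executed) : how an API endpoint could assemble the
# dashboard payload. The configuration below is an assumed example of what config.json provides
#
#   _config = {"args":{"folder":"/data","batch":"2"},
#              "store":{"provider":"mongo","type":"mongo.MongoReader","context":"read"}}
#   _dashboard = dict(get.processes(_config), **get.files(_config))
#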
#
# Process handling ....
#
def run (_args) :
    """
    This function will run the parsing jobs as daemon processes
    :param _args    system configuration
    """
    FILES = []
    BATCH = int(_args['args']['batch'])     #-- number of parser processes to spawn (poorly named variable)
    for root,_dir,f in os.walk(_args['args']['folder']) :
        if f :
            FILES += [os.sep.join([root,name]) for name in f]
    #
    # Only hand over the files that the logs do not list as completed
    FILES = get.resume(FILES,_args)
    FILES = np.array_split(FILES,BATCH)

    for FILE_GROUP in FILES :
        FILE_GROUP = FILE_GROUP.tolist()
        parser = x12.Parser(get.PATH)
        parser.set.files(FILE_GROUP)
        parser.daemon = True
        parser.start()
        get.PROCS.append(parser)
        time.sleep(3)
    #
    # @TODO: consider submitting an update to clients via publish/subscribe framework
    #
    return get.PROCS
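#
# Usage sketch (comments only, not executed) : launching the parsers from the API layer ;
# the configuration is an assumed example, batch controls how many parser processes get spawned
#
#   _jobs = run({"args":{"folder":"/data","batch":"4"},"store":{"type":"mongo.MongoReader","args":{}}})
#   #-- ... later, e.g. on shutdown, stop({}) terminates whatever is still alive
#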
def stop(_args):
    for job in get.PROCS :
        if job.is_alive() :
            job.terminate()
    get.PROCS = []
    #
    # @TODO: consider submitting an update to clients via publish/subscribe framework
    pass
def write(src_args,dest_args,files) :
    #
    # @TODO: Support for SQLite
    pass
def publish (src_args,dest_args,folder="/data"):
    FILES = []
    for root,_dir,f in os.walk(folder) :
        if f :
            FILES += [os.sep.join([root,name]) for name in f]
    #
    # @TODO: Add support for SQLite ....
    #
    FILES = np.array_split(FILES,4)
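#
# For reference, np.array_split spreads the files over 4 roughly equal groups, e.g. :
#
#   >>> [len(part) for part in np.array_split(list(range(10)),4)]
#   [3, 3, 2, 2]
#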