  1. """
  2. This file serves as proxy to healthcare-io, it will be embedded into the API
  3. """
import os
import transport
import numpy as np
from healthcareio import x12
import pandas as pd
import smart
from healthcareio.analytics import Apex
import time
class get :
    PROCS = []
    PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
    @staticmethod
    def resume (files,args):
        """
        This function determines the files that still need to be processed by performing
        a simple complementary set operation (set difference) against the logs.
        @TODO: Support data-stores other than mongodb
        :param files    list of files within a folder
        :param args     configuration (data-store settings)
        """
        _args = args['store'].copy()
        if 'mongo' in _args['type'] :
            _args['type'] = 'mongo.MongoReader'
        reader = transport.factory.instance(**_args)
        _files = []
        try:
            #
            # Pull the names of files already marked as completed in the logs
            pipeline = [{"$match":{"completed":{"$eq":True}}},{"$group":{"_id":"$name"}},{"$project":{"name":"$_id","_id":0}}]
            _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
            _files = reader.read(mongo = _args)
            _files = [item['name'] for item in _files]
        except Exception as e :
            pass
        print (["found ",len(files),"\tProcessed ",len(_files)])
        return list(set(files) - set(_files))
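    #
    # Illustrative sketch (assumption, not part of the original file): given files found
    # on disk and names already logged as completed, resume() returns only the pending ones.
    # Assuming '/data/claim_a.txt' is logged as completed and '/data/claim_b.txt' is not :
    #
    #   pending = get.resume(['/data/claim_a.txt','/data/claim_b.txt'],
    #                        {'store': {'type': 'mongo', 'args': {...}}})
    #   # -> ['/data/claim_b.txt']
    #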
    @staticmethod
    def processes(_args):
        """
        Summarize CPU/RAM usage of the running healthcare-io.py processes and return
        the corresponding Apex chart payloads along with the live process count.
        """
        _info = pd.DataFrame(smart.top.read(name='healthcare-io.py'))
        if _info.shape[0] == 0 :
            _info = pd.DataFrame({"name":["healthcare-io.py"],"cpu":[0],"mem":[0]})
        else:
            _info = _info[['name','cpu','mem']]
        # _info = pd.DataFrame(_info.groupby(['name']).sum())
        # _info['name'] = ['healthcare-io.py']
        m = {'cpu':'CPU','mem':'RAM','name':'name'}
        _info.columns = [m[name] for name in _info.columns.tolist()]
        _info.index = np.arange(_info.shape[0])
        charts = []
        for label in ['CPU','RAM'] :
            value = _info[label].sum()
            df = pd.DataFrame({"name":[label],label:[value]})
            charts.append (
                Apex.apply(
                    {"data":df, "chart":{"type":"radial","axis":{"x":label,"y":"name"}}}
                )['apex']
            )
        #
        # Update the process counts upon subsequent requests so as to show the change
        #
        lprocs = [proc for proc in get.PROCS if proc.is_alive()]
        N = len(lprocs)
        get.PROCS = lprocs
        return {"process":{"chart":charts,"counts":N}}
    @staticmethod
    def files (_args):
        """
        Compute the percentage of files processed (completed) and the distribution of
        claims vs remits, as Apex chart payloads.
        """
        _info = smart.folder.read(path='/data')
        N = _info.files.tolist()[0]
        if 'mongo' in _args['store']['type'] :
            store_args = dict(_args['store'].copy(),**{"type":"mongo.MongoReader"})
            # reader = transport.factory.instance(**_args)
            #
            # Count of distinct files that have been marked as completed
            pipeline = [{"$group":{"_id":"$name","count":{"$sum":{"$cond":[{"$eq":["$completed",True]},1,0]}} }},{"$group":{"_id":None,"count":{"$sum":"$count"}}},{"$project":{"_id":0,"status":"completed","count":1}}]
            query = {"mongo":{"aggregate":"logs","allowDiskUse":True,"cursor":{},"pipeline":pipeline}}
            # _info = pd.DataFrame(reader.read(mongo={"aggregate":"logs","allowDiskUse":True,"cursor":{},"pipeline":pipeline}))
            pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}]
            _query = {"mongo":{"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}} #-- distribution claims/remits
        else:
            store_args = dict(_args['store'].copy(),**{"type":"disk.SQLiteReader"})
            store_args['args']['table'] = 'logs'
            query = {"sql":"select count(distinct json_extract(data,'$.name')) as count, 'completed' status from logs where json_extract(data,'$.completed') = true"}
            _query = {"sql":"select json_extract(data,'$.parse') as type,count(distinct json_extract(data,'$.name')) as count from logs group by type"} #-- distribution claims/remits
        reader = transport.factory.instance(**store_args)
        _info = pd.DataFrame(reader.read(**query))
        if not _info.shape[0] :
            _info = pd.DataFrame({"status":["completed"],"count":[0]})
        _info['count'] = np.round( (_info['count'] * 100 )/N,2)
        charts = [Apex.apply({"data":_info,"chart":{"type":"radial","axis":{"y":"status","x":"count"}}})['apex']]
        #
        # Let us classify the files now i.e claims / remits
        #
        # pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}]
        # _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
        # r = pd.DataFrame(reader.read(mongo=_args))
        r = pd.DataFrame(reader.read(**_query)) #-- distribution claims/remits
        r = Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r})['apex']
        r['chart']['height'] = '100%'
        r['legend']['position'] = 'bottom'
        charts += [r]
        return {"files":{"counts":N,"chart":charts}}
    pass
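#
# Illustrative sketch (assumption, not part of the original file): an API endpoint would
# typically merge the two dashboard payloads returned above, e.g. with Flask :
#
#   @app.route('/io/dashboard')                  # hypothetical route name
#   def dashboard():
#       _config = {"store": {...}}               # system configuration
#       return dict(get.processes(_config), **get.files(_config))
#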
#
# Process handling ....
#
def run (_args) :
    """
    This function runs the parsing jobs as daemon processes.
    :param _args    system configuration
    """
    FILES = []
    BATCH = int(_args['args']['batch']) #-- number of parallel processes (poorly named variable)
    for root,_dir,f in os.walk(_args['args']['folder']) :
        if f :
            FILES += [os.sep.join([root,name]) for name in f]
    FILES = get.resume(FILES,_args)
    FILES = np.array_split(FILES,BATCH)
    for FILE_GROUP in FILES :
        FILE_GROUP = FILE_GROUP.tolist()
        # logger.write({"process":index,"parse":_args['parse'],"file_count":len(row)})
        # proc = Process(target=apply,args=(row,info['store'],_info,))
        parser = x12.Parser(get.PATH) #os.sep.join([PATH,'config.json']))
        parser.set.files(FILE_GROUP)
        parser.daemon = True
        parser.start()
        get.PROCS.append(parser)
        time.sleep(3)
    #
    # @TODO: consider submitting an update to clients via a publish/subscribe framework
    #
    return get.PROCS
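#
# Illustrative sketch (assumption, not part of the original file): run() expects the
# system configuration to carry the batch size and input folder, e.g. :
#
#   _config = {"args": {"batch": "4", "folder": "/data"}, "store": {...}}
#   procs = run(_config)      # spawns up to 4 x12.Parser daemons
#   stop(_config)             # terminates whatever is still alive
#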
def stop(_args):
    for job in get.PROCS :
        if job.is_alive() :
            job.terminate()
    get.PROCS = []
    #
    # @TODO: consider submitting an update to clients via a publish/subscribe framework
    pass
def write(src_args,dest_args,files) :
    #
    # @TODO: Support for SQLite
    pass
def publish (src_args,dest_args,folder="/data"):
    FILES = []
    for root,_dir,f in os.walk(folder) :
        if f :
            FILES += [os.sep.join([root,name]) for name in f]
    #
    # @TODO: Add support for SQLite ....
    FILES = np.array_split(FILES,4)