  1. """
  2. This file serves as proxy to healthcare-io, it will be embedded into the API
  3. """
  4. import os
  5. import transport
  6. import numpy as np
  7. from healthcareio import x12
  8. import pandas as pd
  9. import smart
  10. from healthcareio.analytics import Apex
  11. import time
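#
# Note: the helpers below read a nested configuration dict. The sketch here is
# inferred from how the keys are accessed in this file and is illustrative only;
# the actual values are loaded from ~/.healthcareio/config.json.
#
#   _args = {
#       "store": {"provider": "mongo", "type": "mongo.MongoReader", ...},  # data-store settings
#       "args":  {"folder": "/data", "batch": "2", ...}                    # runtime settings (folder to scan, process count)
#   }
#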
class get :
    PROCS = []
    PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])

    @staticmethod
    def resume (files,args):
        """
        Determine which files still need to be processed by performing a simple
        complementary set operation against the logs.
        @TODO: Support data-stores other than mongodb
        :param files    list of files within a folder
        :param args     configuration (expected to carry a 'store' section)
        """
        _args = args['store'].copy()
        if 'mongo' in _args['type'] :
            _args['type'] = 'mongo.MongoReader'
        reader = transport.factory.instance(**_args)
        _files = []
        try:
            #
            # Collect the names of the files already logged as completed
            pipeline = [{"$match":{"completed":{"$eq":True}}},{"$group":{"_id":"$name"}},{"$project":{"name":"$_id","_id":0}}]
            _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
            _files = reader.read(mongo = _args)
            _files = [item['name'] for item in _files]
        except Exception as e :
            pass
        print ([len(list(set(files) - set(_files))),' files to be processed'])
        return list(set(files) - set(_files))
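    #
    # Illustration (hypothetical paths): if the folder yields
    #   files  = ['/data/claim_1', '/data/claim_2', '/data/claim_3']
    # and the logs already report '/data/claim_1' as completed, the pipeline
    # above returns ['/data/claim_1'] and the set difference gives
    #   resume(files, args) -> ['/data/claim_2', '/data/claim_3']   (in no particular order)
    # i.e. only files without a completed log entry are handed back for processing.
    #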
    @staticmethod
    def processes(_args):
        #
        # Summarize CPU/RAM usage of the healthcare-io processes and render each metric as a radial chart
        #
        APP_NAME ='healthcare-io'
        _info = smart.top.read(name=APP_NAME) #pd.DataFrame(smart.top.read(name='healthcare-io'))[['name','cpu','mem']]
        if _info.shape[0] == 0 :
            _info = pd.DataFrame({"name":[APP_NAME],"cpu":[0],"mem":[0]})
        # _info = pd.DataFrame(_info.groupby(['name']).sum())
        # _info['name'] = ['healthcare-io.py']
        m = {'cpu':'CPU','mem':'RAM','name':'name'}
        _info = _info.rename(columns=m)
        # _info.columns = [m[name] for name in _info.columns.tolist() if name in m]
        _info.index = np.arange(_info.shape[0])
        charts = []
        for label in ['CPU','RAM'] :
            value = _info[label].sum()
            df = pd.DataFrame({"name":[label],label:[value]})
            charts.append (
                Apex.apply(
                    {"data":df, "chart":{"type":"radial","axis":{"x":label,"y":"name"}}}
                )['apex']
            )
        return {"process":{"chart":charts,"counts":_info.shape[0]}}
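    #
    # Rough shape of the payload produced above (illustrative; the chart entries
    # are whatever Apex.apply returns for a radial chart):
    #   {"process": {"chart": [<apex radial config for CPU>, <apex radial config for RAM>],
    #                "counts": <number of rows reported by smart.top>}}
    #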
    @staticmethod
    def files (_args):
        #
        # Summarize the files found in the input folder: % completed and the claims/remits distribution
        #
        folder = _args['args']['folder']
        _info = smart.folder.read(path=folder)
        N = _info.files.tolist()[0]
        store_args = _args['store'].copy()
        store_args['context'] = 'read'
        # if 'mongo' in _args['store']['type'] :
        if _args['store']['provider'] in ['mongo', 'mongodb']:
            # store_args = dict(_args['store'].copy(),**{"type":"mongo.MongoReader"})
            # reader = transport.factory.instance(**_args)
            pipeline = [{"$group":{"_id":"$name","count":{"$sum":{"$cond":[{"$eq":["$completed",True]},1,0]}} }},{"$group":{"_id":None,"count":{"$sum":"$count"}}},{"$project":{"_id":0,"status":"completed","count":1}}]
            query = {"mongo":{"aggregate":"logs","allowDiskUse":True,"cursor":{},"pipeline":pipeline}}
            # _info = pd.DataFrame(reader.read(mongo={"aggregate":"logs","allowDiskUse":True,"cursor":{},"pipeline":pipeline}))
            pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}]
            _query = {"mongo":{"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}} #-- distribution claims/remits
        else:
            # store_args = dict(_args['store'].copy(),**{"type":"disk.SQLiteReader"})
            # store_args['args']['table'] = 'logs'
            store_args['table'] = 'logs'
            query = {"sql":"select count(distinct json_extract(data,'$.name')) as count, 'completed' status from logs where json_extract(data,'$.completed') = true"}
            _query = {"sql":"select json_extract(data,'$.parse') as type,count(distinct json_extract(data,'$.name')) as count from logs group by type"} #-- distribution claims/remits
        reader = transport.factory.instance(**store_args)
        _info = pd.DataFrame(reader.read(**query))
        if not _info.shape[0] :
            _info = pd.DataFrame({"status":["completed"],"count":[0]})
        _info['count'] = np.round( (_info['count'] * 100 )/N,2)
        charts = [Apex.apply({"data":_info,"chart":{"type":"radial","axis":{"y":"status","x":"count"}}})['apex']]
        #
        # Let us now classify the files i.e. claims / remits
        #
        r = pd.DataFrame(reader.read(**_query)) #-- distribution claims/remits
        r = Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r})['apex']
        r['chart']['height'] = '100%'
        r['legend']['position'] = 'bottom'
        charts += [r]
        return {"files":{"counts":N,"chart":charts}}
    pass
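    #
    # Rough shape of the payload produced by files() (illustrative only):
    #   {"files": {"counts": <number of files found in the folder>,
    #              "chart": [<apex radial config: % of files completed>,
    #                        <apex donut config: claims vs remits distribution>]}}
    #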
#
# Process handling ....
#
def run (_args) :
    """
    This function will run the jobs and ensure they run as daemon processes.
    :param _args    system configuration
    """
    FILES = []
    BATCH = int(_args['args']['batch']) #-- number of processes (poorly named variable)
    for root,_dir,f in os.walk(_args['args']['folder']) :
        if f :
            FILES += [os.sep.join([root,name]) for name in f]
    FILES = get.resume(FILES,_args)
    FILES = np.array_split(FILES,BATCH)
    for FILE_GROUP in FILES :
        FILE_GROUP = FILE_GROUP.tolist()
        # logger.write({"process":index,"parse":_args['parse'],"file_count":len(row)})
        # proc = Process(target=apply,args=(row,info['store'],_info,))
        parser = x12.Parser(get.PATH) #os.sep.join([PATH,'config.json']))
        parser.set.files(FILE_GROUP)
        parser.daemon = True
        parser.start()
        get.PROCS.append(parser)
        time.sleep(3)
    #
    # @TODO: consider submitting an update to clients via publish/subscribe framework
    #
    return get.PROCS
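#
# Typical usage (hypothetical configuration values; the real ones come from the
# API that embeds this proxy and from ~/.healthcareio/config.json):
#
#   _args = {"args": {"folder": "/data", "batch": "2"}, "store": {...}}
#   procs = run(_args)   # spawns one daemonized x12.Parser per batch of files
#   ...
#   stop(_args)          # terminates any parser jobs that are still alive
#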
def stop(_args):
    for job in get.PROCS :
        if job.is_alive() :
            job.terminate()
    get.PROCS = []
    #
    # @TODO: consider submitting an update to clients via publish/subscribe framework
    pass
def write(src_args,dest_args,files) :
    #
    # @TODO: Support for SQLite
    pass
def publish (src_args,dest_args,folder="/data"):
    FILES = []
    for root,_dir,f in os.walk(folder) :
        if f :
            FILES += [os.sep.join([root,name]) for name in f]
    #
    # @TODO: Add support for SQLite ....
    FILES = np.array_split(FILES,4)