__init__.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. from flask import Flask, request,render_template, send_from_directory
  2. from healthcareio.params import SYS_ARGS
  3. import healthcareio.analytics
  4. import os
  5. import json
  6. import time
  7. import smart
  8. import transport
  9. import pandas as pd
  10. import numpy as np
  11. from healthcareio import x12
  12. from multiprocessing import Process
  13. # from flask_socketio import SocketIO, emit, disconnect,send
  14. from healthcareio.server import proxy
  15. PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
  16. app = Flask(__name__)
  17. # socket_ = SocketIO(app)
  18. def resume (files):
  19. _args = SYS_ARGS['config']['store'].copy()
  20. if 'mongo' in SYS_ARGS['config']['store']['type'] :
  21. _args['type'] = 'mongo.MongoReader'
  22. reader = transport.factory.instance(**_args)
  23. _files = []
  24. try:
  25. pipeline = [{"$match":{"completed":{"$eq":True}}},{"$group":{"_id":"$name"}},{"$project":{"name":"$_id","_id":0}}]
  26. _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
  27. _files = reader.read(mongo = _args)
  28. _files = [item['name'] for item in _files]
  29. except Exception as e :
  30. pass
  31. return list(set(files) - set(_files))
  32. def run ():
  33. #
  34. # let's get the files in the folder (perhaps recursively traverse them)
  35. #
  36. FILES = []
  37. BATCH = int(SYS_ARGS['config']['args']['batch']) #-- number of processes (poorly named variable)
  38. for root,_dir,f in os.walk(SYS_ARGS['config']['args']['folder']) :
  39. if f :
  40. FILES += [os.sep.join([root,name]) for name in f]
  41. FILES = resume(FILES)
  42. FILES = np.array_split(FILES,BATCH)
  43. procs = []
  44. for FILE_GROUP in FILES :
  45. FILE_GROUP = FILE_GROUP.tolist()
  46. # logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)})
  47. # proc = Process(target=apply,args=(row,info['store'],_info,))
  48. parser = x12.Parser(PATH) #os.sep.join([PATH,'config.json']))
  49. parser.set.files(FILE_GROUP)
  50. parser.start()
  51. procs.append(parser)
  52. SYS_ARGS['procs'] = procs
  53. # @socket_.on('data',namespace='/stream')
  54. def push() :
  55. _args = dict(SYS_ARGS['config']['store'].copy(),**{"type":"mongo.MongoReader"})
  56. reader = transport.factory.instance(**_args)
  57. pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}]
  58. _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
  59. r = pd.DataFrame(reader.read(mongo=_args))
  60. r = healthcareio.analytics.Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r})
  61. # emit("update",r,json=True)
  62. return r
  63. # @socket_.on('connect')
  64. # def client_connect(**r):
  65. # print ('Connection received')
  66. # print (r)
  67. # push()
  68. # pass
  69. @app.route("/favicon.ico")
  70. def _icon():
  71. return send_from_directory(os.path.join([app.root_path, 'static','img','logo.svg']),
  72. 'favicon.ico', mimetype='image/vnd.microsoft.icon')
  73. @app.route("/")
  74. def init():
  75. e = SYS_ARGS['engine']
  76. sections = {"remits":e.info['835'],"claims":e.info['837']}
  77. _args = {"sections":sections,"store":SYS_ARGS["config"]["store"],"owner":SYS_ARGS['config']['owner'],"args":SYS_ARGS["config"]["args"]}
  78. return render_template("index.html",**_args)
  79. @app.route("/format/<id>/<index>",methods=['POST'])
  80. def _format(id,index):
  81. e = SYS_ARGS['engine']
  82. key = '837' if id == 'claims' else '835'
  83. index = int(index)
  84. # p = e.info[key][index]
  85. p = e.filter(type=id,index=index)
  86. r = []
  87. for item in p['pipeline'] :
  88. _item= dict(item)
  89. _item = dict(_item,**healthcareio.analytics.Apex.apply(item))
  90. del _item['data']
  91. if 'apex' in _item or 'html' in _item:
  92. r.append(_item)
  93. r = {"id":p['id'],"pipeline":r}
  94. return json.dumps(r),200
  95. @app.route("/get/<id>/<index>",methods=['GET'])
  96. def get(id,index):
  97. e = SYS_ARGS['engine']
  98. key = '837' if id == 'claims' else '835'
  99. index = int(index)
  100. # p = e.info[key][index]
  101. p = e.filter(type=id,index=index)
  102. r = {}
  103. for item in p[0]['pipeline'] :
  104. _item= [dict(item)]
  105. # r[item['label']] = item['data'].to_dict(orient='record')
  106. r[item['label']] = item['data'].to_dict('record')
  107. return json.dumps(r),200
  108. @app.route("/reset",methods=["POST"])
  109. def reset():
  110. return "1",200
  111. @app.route("/data",methods=['GET'])
  112. def get_data ():
  113. """
  114. This function will return statistical data about the services i.e general statistics about what has/been processed
  115. """
  116. HEADER = {"Content-type":"application/json"}
  117. _args = SYS_ARGS['config']
  118. options = dict(proxy.get.files(_args),**proxy.get.processes(_args))
  119. return json.dumps(options),HEADER
  120. @app.route("/log/<id>",methods=["POST","PUT","GET"])
  121. def log(id) :
  122. HEADER = {"Content-Type":"application/json; charset=utf8"}
  123. if id == 'params' and request.method in ['PUT', 'POST']:
  124. info = request.json
  125. _args = {"batch":info['batch'] if 'batch' in info else 1,"resume":True}
  126. #
  127. # We should update the configuration
  128. SYS_ARGS['config']['args'] = _args
  129. PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
  130. write = lambda content: (open(PATH,'w')).write(json.dumps(content))
  131. proc = Process(target=write,args=(SYS_ARGS['config'],))
  132. proc.start()
  133. return "1",HEADER
  134. pass
  135. @app.route("/io/<id>",methods=['POST'])
  136. def io_data(id):
  137. if id == 'params' :
  138. _args = request.json
  139. #
  140. # Expecting batch,folder as parameters
  141. _args = request.json
  142. _args['resume'] = True
  143. print (_args)
  144. #
  145. # We should update the configuration
  146. SYS_ARGS['config']['args'] = _args
  147. # PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
  148. try:
  149. write = lambda content: (open(PATH,'w')).write(json.dumps(content))
  150. proc = Process(target=write,args=(SYS_ARGS['config'],))
  151. proc.start()
  152. # proc.join()
  153. return "1",200
  154. except Exception as e :
  155. return "0",403
  156. pass
  157. elif id == 'stop' :
  158. stop()
  159. pass
  160. elif id == 'run' :
  161. # run()
  162. _args = {"args":SYS_ARGS['config']['args'],"store":SYS_ARGS["config"]["store"]}
  163. proxy.run(_args)
  164. return "1",200
  165. pass
  166. @app.route("/export")
  167. def export_form():
  168. _args = {"context":SYS_ARGS['context']}
  169. return render_template("store.html",**_args)
  170. @app.route("/export",methods=['POST','PUT'])
  171. def apply_etl():
  172. _info = request.json
  173. m = {'s3':'s3.s3Writer','mongo':'mongo.MongoWriter'}
  174. if _info :
  175. dest_args = {'type':m[_info['type']],"args": _info['content'] }
  176. src_args = SYS_ARGS['config']['store']
  177. # print (_args)
  178. # writer = transport.factory.instance(**_args)
  179. proxy.publish(src_args,dest_args)
  180. return "1",405
  181. else:
  182. return "0",404
  183. @app.route("/update")
  184. def update():
  185. pass
  186. return "0",405
  187. @app.route("/reload",methods=['POST'])
  188. def reload():
  189. # e = SYS_ARGS['engine']
  190. # e.apply()
  191. PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
  192. e = healthcareio.analytics.engine(PATH)
  193. # e.apply()
  194. SYS_ARGS['engine'] = e
  195. return "1",200
  196. if __name__ == '__main__' :
  197. PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
  198. DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
  199. SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''
  200. #
  201. #
  202. PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
  203. #
  204. # Adjusting configuration with parameters (batch,folder,resume)
  205. if 'args' not in SYS_ARGS['config'] :
  206. SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"}
  207. SYS_ARGS['procs'] = []
  208. # SYS_ARGS['path'] = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
  209. e = healthcareio.analytics.engine(PATH)
  210. e.apply(type='claims',serialize=False)
  211. SYS_ARGS['engine'] = e
  212. app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=True)