__init__.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. from flask import Flask, request,render_template, send_from_directory
  2. from healthcareio.params import SYS_ARGS
  3. import healthcareio.analytics
  4. import os
  5. import json
  6. import time
  7. # import smart
  8. import transport
  9. import pandas as pd
  10. import numpy as np
  11. from healthcareio import x12
  12. from healthcareio.export import export
  13. from multiprocessing import Process
  14. # from flask_socketio import SocketIO, emit, disconnect,send
  15. from healthcareio.server import proxy
  16. PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
  17. app = Flask(__name__)
  18. # socket_ = SocketIO(app)
  19. def resume (files):
  20. _args = SYS_ARGS['config']['store'].copy()
  21. if 'mongo' in SYS_ARGS['config']['store']['type'] :
  22. _args['type'] = 'mongo.MongoReader'
  23. reader = transport.factory.instance(**_args)
  24. _files = []
  25. try:
  26. pipeline = [{"$match":{"completed":{"$eq":True}}},{"$group":{"_id":"$name"}},{"$project":{"name":"$_id","_id":0}}]
  27. _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
  28. _files = reader.read(mongo = _args)
  29. _files = [item['name'] for item in _files]
  30. except Exception as e :
  31. pass
  32. return list(set(files) - set(_files))
  33. def run ():
  34. #
  35. # let's get the files in the folder (perhaps recursively traverse them)
  36. #
  37. FILES = []
  38. BATCH = int(SYS_ARGS['config']['args']['batch']) #-- number of processes (poorly named variable)
  39. for root,_dir,f in os.walk(SYS_ARGS['config']['args']['folder']) :
  40. if f :
  41. FILES += [os.sep.join([root,name]) for name in f]
  42. FILES = resume(FILES)
  43. FILES = np.array_split(FILES,BATCH)
  44. procs = []
  45. for FILE_GROUP in FILES :
  46. FILE_GROUP = FILE_GROUP.tolist()
  47. # logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)})
  48. # proc = Process(target=apply,args=(row,info['store'],_info,))
  49. parser = x12.Parser(PATH) #os.sep.join([PATH,'config.json']))
  50. parser.set.files(FILE_GROUP)
  51. parser.start()
  52. procs.append(parser)
  53. SYS_ARGS['procs'] = procs
  54. # @socket_.on('data',namespace='/stream')
  55. def push() :
  56. _args = dict(SYS_ARGS['config']['store'].copy(),**{"type":"mongo.MongoReader"})
  57. reader = transport.factory.instance(**_args)
  58. pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}]
  59. _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
  60. r = pd.DataFrame(reader.read(mongo=_args))
  61. r = healthcareio.analytics.Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r})
  62. # emit("update",r,json=True)
  63. return r
  64. # @socket_.on('connect')
  65. # def client_connect(**r):
  66. # print ('Connection received')
  67. # print (r)
  68. # push()
  69. # pass
  70. @app.route("/favicon.ico")
  71. def _icon():
  72. return send_from_directory(os.path.join([app.root_path, 'static','img','logo.svg']),
  73. 'favicon.ico', mimetype='image/vnd.microsoft.icon')
  74. @app.route("/")
  75. def init():
  76. e = SYS_ARGS['engine']
  77. sections = {"remits":e.info['835'],"claims":e.info['837']}
  78. _args = {"sections":sections,"store":SYS_ARGS["config"]["store"],"owner":SYS_ARGS['config']['owner'],"args":SYS_ARGS["config"]["args"]}
  79. return render_template("index.html",**_args)
  80. @app.route("/format/<id>/<index>",methods=['POST'])
  81. def _format(id,index):
  82. e = SYS_ARGS['engine']
  83. key = '837' if id == 'claims' else '835'
  84. index = int(index)
  85. # p = e.info[key][index]
  86. p = e.filter(type=id,index=index)
  87. r = []
  88. for item in p['pipeline'] :
  89. _item= dict(item)
  90. _item = dict(_item,**healthcareio.analytics.Apex.apply(item))
  91. del _item['data']
  92. if 'apex' in _item or 'html' in _item:
  93. r.append(_item)
  94. r = {"id":p['id'],"pipeline":r}
  95. return json.dumps(r),200
  96. @app.route("/get/<id>/<index>",methods=['GET'])
  97. def get(id,index):
  98. e = SYS_ARGS['engine']
  99. key = '837' if id == 'claims' else '835'
  100. index = int(index)
  101. # p = e.info[key][index]
  102. p = e.filter(type=id,index=index)
  103. r = {}
  104. for item in p[0]['pipeline'] :
  105. _item= [dict(item)]
  106. # r[item['label']] = item['data'].to_dict(orient='record')
  107. r[item['label']] = item['data'].to_dict('record')
  108. return json.dumps(r),200
  109. @app.route("/reset",methods=["POST"])
  110. def reset():
  111. return "1",200
  112. @app.route("/data",methods=['GET'])
  113. def get_data ():
  114. """
  115. This function will return statistical data about the services i.e general statistics about what has/been processed
  116. """
  117. HEADER = {"Content-type":"application/json"}
  118. _args = SYS_ARGS['config']
  119. options = dict(proxy.get.files(_args),**proxy.get.processes(_args))
  120. return json.dumps(options),HEADER
  121. @app.route("/log/<id>",methods=["POST","PUT","GET"])
  122. def log(id) :
  123. HEADER = {"Content-Type":"application/json; charset=utf8"}
  124. if id == 'params' and request.method in ['PUT', 'POST']:
  125. info = request.json
  126. _args = {"batch":info['batch'] if 'batch' in info else 1,"resume":True}
  127. #
  128. # We should update the configuration
  129. SYS_ARGS['config']['args'] = _args
  130. PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
  131. write = lambda content: (open(PATH,'w')).write(json.dumps(content))
  132. proc = Process(target=write,args=(SYS_ARGS['config'],))
  133. proc.start()
  134. return "1",HEADER
  135. pass
  136. @app.route("/io/<id>",methods=['POST'])
  137. def io_data(id):
  138. if id == 'params' :
  139. _args = request.json
  140. #
  141. # Expecting batch,folder as parameters
  142. _args = request.json
  143. _args['resume'] = True
  144. print (_args)
  145. #
  146. # We should update the configuration
  147. SYS_ARGS['config']['args'] = _args
  148. # PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
  149. try:
  150. write = lambda content: (open(PATH,'w')).write(json.dumps(content))
  151. proc = Process(target=write,args=(SYS_ARGS['config'],))
  152. proc.start()
  153. # proc.join()
  154. return "1",200
  155. except Exception as e :
  156. return "0",403
  157. pass
  158. elif id == 'stop' :
  159. stop()
  160. pass
  161. elif id == 'run' :
  162. # run()
  163. _args = {"args":SYS_ARGS['config']['args'],"store":SYS_ARGS["config"]["store"]}
  164. proxy.run(_args)
  165. return "1",200
  166. pass
  167. @app.route("/export")
  168. def export_form():
  169. _args = {"context":SYS_ARGS['context']}
  170. return render_template("store.html",**_args)
  171. @app.route("/export",methods=['POST','PUT'])
  172. def apply_etl():
  173. _info = request.json
  174. m = {'s3':'s3.s3Writer','mongo':'mongo.MongoWriter'}
  175. if _info :
  176. dest_args = {'type':m[_info['type']],"args": _info['content'] }
  177. src_args = SYS_ARGS['config']['store']
  178. # print (_args)
  179. # writer = transport.factory.instance(**_args)
  180. proxy.publish(src_args,dest_args)
  181. return "1",405
  182. else:
  183. return "0",404
  184. @app.route("/update")
  185. def update():
  186. pass
  187. return "0",405
  188. @app.route("/reload",methods=['POST'])
  189. def reload():
  190. # e = SYS_ARGS['engine']
  191. # e.apply()
  192. PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
  193. e = healthcareio.analytics.engine(PATH)
  194. # e.apply()
  195. SYS_ARGS['engine'] = e
  196. return "1",200
  197. if __name__ == '__main__' :
  198. PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
  199. DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
  200. SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''
  201. #
  202. #
  203. PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
  204. #
  205. # Adjusting configuration with parameters (batch,folder,resume)
  206. SYS_ARGS['config'] = json.loads(open(PATH).read())
  207. if 'args' not in SYS_ARGS['config'] :
  208. SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"}
  209. SYS_ARGS['procs'] = []
  210. # SYS_ARGS['path'] = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
  211. e = healthcareio.analytics.engine(PATH)
  212. e.apply(type='claims',serialize=False)
  213. SYS_ARGS['engine'] = e
  214. app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=True)