123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- from flask import Flask, request,render_template, send_from_directory
- from healthcareio.params import SYS_ARGS
- import healthcareio.analytics
- import os
- import json
- import time
- # import smart
- import transport
- import pandas as pd
- import numpy as np
- from healthcareio import x12
- from healthcareio.export import export
- from multiprocessing import Process
- # from flask_socketio import SocketIO, emit, disconnect,send
- from healthcareio.server import proxy
- PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
- app = Flask(__name__)
- # socket_ = SocketIO(app)
- def resume (files):
- _args = SYS_ARGS['config']['store'].copy()
- if 'mongo' in SYS_ARGS['config']['store']['type'] :
- _args['type'] = 'mongo.MongoReader'
- reader = transport.factory.instance(**_args)
- _files = []
- try:
- pipeline = [{"$match":{"completed":{"$eq":True}}},{"$group":{"_id":"$name"}},{"$project":{"name":"$_id","_id":0}}]
- _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
- _files = reader.read(mongo = _args)
- _files = [item['name'] for item in _files]
- except Exception as e :
- pass
-
- return list(set(files) - set(_files))
-
-
- def run ():
- #
- # let's get the files in the folder (perhaps recursively traverse them)
- #
- FILES = []
- BATCH = int(SYS_ARGS['config']['args']['batch']) #-- number of processes (poorly named variable)
- for root,_dir,f in os.walk(SYS_ARGS['config']['args']['folder']) :
- if f :
- FILES += [os.sep.join([root,name]) for name in f]
- FILES = resume(FILES)
- FILES = np.array_split(FILES,BATCH)
- procs = []
- for FILE_GROUP in FILES :
-
- FILE_GROUP = FILE_GROUP.tolist()
- # logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)})
- # proc = Process(target=apply,args=(row,info['store'],_info,))
- parser = x12.Parser(PATH) #os.sep.join([PATH,'config.json']))
- parser.set.files(FILE_GROUP)
- parser.start()
- procs.append(parser)
- SYS_ARGS['procs'] = procs
- # @socket_.on('data',namespace='/stream')
- def push() :
- _args = dict(SYS_ARGS['config']['store'].copy(),**{"type":"mongo.MongoReader"})
- reader = transport.factory.instance(**_args)
- pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}]
- _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
- r = pd.DataFrame(reader.read(mongo=_args))
- r = healthcareio.analytics.Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r})
- # emit("update",r,json=True)
- return r
- # @socket_.on('connect')
- # def client_connect(**r):
- # print ('Connection received')
- # print (r)
- # push()
- # pass
-
- @app.route("/favicon.ico")
- def _icon():
- return send_from_directory(os.path.join([app.root_path, 'static','img','logo.svg']),
- 'favicon.ico', mimetype='image/vnd.microsoft.icon')
- @app.route("/")
- def init():
- e = SYS_ARGS['engine']
- sections = {"remits":e.info['835'],"claims":e.info['837']}
- _args = {"sections":sections,"store":SYS_ARGS["config"]["store"],"owner":SYS_ARGS['config']['owner'],"args":SYS_ARGS["config"]["args"]}
- return render_template("index.html",**_args)
- @app.route("/format/<id>/<index>",methods=['POST'])
- def _format(id,index):
-
- e = SYS_ARGS['engine']
- key = '837' if id == 'claims' else '835'
- index = int(index)
- # p = e.info[key][index]
- p = e.filter(type=id,index=index)
-
- r = []
- for item in p['pipeline'] :
- _item= dict(item)
- _item = dict(_item,**healthcareio.analytics.Apex.apply(item))
- del _item['data']
- if 'apex' in _item or 'html' in _item:
- r.append(_item)
-
-
- r = {"id":p['id'],"pipeline":r}
- return json.dumps(r),200
- @app.route("/get/<id>/<index>",methods=['GET'])
- def get(id,index):
- e = SYS_ARGS['engine']
- key = '837' if id == 'claims' else '835'
- index = int(index)
- # p = e.info[key][index]
- p = e.filter(type=id,index=index)
- r = {}
- for item in p[0]['pipeline'] :
-
- _item= [dict(item)]
-
- # r[item['label']] = item['data'].to_dict(orient='record')
- r[item['label']] = item['data'].to_dict('record')
- return json.dumps(r),200
- @app.route("/reset",methods=["POST"])
- def reset():
- return "1",200
- @app.route("/data",methods=['GET'])
- def get_data ():
- """
- This function will return statistical data about the services i.e general statistics about what has/been processed
- """
- HEADER = {"Content-type":"application/json"}
- _args = SYS_ARGS['config']
- options = dict(proxy.get.files(_args),**proxy.get.processes(_args))
- return json.dumps(options),HEADER
- @app.route("/log/<id>",methods=["POST","PUT","GET"])
- def log(id) :
- HEADER = {"Content-Type":"application/json; charset=utf8"}
- if id == 'params' and request.method in ['PUT', 'POST']:
- info = request.json
- _args = {"batch":info['batch'] if 'batch' in info else 1,"resume":True}
- #
- # We should update the configuration
- SYS_ARGS['config']['args'] = _args
- PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
- write = lambda content: (open(PATH,'w')).write(json.dumps(content))
- proc = Process(target=write,args=(SYS_ARGS['config'],))
- proc.start()
- return "1",HEADER
- pass
- @app.route("/io/<id>",methods=['POST'])
- def io_data(id):
- if id == 'params' :
- _args = request.json
- #
- # Expecting batch,folder as parameters
- _args = request.json
- _args['resume'] = True
- print (_args)
- #
- # We should update the configuration
- SYS_ARGS['config']['args'] = _args
- # PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
- try:
- write = lambda content: (open(PATH,'w')).write(json.dumps(content))
- proc = Process(target=write,args=(SYS_ARGS['config'],))
- proc.start()
- # proc.join()
- return "1",200
- except Exception as e :
- return "0",403
- pass
- elif id == 'stop' :
- stop()
- pass
- elif id == 'run' :
- # run()
- _args = {"args":SYS_ARGS['config']['args'],"store":SYS_ARGS["config"]["store"]}
- proxy.run(_args)
- return "1",200
- pass
-
- @app.route("/export")
- def export_form():
- _args = {"context":SYS_ARGS['context']}
- return render_template("store.html",**_args)
- @app.route("/export",methods=['POST','PUT'])
- def apply_etl():
- _info = request.json
- m = {'s3':'s3.s3Writer','mongo':'mongo.MongoWriter'}
- if _info :
- dest_args = {'type':m[_info['type']],"args": _info['content'] }
- src_args = SYS_ARGS['config']['store']
- # print (_args)
- # writer = transport.factory.instance(**_args)
- proxy.publish(src_args,dest_args)
- return "1",405
-
- else:
- return "0",404
- @app.route("/update")
- def update():
- pass
- return "0",405
- @app.route("/reload",methods=['POST'])
- def reload():
- # e = SYS_ARGS['engine']
- # e.apply()
- PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
- e = healthcareio.analytics.engine(PATH)
- # e.apply()
- SYS_ARGS['engine'] = e
- return "1",200
-
- if __name__ == '__main__' :
- PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
- DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
- SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''
- #
- #
- PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
- #
- # Adjusting configuration with parameters (batch,folder,resume)
- SYS_ARGS['config'] = json.loads(open(PATH).read())
- if 'args' not in SYS_ARGS['config'] :
- SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"}
-
- SYS_ARGS['procs'] = []
-
-
- # SYS_ARGS['path'] = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
- e = healthcareio.analytics.engine(PATH)
- e.apply(type='claims',serialize=False)
- SYS_ARGS['engine'] = e
- app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=True)
|