|
@@ -3,7 +3,78 @@ from healthcareio.params import SYS_ARGS
|
|
|
import healthcareio.analytics
|
|
|
import os
|
|
|
import json
|
|
|
+import time
|
|
|
+import smart
|
|
|
+import transport
|
|
|
+import pandas as pd
|
|
|
+import numpy as np
|
|
|
+import x12
|
|
|
+
|
|
|
+from multiprocessing import Process
|
|
|
+from flask_socketio import SocketIO, emit, disconnect,send
|
|
|
+from healthcareio.server import proxy
|
|
|
+PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
|
|
|
app = Flask(__name__)
|
|
|
+socket_ = SocketIO(app)
|
|
|
+
|
|
|
+def resume (files):
|
|
|
+ _args = SYS_ARGS['config']['store'].copy()
|
|
|
+ if 'mongo' in SYS_ARGS['config']['store']['type'] :
|
|
|
+ _args['type'] = 'mongo.MongoReader'
|
|
|
+ reader = transport.factory.instance(**_args)
|
|
|
+ _files = []
|
|
|
+ try:
|
|
|
+ pipeline = [{"$match":{"completed":{"$eq":True}}},{"$group":{"_id":"$name"}},{"$project":{"name":"$_id","_id":0}}]
|
|
|
+ _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
|
|
|
+ _files = reader.read(mongo = _args)
|
|
|
+ _files = [item['name'] for item in _files]
|
|
|
+ except Exception as e :
|
|
|
+ pass
|
|
|
+ print (["found ",len(files),"\tProcessed ",len(_files)])
|
|
|
+ return list(set(files) - set(_files))
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+def run ():
|
|
|
+ #
|
|
|
+ # let's get the files in the folder (perhaps recursively traverse them)
|
|
|
+ #
|
|
|
+ FILES = []
|
|
|
+ BATCH = int(SYS_ARGS['config']['args']['batch']) #-- number of processes (poorly named variable)
|
|
|
+
|
|
|
+ for root,_dir,f in os.walk(SYS_ARGS['config']['args']['folder']) :
|
|
|
+ if f :
|
|
|
+ FILES += [os.sep.join([root,name]) for name in f]
|
|
|
+ FILES = resume(FILES)
|
|
|
+ FILES = np.array_split(FILES,BATCH)
|
|
|
+ procs = []
|
|
|
+ for FILE_GROUP in FILES :
|
|
|
+
|
|
|
+ FILE_GROUP = FILE_GROUP.tolist()
|
|
|
+ # logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)})
|
|
|
+ # proc = Process(target=apply,args=(row,info['store'],_info,))
|
|
|
+ parser = x12.Parser(PATH) #os.sep.join([PATH,'config.json']))
|
|
|
+ parser.set.files(FILE_GROUP)
|
|
|
+ parser.start()
|
|
|
+ procs.append(parser)
|
|
|
+ SYS_ARGS['procs'] = procs
|
|
|
+# @socket_.on('data',namespace='/stream')
|
|
|
+def push() :
|
|
|
+ _args = dict(SYS_ARGS['config']['store'].copy(),**{"type":"mongo.MongoReader"})
|
|
|
+ reader = transport.factory.instance(**_args)
|
|
|
+ pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}]
|
|
|
+ _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
|
|
|
+ r = pd.DataFrame(reader.read(mongo=_args))
|
|
|
+ r = healthcareio.analytics.Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r})
|
|
|
+ emit("update",r,json=True)
|
|
|
+ return r
|
|
|
+@socket_.on('connect')
|
|
|
+def client_connect(**r):
|
|
|
+ print ('Connection received')
|
|
|
+ print (r)
|
|
|
+ push()
|
|
|
+ pass
|
|
|
+
|
|
|
@app.route("/favicon.ico")
|
|
|
def _icon():
|
|
|
return send_from_directory(os.path.join([app.root_path, 'static','img','logo.svg']),
|
|
@@ -12,7 +83,7 @@ def _icon():
|
|
|
def init():
|
|
|
e = SYS_ARGS['engine']
|
|
|
sections = {"remits":e.info['835'],"claims":e.info['837']}
|
|
|
- _args = {"sections":sections}
|
|
|
+ _args = {"sections":sections,"store":SYS_ARGS["config"]["store"],"owner":SYS_ARGS['config']['owner'],"args":SYS_ARGS["config"]["args"]}
|
|
|
return render_template("index.html",**_args)
|
|
|
@app.route("/format/<id>/<index>",methods=['POST'])
|
|
|
def _format(id,index):
|
|
@@ -21,43 +92,117 @@ def _format(id,index):
|
|
|
key = '837' if id == 'claims' else '835'
|
|
|
index = int(index)
|
|
|
# p = e.info[key][index]
|
|
|
- p = e.apply(type=id,index=index)
|
|
|
-
|
|
|
- #
|
|
|
+ p = e.filter(type=id,index=index)
|
|
|
+
|
|
|
r = []
|
|
|
- for item in p[0]['pipeline'] :
|
|
|
- _item= dict(item)
|
|
|
- del _item['sql']
|
|
|
- del _item ['data']
|
|
|
-
|
|
|
+ for item in p['pipeline'] :
|
|
|
+ _item= dict(item)
|
|
|
_item = dict(_item,**healthcareio.analytics.Apex.apply(item))
|
|
|
+ del _item['data']
|
|
|
if 'apex' in _item or 'html' in _item:
|
|
|
r.append(_item)
|
|
|
|
|
|
- r = {"id":p[0]['id'],"pipeline":r}
|
|
|
+
|
|
|
+ r = {"id":p['id'],"pipeline":r}
|
|
|
return json.dumps(r),200
|
|
|
+
|
|
|
@app.route("/get/<id>/<index>",methods=['GET'])
|
|
|
def get(id,index):
|
|
|
e = SYS_ARGS['engine']
|
|
|
key = '837' if id == 'claims' else '835'
|
|
|
index = int(index)
|
|
|
# p = e.info[key][index]
|
|
|
- p = e.apply(type=id,index=index)
|
|
|
+ p = e.filter(type=id,index=index)
|
|
|
r = {}
|
|
|
for item in p[0]['pipeline'] :
|
|
|
|
|
|
_item= [dict(item)]
|
|
|
|
|
|
- r[item['label']] = item['data'].to_dict(orient='record')
|
|
|
- # del _item['sql']
|
|
|
- # del _item ['data']
|
|
|
- # print (item['label'])
|
|
|
- # _item['apex'] = healthcareio.analytics.Apex.apply(item)
|
|
|
- # if _item['apex']:
|
|
|
- # r.append(_item)
|
|
|
-
|
|
|
- # r = {"id":p[0]['id'],"pipeline":r}
|
|
|
+ # r[item['label']] = item['data'].to_dict(orient='record')
|
|
|
+ r[item['label']] = item['data'].to_dict('record')
|
|
|
return json.dumps(r),200
|
|
|
+
|
|
|
+@app.route("/reset",methods=["POST"])
|
|
|
+def reset():
|
|
|
+ return "1",200
|
|
|
+@app.route("/data",methods=['GET'])
|
|
|
+def get_data ():
|
|
|
+ """
|
|
|
+ This function will return statistical data about the services i.e general statistics about what has/been processed
|
|
|
+ """
|
|
|
+ HEADER = {"Content-type":"application/json"}
|
|
|
+ _args = SYS_ARGS['config']
|
|
|
+ options = dict(proxy.get.files(_args),**proxy.get.processes(_args))
|
|
|
+ return json.dumps(options),HEADER
|
|
|
+@app.route("/log/<id>",methods=["POST","PUT","GET"])
|
|
|
+def log(id) :
|
|
|
+ HEADER = {"Content-Type":"application/json; charset=utf8"}
|
|
|
+ if id == 'params' and request.method in ['PUT', 'POST']:
|
|
|
+ info = request.json
|
|
|
+ _args = {"batch":info['batch'] if 'batch' in info else 1,"resume":True}
|
|
|
+ #
|
|
|
+ # We should update the configuration
|
|
|
+ SYS_ARGS['config']['args'] = _args
|
|
|
+ PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
|
|
|
+ write = lambda content: (open(PATH,'w')).write(json.dumps(content))
|
|
|
+ proc = Process(target=write,args=(SYS_ARGS['config'],))
|
|
|
+ proc.start()
|
|
|
+ return "1",HEADER
|
|
|
+ pass
|
|
|
+@app.route("/io/<id>",methods=['POST'])
|
|
|
+def io_data(id):
|
|
|
+ if id == 'params' :
|
|
|
+ _args = request.json
|
|
|
+ #
|
|
|
+ # Expecting batch,folder as parameters
|
|
|
+ _args = request.json
|
|
|
+ _args['resume'] = True
|
|
|
+ print (_args)
|
|
|
+ #
|
|
|
+ # We should update the configuration
|
|
|
+ SYS_ARGS['config']['args'] = _args
|
|
|
+ # PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
|
|
|
+ try:
|
|
|
+ write = lambda content: (open(PATH,'w')).write(json.dumps(content))
|
|
|
+ proc = Process(target=write,args=(SYS_ARGS['config'],))
|
|
|
+ proc.start()
|
|
|
+ # proc.join()
|
|
|
+ return "1",200
|
|
|
+ except Exception as e :
|
|
|
+ return "0",403
|
|
|
+ pass
|
|
|
+ elif id == 'stop' :
|
|
|
+ stop()
|
|
|
+ pass
|
|
|
+ elif id == 'run' :
|
|
|
+ # run()
|
|
|
+ _args = {"args":SYS_ARGS['config']['args'],"store":SYS_ARGS["config"]["store"]}
|
|
|
+ proxy.run(_args)
|
|
|
+ return "1",200
|
|
|
+ pass
|
|
|
+
|
|
|
+@app.route("/export")
|
|
|
+def export_form():
|
|
|
+ _args = {"context":SYS_ARGS['context']}
|
|
|
+ return render_template("store.html",**_args)
|
|
|
+@app.route("/export",methods=['POST','PUT'])
|
|
|
+def apply_etl():
|
|
|
+ _info = request.json
|
|
|
+ m = {'s3':'s3.s3Writer','mongo':'mongo.MongoWriter'}
|
|
|
+ if _info :
|
|
|
+ dest_args = {'type':m[_info['type']],"args": _info['content'] }
|
|
|
+ src_args = SYS_ARGS['config']['store']
|
|
|
+ # print (_args)
|
|
|
+ # writer = transport.factory.instance(**_args)
|
|
|
+ proxy.publish(src_args,dest_args)
|
|
|
+ return "1",405
|
|
|
+
|
|
|
+ else:
|
|
|
+ return "0",404
|
|
|
+@app.route("/update")
|
|
|
+def update():
|
|
|
+ pass
|
|
|
+ return "0",405
|
|
|
@app.route("/reload",methods=['POST'])
|
|
|
def reload():
|
|
|
# e = SYS_ARGS['engine']
|
|
@@ -74,11 +219,20 @@ if __name__ == '__main__' :
|
|
|
PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
|
|
|
DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
|
|
|
SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''
|
|
|
+
|
|
|
#
|
|
|
#
|
|
|
PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
|
|
|
+ #
|
|
|
+ # Adjusting configuration with parameters (batch,folder,resume)
|
|
|
+ if 'args' not in SYS_ARGS['config'] :
|
|
|
+ SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"}
|
|
|
+
|
|
|
+ SYS_ARGS['procs'] = []
|
|
|
+
|
|
|
|
|
|
+ # SYS_ARGS['path'] = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
|
|
|
e = healthcareio.analytics.engine(PATH)
|
|
|
- # e.apply(type='claims',serialize=True)
|
|
|
+ e.apply(type='claims',serialize=False)
|
|
|
SYS_ARGS['engine'] = e
|
|
|
app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=True)
|