Переглянути джерело

Merge branch 'dev' of hiplab/parser into master

Steve Nyemba 2 роки тому
батько
коміт
0547746f62

+ 37 - 35
healthcareio/analytics.py

@@ -192,18 +192,23 @@ class Apex :
     @staticmethod
     def scalar(item):
         _df = item['data']
-        
-        name = _df.columns.tolist()[0]
-        value = _df[name].values.round(2)[0]
+        value = '0'
+        unit = ''
         html = '<div class="scalar"><div class="value">:value</div><div class="label">:label</div></div>'
-        if value > 999 and value < 1000000 :
-            value = " ".join([str(np.divide(value,1000).round(2)),"K"])
-        elif value > 999999  :
-            #@ Think of considering the case of a billion ...
-            value = " ".join([str(np.divide(value,1000000).round(2)),"M"])
-        else:
-            value = str(value)
-        unit = name.replace('_',' ') if 'unit' not in item else item['unit']
+        if _df.shape[0] > 0 :
+            print (_df)
+            print ('_____________________________________')
+            name = _df.columns.tolist()[0]
+            value = _df[name].values[0]
+            
+            if value > 999 and value < 1000000 :
+                value = " ".join([str(np.divide(value,1000).round(2)),"K"])
+            elif value > 999999  :
+                #@ Think of considering the case of a billion ...
+                value = " ".join([str(np.divide(value,1000000).round(2)),"M"])
+            else:
+                value = str(value)
+            unit = name.replace('_',' ') if 'unit' not in item else item['unit']
         return {'html':html.replace(':value',value).replace(":label",unit)}
     @staticmethod
     def column(item):
@@ -319,8 +324,9 @@ class Apex :
             values = df[x_cols].values.round(2).tolist()
         else:
             labels = [name.upper().replace('_',' ') for name in df.columns.tolist()]
-            df = df.astype(float)
-            values = df.values.round(2).tolist()[0] if df.shape[1] > 1 else df.values.round(2).tolist()
+            # df = df.astype(float)
+            # values = df.values.round(2).tolist()[0] if df.shape[1] > 1 else df.values.round(2).tolist()
+            values = df[[name for name in df.columns if df[name].dtype in [float,int]] ].values.round(2).tolist()
             
         colors  = COLORS[:len(values)]
         options = {"series":values,"colors":colors,"labels":labels,"dataLabels":{"enabled":True,"style":{"colors":["#000000"]},"dropShadow":{"enabled":False}},"chart":{"type":"donut","width":200},"plotOptions":{"pie":{"customScale":.9}},"legend":{"position":"right"}}
@@ -343,10 +349,10 @@ class engine :
         self.store_config = _config['store']          
         self.info   = _config['analytics']
         _args = self.store_config
-        if self.store_config['type'] == 'mongo.MongoWriter' :
-            _args['type'] = 'mongo.MongoReader'
-        else:
-            _args['type'] = 'disk.SQLiteReader'
+        if 'type' not in self.store_config :           
+            #
+            # This is the newer version of data-transport
+            self.store_config['context'] = 'read'
         self.store_config = _args ;
     
     def filter (self,**args):
@@ -367,8 +373,8 @@ class engine :
         # conn = lite.connect(self.store_config['args']['path'],isolation_level=None)
         # conn.create_aggregate("stdev",1,stdev)
         DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql'
-        if DB_TYPE == 'mongo' :
-            self.store_config['args']['doc'] = args['type']
+        # if DB_TYPE == 'mongo' :
+        #     self.store_config['args']['doc'] = args['type']
         
         self.reader = transport.factory.instance(**self.store_config)
         r = []
@@ -414,20 +420,8 @@ class engine :
             _analytics = [_analytics[index]]
             
         _info = list(_analytics) if 'filter' not in args else [item for item in analytics if args['filter'] == item['id']]
-        # conn = lite.connect(self.store_config['args']['path'],isolation_level=None)
-        # conn.create_aggregate("stdev",1,stdev)
-        #
-        # @TODO: Find a better way to handle database variance
-        #
-        # DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql'
-        
-        if 'mongo' in self.store_config['type'] :
-            DB_TYPE='mongo'
-        else:
-            DB_TYPE='sql'
-            self.store_config['args']['table'] = args['type']
-        
         self.reader = transport.factory.instance(**self.store_config)
+        DB_TYPE  = 'mongo' if self.store_config ['provider'] in ['mongodb','mongo'] else 'sql'
         r = []
         for row in _info :
             pipeline = row['pipeline']
@@ -440,14 +434,22 @@ class engine :
                     continue
                 if DB_TYPE == 'sql' :
                     query = {"sql":query}
+                else:
+                    query = {DB_TYPE:query}
                 
-                item['data'] = self.reader.read(**query) #item)
+                _df = self.reader.read(**query) #item)
+                print (query)
+                print (self.reader)
                 if 'serialize' in args :
                     
                     # item['data'] = json.dumps(item['data'].to_dict(orient='record')) if type(item['data']) == pd.DataFrame else item['data']
-                    item['data'] = json.dumps(item['data'].to_dict('record')) if type(item['data']) == pd.DataFrame else item['data']
+                    item['data'] = json.dumps(_df.to_dict(orient='record')) 
                 else:
-                    item['data'] = (pd.DataFrame(item['data']))
+                    # item['data'] = (pd.DataFrame(item['data']))
+                    item['data'] = _df
+                    pass
+                print (_df.head())
+                break
                 pipeline[index] = item
                 index += 1
             #

+ 1 - 1
healthcareio/docker/Dockerfile

@@ -15,7 +15,7 @@ RUN ["apt-get","install","-y","mongodb","sqlite3","sqlite3-pcre","libsqlite3-dev
 #
 RUN ["pip3","install","--upgrade","pip"]
 RUN ["pip3","install","numpy","pandas","git+https://dev.the-phi.com/git/steve/data-transport","botocore","matplotlib"]
-# RUN ["pip3","install","git+https://healthcare.the-phi.com/git/code/parser.git","botocore"]
+RUN ["pip3","install","git+https://healthcare.the-phi.com/git/code/parser.git"]
 # RUN ["useradd", "-ms", "/bin/bash", "health-user"]
 # USER health-user
 #

+ 10 - 3
healthcareio/export/export.py

@@ -191,6 +191,7 @@ def init_sql(**_args):
         _config  = jsonmerge.merge(_config,CUSTOM_CONFIG[TYPE])
     #  
     _info = meta(_config)
+    
     _projectSQLite = [] #-- sqlite projection
     for field_name in _info['main'] :
         _projectSQLite += ["json_extract(data,'$."+field_name+"') "+field_name]    
@@ -211,12 +212,17 @@ def init_sql(**_args):
         SQL = "SELECT DISTINCT :fields FROM "+TABLE_NAME+", json_each(data) x, json_each(x.value) i where x.key = ':table'"
         SQL = SQL.replace(":table",table).replace(":fields",",".join(project))
         r += [{"table":table,"read":{"sql":SQL},"sql":create(table=table,key='claim_id',fields=fields)}]
+    
     return r
 def init(**_args):
-    if 'provider' in CONFIG['store'] and CONFIG['store']['provider'] == 'sqlite' :
-        return init_sql(**_args)
-    else:
+    # if 'provider' in CONFIG['store'] and CONFIG['store']['provider'] == 'sqlite' :
+    #     return init_sql(**_args)
+    # else:
+    #     return init_mongo(**_args)
+    if ('provider' in CONFIG['store'] and CONFIG['store']['provider'] == 'mongo') or ('type' in CONFIG['store'] and 'mongo' in CONFIG['store']['type']):
         return init_mongo(**_args)
+    else:
+        return init_sql(**_args)
 def init_mongo (**_args) :   
     """
     This function is intended to determine the number of tables to be created, as well as their type.
@@ -330,6 +336,7 @@ class Factory:
         job_args = init(type=X12_TYPE)  #-- getting the queries that will generate the objects we are interested in
         # print (json.dumps(job_args))
         _jobs = []
+        
         for row in job_args:
             # _store = json.loads(json.dumps(wstore))
             _store = copy.deepcopy(wstore)

+ 15 - 9
healthcareio/export/workers.py

@@ -12,6 +12,7 @@ from multiprocessing import Process, Lock
 import numpy as np
 import json
 import pandas as pd
+from zmq import has
 
 class Subject (Process):
     cache = pd.DataFrame()
@@ -94,8 +95,8 @@ class Worker :
         except Exception as error:
             pass
         finally:
-            
-            self.caller.notify() 
+            if hasattr(self,'caller') :
+                self.caller.notify() 
     def _apply(self):        
         pass
     def get(self):
@@ -176,11 +177,16 @@ class Reader(Worker):
     def init(self,**_args):
         super().init(**_args)
         self.rows = []
-        
+              
   
     def _apply(self):
         try:
-            
+            if 'type' in self._info :
+                self._info['type'] = self._info['type'].replace('Writer','Reader')
+                if 'fields' in self._info['args'] :
+                    del self._info['args']['fields']
+            else:
+                self._info['context'] = 'read'
             self.reader = transport.factory.instance(**self._info) ; 
             
             # self.rows = self.reader.read(mongo=self.pipeline)
@@ -206,7 +212,7 @@ class Reader(Worker):
         except Exception as e :
             _log['status'] = 0
             _log['info']  = {"error":e.args[0]}
-            print (e)
+            print ([e])
             
 
         self.log(**_log)
@@ -221,13 +227,13 @@ class Writer(Worker):
         super().__init__(**_args)
         if 'provider' in self._info :
             self._info['context'] = 'write'
-
+        
     def init(self,**_args):
         """
         :param  store   output data-store needed for writing
         :param invalues input values with to be written somewhere
         """
-        super().init(**_args)
+        
         
         self._invalues = _args['invalues']
         
@@ -259,8 +265,8 @@ class Writer(Worker):
                     # Upon upgrade use the operator "$toString" in export.init function
                     #
                     rows = [dict(item,**{"_id":str(item["_id"])}) for item in rows]
-                    
-                    writer.write(rows)
+                    _df = pd.DataFrame(rows)                    
+                    writer.write(_df)
                 index += 1
                 # for _e in rows :
                     # writer.write(_e) 

+ 28 - 2
healthcareio/healthcare-io.py

@@ -122,7 +122,14 @@ def signup (**args) :
     file = open( filename,'w')
     file.write( json.dumps(info))
     file.close()
+    _m = """
+        Thank you for signingup!!
+        Your configuration file is store in :path,
+            - More information visit https://healthcareio.the-phi.com/parser 
+            - Access the source https://healthcareio.the-phi.com/git/code/parser
 
+    """.replace(":path",CONFIG_FILE)
+    print (_m)
     #
     # Create the sqlite3 database to 
 
@@ -155,12 +162,13 @@ def init():
         # if 'type' in info['store'] :            
         lwriter = None
         is_sqlite = False
-        if'type' in info['store'] and info['store']['type'] == 'disk.SQLiteWriter'  and  not os.path.exists(info['store']['args']['path']) :
+        if'type' in info['store'] and info['store']['type'] == 'disk.SQLiteWriter'   :
             lwriter = transport.factory.instance(**info['store'])
             is_sqlite = True
         elif 'provider' in info['store'] and info['store']['provider'] == 'sqlite' :
             lwriter = transport.instance(**info['store']) ;
             is_sqlite = True
+        
         if lwriter and is_sqlite:
             for key in info['schema'] :
                 if key != 'logs' :
@@ -168,7 +176,7 @@ def init():
                 else:
                     _id = key
                 
-                if not lwriter.has(table=_id) :
+                if not lwriter.has(table=_id) :                    
                     lwriter.apply(info['schema'][key]['create'])
 
             # [lwriter.apply( info['schema'][key]['create']) for key in info['schema'] if not lwriter.has(table=key)]
@@ -285,6 +293,24 @@ if __name__ == '__main__' :
                 procs = [proc for proc in procs if proc.is_alive()]
                 time.sleep(2)
 
+        uri = OUTPUT_FOLDER
+        store_config = json.loads( (open(CONFIG_FILE)).read() )['store']
+        if 'type' in store_config :
+            uri = store_config['args']['host'] if 'host' in store_config['args'] else ( store_config['args']['path'] if 'path' in store_config['args'] else store_config['args']['database'])
+            if 'SQLite' in store_config['type']:
+                provider = 'sqlite'
+            elif 'sql' in store_config['type'] :
+                provider = 'SQL'
+            else:
+                provider = 'mongo'
+
+        else:
+            provider = store_config['provider']
+        _msg = """
+            Completed Parsing, The data is available in :provider database at :uri
+            Logs are equally available for errors and summary statistics to be compiled
+        """.replace(":provider",provider).replace(":uri",uri)
+        print (_msg)
                 
 
         pass

+ 1 - 0
healthcareio/server/__init__.py

@@ -225,6 +225,7 @@ if __name__ == '__main__' :
     PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
     #
     # Adjusting configuration with parameters (batch,folder,resume)
+    SYS_ARGS['config'] = json.loads(open(PATH).read())
     if 'args' not in SYS_ARGS['config'] :
         SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"}
     

+ 10 - 6
healthcareio/server/index.py

@@ -6,15 +6,16 @@ import json
 app = Flask(__name__)
 @app.route("/favicon.ico")
 def _icon():
-    return send_from_directory(os.path.join([app.root_path, 'static','img','logo.svg']),
+    return send_from_directory(os.path.join([app.root_path, 'static/img/logo.svg']),
                                'favicon.ico', mimetype='image/vnd.microsoft.icon')
 @app.route("/")
 def init():
     e = SYS_ARGS['engine']
     sections = {"remits":e.info['835'],"claims":e.info['837']}
-    _args = {"sections":sections,"store":SYS_ARGS['config']['store']}
-    print (SYS_ARGS['config']['store'])
-    return render_template("setup.html",**_args)
+    
+    _args = {"sections":sections,"store":SYS_ARGS['config']['store'],'args':{'batch':5}}
+    
+    return render_template("index.html",**_args)
 @app.route("/format/<id>/<index>",methods=['POST'])
 def _format(id,index):
     
@@ -73,13 +74,16 @@ def reload():
     
 if __name__ == '__main__' :
     PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
-    DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
+    DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 1
     SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''
     #
     # 
     PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
-    
+    #
+    if os.path.exists(PATH) :
+        SYS_ARGS['config'] = json.loads(open(PATH).read())
     e = healthcareio.analytics.engine(PATH)
     # e.apply(type='claims',serialize=True)
     SYS_ARGS['engine'] = e
+    
     app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=True)

+ 22 - 24
healthcareio/server/proxy.py

@@ -37,14 +37,17 @@ class get :
 
     @staticmethod
     def processes(_args):
-        _info = pd.DataFrame(smart.top.read(name='healthcare-io.py'))[['name','cpu','mem']]
+        APP_NAME ='healthcare-io'
+        _info = smart.top.read(name=APP_NAME) #pd.DataFrame(smart.top.read(name='healthcare-io'))[['name','cpu','mem']]
+        
         
         if _info.shape[0] == 0 :
-            _info = pd.DataFrame({"name":["healthcare-io.py"],"cpu":[0],"mem":[0]})
+            _info = pd.DataFrame({"name":[APP_NAME],"cpu":[0],"mem":[0]})
         # _info = pd.DataFrame(_info.groupby(['name']).sum())
         # _info['name'] = ['healthcare-io.py']
         m = {'cpu':'CPU','mem':'RAM','name':'name'}
-        _info.columns = [m[name] for name in _info.columns.tolist()]
+        _info  = _info.rename(columns=m)
+        # _info.columns = [m[name] for name in _info.columns.tolist() if name in m]
         _info.index = np.arange(_info.shape[0])
 
         charts = []
@@ -56,23 +59,20 @@ class get :
                     {"data":df, "chart":{"type":"radial","axis":{"x":label,"y":"name"}}}
                     )['apex']
                 )
-        #
-        # This will update the counts for the processes, upon subsequent requests so as to show the change
-        #     
-        N = 0
-        lprocs = []
-        for proc in get.PROCS :
-            if proc.is_alive() :
-                lprocs.append(proc)
-        N = len(lprocs)     
-        get.PROCS = lprocs
-        return {"process":{"chart":charts,"counts":N}}
+        
+        return {"process":{"chart":charts,"counts":_info.shape[0]}}
     @staticmethod
     def files (_args):
-        _info = smart.folder.read(path='/data')
+        folder = _args['args']['folder']
+        _info = smart.folder.read(path=folder)
+        
         N = _info.files.tolist()[0]
-        if 'mongo' in _args['store']['type'] :
-            store_args = dict(_args['store'].copy(),**{"type":"mongo.MongoReader"})
+        store_args = _args['store'].copy()
+        store_args['context'] = 'read'
+        
+        # if 'mongo' in _args['store']['type'] :
+        if _args['store']['provider'] in ['mongo', 'mongodb']:
+            # store_args = dict(_args['store'].copy(),**{"type":"mongo.MongoReader"})
             # reader = transport.factory.instance(**_args)
             
             pipeline = [{"$group":{"_id":"$name","count":{"$sum":{"$cond":[{"$eq":["$completed",True]},1,0]}} }},{"$group":{"_id":None,"count":{"$sum":"$count"}}},{"$project":{"_id":0,"status":"completed","count":1}}]
@@ -83,12 +83,15 @@ class get :
 
 
         else:
-            store_args = dict(_args['store'].copy(),**{"type":"disk.SQLiteReader"})
-            store_args['args']['table'] = 'logs'
+            # store_args = dict(_args['store'].copy(),**{"type":"disk.SQLiteReader"})
+            
+            # store_args['args']['table'] = 'logs'
+            store_args['table'] = 'logs'
             query= {"sql":"select count(distinct json_extract(data,'$.name')) as count, 'completed' status from logs where json_extract(data,'$.completed') = true"}
             _query={"sql":"select json_extract(data,'$.parse') as type,count(distinct json_extract(data,'$.name')) as count from logs group by type"} #-- distribution claim/remits
         reader = transport.factory.instance(**store_args)
         _info = pd.DataFrame(reader.read(**query))
+        
         if not _info.shape[0] :
             _info = pd.DataFrame({"status":["completed"],"count":[0]})
         _info['count'] = np.round( (_info['count'] * 100 )/N,2)
@@ -97,11 +100,6 @@ class get :
         #
         # Let us classify the files now i.e claims / remits
         #
-        
-        
-        # pipeline = [{"$group":{"_id":"$parse","claims":{"$addToSet":"$name"}}},{"$project":{"_id":0,"type":"$_id","count":{"$size":"$claims"}}}]
-        # _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
-        # r = pd.DataFrame(reader.read(mongo=_args))
         r = pd.DataFrame(reader.read(**_query)) #-- distribution claims/remits
         r = Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r})['apex']
         r['chart']['height'] = '100%'

+ 2 - 2
healthcareio/server/templates/setup.html

@@ -63,10 +63,10 @@
                         //
                         // We should insure the listeners are enabled
                         if(monitor.listen.handler == null){
-                            monitor.listen.handler = setInterval(
+                            /*monitor.listen.handler = setInterval(
                                 function(){
                                     console.log('running ...')
-                                    monitor.data()},5000)
+                                    monitor.data()},5000)*/
 
                         }
                     }else{

+ 49 - 1
healthcareio/x12/__init__.py

@@ -24,6 +24,7 @@ import sys
 from itertools import islice
 from multiprocessing import Process
 import transport
+from transport import providers
 import jsonmerge
 
 import copy
@@ -236,7 +237,54 @@ class Parser (Process):
                         _config[_id] = [_config[_id]]     
         config['parser'] = _config
         return config   
-    
+    @staticmethod
+    def init(**_args):
+        """
+        This function allows to initialize the database that will store the claims if need be 
+        :path   configuration file
+        """
+        PATH = os.sep.join([os.environ['HOME'],'.healthcareio']) 
+        filename = os.sep.join([PATH,'config.json']) 
+
+        filename    = _args['path'] if 'path' in _args else filename
+        info        = None
+        if os.path.exists(filename):
+            #
+            # Loading the configuration file (JSON format)
+            file = open(filename)
+            info = json.loads(file.read())
+
+            
+            OUTPUT_FOLDER = info['out-folder']
+            if 'output-folder' not in info and not os.path.exists(OUTPUT_FOLDER) :
+                os.mkdir(OUTPUT_FOLDER)
+            elif 'output-folder' in info and not os.path.exists(info['out-folder']) :
+                os.mkdir(info['out-folder'])
+            # if 'type' in info['store'] :            
+            lwriter = None
+            IS_SQL = False 
+            if'type' in info['store'] and info['store']['type'] == 'disk.SQLiteWriter'   :
+                lwriter = transport.factory.instance(**info['store'])
+                IS_SQL = True
+            elif 'provider' in info['store'] and info['store']['provider'] == 'sqlite' :
+                lwriter = transport.instance(**info['store']) ;
+                IS_SQL = [providers.SQLITE,providers.POSTGRESQL,providers.NETEZZA,providers.MYSQL,providers.MARIADB]
+            
+            if lwriter and IS_SQL:
+                for key in info['schema'] :
+                    if key != 'logs' :
+                        _id = 'claims' if key == '837' else 'remits'
+                    else:
+                        _id = key
+                    
+                    if not lwriter.has(table=_id) :                    
+                        lwriter.apply(info['schema'][key]['create'])
+
+                # [lwriter.apply( info['schema'][key]['create']) for key in info['schema'] if not lwriter.has(table=key)]
+                lwriter.close()
+
+        return info        
+
     def __init__(self,path):
         """
             :path       path of the configuration file (it can be absolute)

+ 1 - 1
setup.py

@@ -8,7 +8,7 @@ import sys
 def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read() 
 args = {
-    "name":"healthcareio","version":"1.6.4.6",
+    "name":"healthcareio","version":"1.6.4.8",
     "author":"Vanderbilt University Medical Center",
     "author_email":"steve.l.nyemba@vumc.org",
     "include_package_data":True,