فهرست منبع

Bug fix in parser

Steve Nyemba 4 سال پیش
والد
کامیت
670559aef7
5 فایل‌های تغییر یافته به همراه 185 افزوده شده و 97 حذف شده
  1. 2 46
      healthcareio/__init__.py
  2. 29 8
      healthcareio/analytics.py
  3. 125 40
      healthcareio/healthcare-io.py
  4. 3 1
      healthcareio/params.py
  5. 26 2
      healthcareio/parser.py

+ 2 - 46
healthcareio/__init__.py

@@ -14,50 +14,6 @@ Usage :
     Embedded    :
 
 """
-import healthcareio
-import os
-import requests
-import platform
-import sqlite3 as lite
-from transport import factory
-import json
-#import healthcareio.params as params
-PATH = os.sep.join([os.environ['HOME'],'.healthcareio'])
-OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io'])
-
-def register (**args) :
-    """
-    This function will reset/register a user i.e they will download the configuration
-    :email  user's email address
-    :url    url of the provider to register
-    """
-    URL = "https://healthcareio.the-phi.com" if 'url' not in args else args['url']
-
-    args['out_folder'] = os.sep.join([args['path'],args['out_folder']])
-    email = args['email']
-    url = args['url'] if 'url' in args else URL
-    folders = [PATH,OUTPUT_FOLDER]
-    for path in folders :
-        if not os.path.exists(path) :
-            os.mkdir(path)
-    
-    #
-    # 
-    headers = {"email":email,"client":platform.node()}
-    http = requests.session()    
-    r = http.post(url,headers=headers)
-    
-    #
-    # store = {"type":"disk.DiskWriter","args":{"path":OUTPUT_FOLDER}}
-    # if 'store' in args :
-    #     store = args['store']
-    filename =  (os.sep.join([PATH,'config.json']))
-    info = r.json() #{"parser":r.json(),"store":store}
-    info = dict({"owner":email},**info)
-    info['store']['args']['path'] =os.sep.join([OUTPUT_FOLDER,'healthcare-io.db3']) #-- sql
-    info['out-folder'] = OUTPUT_FOLDER
-
-    file = open( filename,'w')
-    file.write( json.dumps(info))
-    file.close()
 
+from healthcareio import analytics
+# from healthcareio import server

+ 29 - 8
healthcareio/analytics.py

@@ -175,7 +175,8 @@ class Apex :
     @staticmethod
     def scalar(item):
         _df = item['data']
-        name = str(_df.columns[0])
+        print (_df)
+        name = _df.columns.tolist()[0]
         value = _df[name].values.round(2)[0]
         html = '<div class="scalar"><div class="value">:value</div><div class="label">:label</div></div>'
         if value > 999 and value < 1000000 :
@@ -240,9 +241,15 @@ class Apex :
         if type(y) == list :
             y = y[0]
         axis['x'] = [axis['x']] if type(axis['x']) != list else axis['x']
+        if not set(axis['x']) & set(df.columns.tolist()) :
+            print (set(axis['x']) & set(df.columns.tolist()))
+            print (axis['x'])
+            print (df.columns)
+            # df.columns = axis['x']
         series = []
         _min=_max = 0
         for x in axis['x'] :
+            
             series += [{"data": df[x].values.tolist()[:N],"name":x.upper().replace('_',' ')}]
             _min = df[x].min() if df[x].min() < _min else _min
             _max = df[x].max() if df[x].max() > _max else _max
@@ -317,6 +324,12 @@ class engine :
         _config = json.loads(f.read())
         self.store_config = _config['store']          
         self.info   = _config['analytics']
+        _args = self.store_config
+        if self.store_config['type'] == 'mongo.MongoWriter' :
+            _args['type'] = 'mongo.MongoReader'
+        else:
+            _args['type'] = 'disk.SQLiteReader'
+        self.reader = transport.factory.instance(**_args)
 
     def apply (self,**args) :
         """
@@ -332,19 +345,26 @@ class engine :
             analytics = [analytics[index]]
             
         _info = list(analytics) if 'filter' not in args else [item for item in analytics if args['filter'] == item['id']]
-        conn = lite.connect(self.store_config['args']['path'],isolation_level=None)
-        conn.create_aggregate("stdev",1,stdev)
-
+        # conn = lite.connect(self.store_config['args']['path'],isolation_level=None)
+        # conn.create_aggregate("stdev",1,stdev)
+        DB_TYPE = 'mongo' if (type(self.reader) == transport.mongo.MongoReader) else 'sql'
         r = []
         for row in _info :
             
             for item in row['pipeline'] :
-                item['data'] = pd.read_sql(item['sql'],conn)
+                # item['data'] = pd.read_sql(item['sql'],conn)
+                query = {DB_TYPE:item[DB_TYPE]}
+                item['data'] = self.reader.read(**item)
                 if 'serialize' in args :
-                    item['data'] = json.dumps(item['data'].to_dict(orient='record'))
+                    
+                    item['data'] = json.dumps(item['data'].to_dict(orient='record')) if type(item['data']) == pd.DataFrame else item['data']
+                else:
+                    item['data'] = (pd.DataFrame(item['data']))
+                    
+                    
                 # if 'info' in item:
                 #     item['info'] = item['info'].replace(":rows",str(item["data"].shape[0]))
-        conn.close()
+        # conn.close()
         
         return _info
 
@@ -540,4 +560,5 @@ css = """
 # print (p[2]['pipeline'][0]['data'])
 # e.export (p[0])
 # features = ['diagnosis.code']
-# split(folder = folder, features=features)
+# split(folder = folder, features=features)
+

+ 125 - 40
healthcareio/healthcare-io.py

@@ -32,10 +32,18 @@ Usage :
 from healthcareio.params import SYS_ARGS
 from transport import factory
 import requests
+
+from healthcareio import analytics
+from healthcareio import server
 from healthcareio.parser import get_content
 import os
 import json
 import sys
+import numpy as np
+from multiprocessing import Process
+import time
+
+
 PATH = os.sep.join([os.environ['HOME'],'.healthcareio'])
 OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io'])
 INFO = None
@@ -60,7 +68,8 @@ def register (**args) :
     
     #
     # 
-    headers = {"email":email,"client":platform.node()}
+    store = args['store'] if 'store' in args else 'sqlite'
+    headers = {"email":email,"client":platform.node(),"store":store,"db":args['db']}
     http = requests.session()    
     r = http.post(url,headers=headers)
     
@@ -82,22 +91,6 @@ def register (**args) :
     # Create the sqlite3 database to 
 
 
-def analytics(**args):
-    """
-    This fucntion will only compute basic distributions of a given feature for a given claim   
-    @args
-        @param x:  vector of features to process
-        @param apply:  operation to be applied {dist} 
-    """
-    if args['apply'] in ['dist','distribution'] :
-        """
-        This section of the code will return the distribution of a given space. 
-        It is intended to be applied on several claims/remits
-        """
-        x = pd.DataFrame(args['x'],columns=['x'])
-        return x.groupby(['x']).size().to_frame().T.to_dict(orient='record')
-            
-
 def log(**args):
     """
     This function will perform a log of anything provided to it
@@ -152,7 +145,39 @@ def parse(**args):
         
         return get_content(args['filename'],CONFIG,SECTION)
 
+def apply(files,store_info,logger_info=None):
+    """
+        :files          list of files to be processed in this given thread/process
+        :store_info     information about data-store, for now disk isn't thread safe
+        :logger_info    information about where to store the logs
+    """
 
+    if not logger_info :
+        logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
+    else:
+        logger = factory.instance(**logger_info)
+
+    writer = factory.instance(**store_info)
+    for filename in files :
+        
+        if filename.strip() == '':
+            continue
+        # content,logs = get_content(filename,CONFIG,CONFIG['SECTION'])  
+        # 
+        try:              
+            content,logs = parse(filename = filename,type=SYS_ARGS['parse'])
+            if content :                        
+                writer.write(content)
+            if logs :
+                [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs]
+            else:
+                logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)})
+        except Exception as e:
+            logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]})
+        # print ([filename,len(content)])
+        #
+        # @TODO: forward this data to the writer and log engine 
+        #
 def upgrade(**args):
     """
     :email  provide us with who you are
@@ -175,8 +200,9 @@ if __name__ == '__main__' :
         
         email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init']
         url = SYS_ARGS['url'] if 'url' in SYS_ARGS else 'https://healthcareio.the-phi.com'
-        
-        register(email=email,url=url)
+        store = SYS_ARGS['store'] if 'store' in SYS_ARGS else 'sqlite'
+        db='healthcareio' if 'db' not in SYS_ARGS else SYS_ARGS['db']
+        register(email=email,url=url,store=store,db=db)
     # else:
     #     m = """
     #     usage:
@@ -218,46 +244,95 @@ if __name__ == '__main__' :
             #     CONFIG = CONFIG[ int(SYS_ARGS['version'])]
             # else:
             #     CONFIG = CONFIG[-1]
+            logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
             if info['store']['type'] == 'disk.DiskWriter' :
                 info['store']['args']['path'] += (os.sep + 'healthcare-io.json')
             elif info['store']['type'] == 'disk.SQLiteWriter' :
                 # info['store']['args']['path'] += (os.sep + 'healthcare-io.db3')
                 pass
+            
+            
             if info['store']['type'] == 'disk.SQLiteWriter' : 
                     info['store']['args']['table'] = SYS_ARGS['parse'].strip().lower()
             else:
+                #
+                # if we are working with no-sql we will put the logs in it (performance )?
 
                     info['store']['args']['doc'] = SYS_ARGS['parse'].strip().lower()
+                    _info = json.loads(json.dumps(info['store']))
+                    _info['args']['doc'] = 'logs'
+                    logger = factory.instance(**_info)
+
             writer = factory.instance(**info['store'])
-            logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
+            
+            #
+            # we need to have batches ready for this in order to run some of these queries in parallel
+            # @TODO: Make sure it is with a persistence storage (not disk .. not thread/process safe yet)
+            #   - Make sure we can leverage this on n-cores later on, for now the assumption is a single core
+            #
+            BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int (SYS_ARGS['batch'])
+            
             #logger = factory.instance(type='mongo.MongoWriter',args={'db':'healthcareio','doc':SYS_ARGS['parse']+'_logs'})
             # schema = info['schema']
             
             # for key in schema :                
             #     sql = schema[key]['create']
             #     writer.write(sql)
-            for filename in files :
+            files = np.array_split(files,BATCH_COUNT)
+            procs = []
+            index = 0
+            for row in files :
+                
+                row = row.tolist()
+                logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)})
+                proc = Process(target=apply,args=(row,info['store'],_info,))
+                proc.start()
+                procs.append(proc)
+                index = index + 1
+            while len(procs) > 0 :
+                procs = [proc for proc in procs if proc.is_alive()]
+                time.sleep(2)
+            #     for filename in files :
+                    
+            #         if filename.strip() == '':
+            #             continue
+            #         # content,logs = get_content(filename,CONFIG,CONFIG['SECTION'])  
+            #         # 
+            #         try:              
+            #             content,logs = parse(filename = filename,type=SYS_ARGS['parse'])
+            #             if content :                        
+            #                 writer.write(content)
+            #             if logs :
+            #                 [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs]
+            #             else:
+            #                 logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)})
+            #         except Exception as e:
+            #             logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]})
+            #         # print ([filename,len(content)])
+            #         #
+            #         # @TODO: forward this data to the writer and log engine 
+            #         #
+            
                 
-                if filename.strip() == '':
-                    continue
-                # content,logs = get_content(filename,CONFIG,CONFIG['SECTION'])  
-                # 
-                try:              
-                    content,logs = parse(filename = filename,type=SYS_ARGS['parse'])
-                    if content :                        
-                        writer.write(content)
-                    if logs :
-                        [logger.write(_row) for _row in logs]
-                    else:
-                        logger.write({"name":filename,"completed":True,"rows":len(content)})
-                except Exception as e:
-                    logger.write({"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]})
-                # print ([filename,len(content)])
-                #
-                # @TODO: forward this data to the writer and log engine 
-                #
 
         pass
+    elif 'analytics' in SYS_ARGS :
+        PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
+        DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
+        SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''
+        #
+        # 
+        
+        # PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
+        
+        e = analytics.engine(os.sep.join([PATH,'config.json'])) #--@TODO: make the configuration file globally accessible
+        e.apply(type='claims',serialize=True)
+        SYS_ARGS['engine'] = e
+        
+        pointer = lambda : server.app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=False)
+        pthread = Process(target=pointer,args=())
+        pthread.start()
+        
     elif 'export' in SYS_ARGS:
         #
         # this function is designed to export the data to csv
@@ -267,7 +342,17 @@ if __name__ == '__main__' :
         if set([format]) not in ['xls','csv'] :
             format = 'csv'
         
-
+    else:
+        msg = """
+        CLI Usage
+            healthcare-io.py    --register <email> --store <sqlite|mongo>
+            healthcare-io.py    --parse claims --folder <path> [--batch <value>]
+            healthcare-io.py    --parse remits --folder <path> [--batch <value>]
+        parameters :
+            --<[signup|init]>   signup or get a configuration file from a parsing server
+            --store             data store mongo or sqlite
+        """
+        print(msg)
         pass
     # """
     # The program was called from the command line thus we are expecting 

+ 3 - 1
healthcareio/params.py

@@ -8,10 +8,12 @@ if len(sys.argv) > 1:
 		value = None
 		if sys.argv[i].startswith('--'):
 			key = sys.argv[i][2:] #.replace('-','')
+			
 			SYS_ARGS[key] = 1			
+			
 			if i + 1 < N:
 				value = sys.argv[i + 1] = sys.argv[i+1].strip()
-			if key and value:
+			if key and value and not value.startswith('--'):
 				SYS_ARGS[key] = value
 				
 		

+ 26 - 2
healthcareio/parser.py

@@ -91,6 +91,27 @@ def format_date(value) :
         return "-".join([year,month,day])
 def format_time(value):
     return ":".join([value[:2],value[2:] ])[:5]
+def sv2_parse(value):
+    #
+    # @TODO: Sometimes there's a suffix (need to inventory all the variations)
+    #
+    if '>' in value or ':' in value:
+        xchar = '>' if '>' in value else ':'
+        _values = value.split(xchar)
+        modifier = {}
+        
+        if len(_values) > 2 :
+
+            modifier= {"code":_values[2]}
+            if len(_values) > 3 :
+                modifier['type'] = _values[3]
+        _value = {"code":_values[1],"type":_values[0]}
+        if modifier :
+            _value['modifier'] = modifier
+
+        return _value
+    else:
+        return value
 def format_proc(value):
     for xchar in [':','<'] :
         if xchar in value and len(value.split(xchar)) > 1 :
@@ -110,11 +131,11 @@ def format_pos(value):
     x =  {"code":x[0],"indicator":x[1],"frequency":x[2]} if len(x) == 3 else {"code":x[0],"indicator":None,"frequency":None}
     return x
     
-def get_map(row,config,version):
+def get_map(row,config,version=None):
     
     label = config['label'] if 'label' in config else None    
     
-    omap = config['map'] if version not in config else config[version]
+    omap = config['map'] if not version or version not in config else config[version]
     anchors = config['anchors'] if 'anchors' in config else []
     if type(row[0]) == str:        
         object_value = {}
@@ -136,6 +157,9 @@ def get_map(row,config,version):
                     
                 if type(value) == dict :
                     for objkey in value :
+                        
+                        if type(value[objkey]) == dict :
+                            continue 
                         if 'syn' in config and value[objkey] in config['syn'] :
                             value[objkey] = config['syn'][ value[objkey]]
                     value = {key:value} if key not  in value else value