浏览代码

feature: added support for custom configuration

Steve Nyemba 4 年之前
父节点
当前提交
2beddab6b4
共有 6 个文件被更改,包括 170 次插入91 次删除
  1. 25 6
      README.md
  2. 2 0
      healthcareio/__init__.py
  3. 62 61
      healthcareio/healthcare-io.py
  4. 10 10
      healthcareio/server/__init__.py
  5. 1 1
      healthcareio/server/proxy.py
  6. 70 13
      healthcareio/x12/__init__.py

+ 25 - 6
README.md

@@ -22,14 +22,38 @@ We wrote this frame to be used in both command line or as a library within in yo
     pip install --upgrade git+https://hiplab.mc.vanderbilt.edu/git/lab/parse-edi.git
 
 ## Usage 
+        cli:
+        
+            healthcare-io.py    --<[signup|init]> <email> --store <sqlite|mongo> [--batch <value>]
+            healthcare-io.py    --parse --folder <path> [--batch <value>] [--resume]
+            healthcare-io.py    --check-update
+        action :
+            --signup|init   signup user and get configuration file
+            --parse         starts parsing
+            --check         checks for updates
+        parameters :
+            --<[signup|init]>   signup or get a configuration file from a parsing server
+            --folder            location of the files (the program will recursively traverse it)
+            --store             data store to use (mongo or sqlite)
+            --resume            will attempt to resume if there was an interruption
 
 **cli :**
 
 1. signup to get parsing configuration
 
+    The parser is driven by a configuration file, which you obtain by signing up. 
+
         healthcare-io.py --signup <email> [--store <mongo|sqlite>]
+
+2. check version       
+
+    Occasionally the attributes in the configuration file may change. This function will determine if there is a new version available.
+
+        healthcare-io.py --check-update
         
-2. parsing claims in a folder
+3. parsing data in a folder
+
+    The parser will recursively traverse a directory containing claims and/or remittances
 
         healthcare-io.py --parse <claims|remits> --folder <path> [--batch <n>] [--resume]
         
@@ -39,11 +63,6 @@ We wrote this frame to be used in both command line or as a library within in yo
             --batch     number of processes to spawn to parse the files
             --resume    tells the parser to resume parsing 
                         if all files weren't processed or new files were added into the folder
-3. dashboard
-    
-    There is a built-in dashboard that has displays descriptive analytics in a web browser
-    
-        healthcare-io.py --server <port> [--context <name>]    
         
 **Embedded in Code   :**
 

+ 2 - 0
healthcareio/__init__.py

@@ -17,4 +17,6 @@ Usage :
 
 from healthcareio import analytics
 import healthcareio.x12 as x12
+import healthcareio.params as params
+
 # from healthcareio import server

+ 62 - 61
healthcareio/healthcare-io.py

@@ -32,9 +32,10 @@ Usage :
 from healthcareio.params import SYS_ARGS
 from transport import factory
 import requests
-
 from healthcareio import analytics
 from healthcareio import server
+
+
 from healthcareio.parser import get_content
 import os
 import json
@@ -56,10 +57,10 @@ if not os.path.exists(PATH) :
 import platform
 import sqlite3 as lite
 # PATH = os.sep.join([os.environ['HOME'],'.edi-parser'])
-def register (**args) :
+def signup (**args) :
     """
     :email  user's email address
-    :url    url of the provider to register
+    :url    url of the provider to signup
     """
     
     email = args['email']
@@ -203,13 +204,28 @@ def upgrade(**args):
     """    
     url = args['url'] if 'url' in args else URL+"/upgrade"
     headers = {"key":args['key'],"email":args["email"],"url":url}
+def check(**_args):
+    """
+    This function will check if there is an update available (versions are in the configuration file)
+    :param url
+    """
+    url = _args['url'][:-1] if _args['url'].endswith('/') else _args['url']
+    url = url + "/version"
+    if 'version' not in _args :
+        version = {"_id":"version","current":0.0}
+    else:
+        version = _args['version']
+    http = requests.session()    
+    r = http.get(url)
+    return r.json()
     
 if __name__ == '__main__' :
     info = init()
     
     if 'out-folder' in SYS_ARGS :
         OUTPUT_FOLDER = SYS_ARGS['out-folder']
-        
+    SYS_ARGS['url'] = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL
+    
     if set(list(SYS_ARGS.keys())) & set(['signup','init']):
         #
         # This command will essentially get a new copy of the configurations
@@ -217,10 +233,10 @@ if __name__ == '__main__' :
         #
         
         email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init']
-        url = SYS_ARGS['url'] if 'url' in SYS_ARGS else 'https://healthcareio.the-phi.com'
+        url = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL
         store = SYS_ARGS['store'] if 'store' in SYS_ARGS else 'sqlite'
         db='healthcareio' if 'db' not in SYS_ARGS else SYS_ARGS['db']
-        register(email=email,url=url,store=store,db=db)
+        signup(email=email,url=url,store=store,db=db)
     # else:
     #     m = """
     #     usage:
@@ -244,11 +260,17 @@ if __name__ == '__main__' :
         if 'file' in SYS_ARGS :
             files = [SYS_ARGS['file']]  if not os.path.isdir(SYS_ARGS['file']) else []
         if 'folder' in SYS_ARGS and os.path.exists(SYS_ARGS['folder']):
-            names = os.listdir(SYS_ARGS['folder'])
-            files   += [os.sep.join([SYS_ARGS['folder'],name]) for name in names if not os.path.isdir(os.sep.join([SYS_ARGS['folder'],name]))]
+            for root,_dir,f in os.walk(SYS_ARGS['folder']) :
+                
+                if f :
+                    files += [os.sep.join([root,name]) for name in f]
+            
+            # names = os.listdir(SYS_ARGS['folder'])
+            # files   += [os.sep.join([SYS_ARGS['folder'],name]) for name in names if not os.path.isdir(os.sep.join([SYS_ARGS['folder'],name]))]
         else:
             #
-            # raise an erro
+            # raise an error
+            
             pass
         #
         # if the user has specified to resume, we should look into the logs and pull the files processed and those that haven't
@@ -256,40 +278,13 @@ if __name__ == '__main__' :
         if 'resume' in SYS_ARGS :
             store_config = json.loads( (open(os.sep.join([PATH,'config.json']))).read() )
             files = proxy.get.resume(files,store_config )
-            print (["Found ",len(files)," files unprocessed"])
+            # print (["Found ",len(files)," files unprocessed"])
         #
         # @TODO: Log this here so we know what is being processed or not
         SCOPE = None
         
         if files : #and ('claims' in SYS_ARGS['parse'] or 'remits' in SYS_ARGS['parse']):
-            # logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
-            # if info['store']['type'] == 'disk.DiskWriter' :
-            #     info['store']['args']['path'] += (os.sep + 'healthcare-io.json')
-            # elif info['store']['type'] == 'disk.SQLiteWriter' :
-            #     # info['store']['args']['path'] += (os.sep + 'healthcare-io.db3')
-            #     pass
-            
-            
-            # if info['store']['type'] == 'disk.SQLiteWriter' : 
-            #         info['store']['args']['table'] = SYS_ARGS['parse'].strip().lower()
-            #         _info = json.loads(json.dumps(info['store']))
-            #         _info['args']['table']='logs'
-            # else:
-            #     #
-            #     # if we are working with no-sql we will put the logs in it (performance )?
-
-            #         info['store']['args']['doc'] = SYS_ARGS['parse'].strip().lower()
-            #         _info = json.loads(json.dumps(info['store']))
-            #         _info['args']['doc'] = 'logs'
-            #         logger = factory.instance(**_info)
 
-            # writer = factory.instance(**info['store'])
-            
-            #
-            # we need to have batches ready for this in order to run some of these queries in parallel
-            # @TODO: Make sure it is with a persistence storage (not disk .. not thread/process safe yet)
-            #   - Make sure we can leverage this on n-cores later on, for now the assumption is a single core
-            #
             BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int (SYS_ARGS['batch'])
             
             files = np.array_split(files,BATCH_COUNT)
@@ -308,27 +303,7 @@ if __name__ == '__main__' :
             while len(procs) > 0 :
                 procs = [proc for proc in procs if proc.is_alive()]
                 time.sleep(2)
-            #     for filename in files :
-                    
-            #         if filename.strip() == '':
-            #             continue
-            #         # content,logs = get_content(filename,CONFIG,CONFIG['SECTION'])  
-            #         # 
-            #         try:              
-            #             content,logs = parse(filename = filename,type=SYS_ARGS['parse'])
-            #             if content :                        
-            #                 writer.write(content)
-            #             if logs :
-            #                 [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs]
-            #             else:
-            #                 logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)})
-            #         except Exception as e:
-            #             logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]})
-            #         # print ([filename,len(content)])
-            #         #
-            #         # @TODO: forward this data to the writer and log engine 
-            #         #
-            
+
                 
 
         pass
@@ -358,6 +333,28 @@ if __name__ == '__main__' :
         pointer = lambda : server.app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=False)
         pthread = Process(target=pointer,args=())
         pthread.start()
+    elif 'check-update' in SYS_ARGS :
+        _args = {"url":SYS_ARGS['url']}
+        try:
+            if os.path.exists(os.sep.join([PATH,'config.json'])) :
+                SYS_ARGS['config'] = json.loads((open(os.sep.join([PATH,'config.json']))).read())
+            else:
+                SYS_ARGS['config'] = {}
+            if 'version' in SYS_ARGS['config'] :
+                _args['version'] = SYS_ARGS['config']['version']
+            version = check(**_args)
+            _version = {"current":0.0}if 'version' not in SYS_ARGS['config'] else SYS_ARGS['config']['version']
+            if _version['current'] != version['current'] :
+                print ()
+                print ("You need to upgrade your system to version to ",version['current'])
+                print ("\t- signup (for new configuration)")
+                print ("\t- use pip to upgrade the codebase")
+            else:
+                print ()
+                print ("You are running the current configuraiton version ",_version.current)
+        except Exception as e:
+            print (e)
+            pass
         
     elif 'export' in SYS_ARGS:
         #
@@ -373,11 +370,15 @@ if __name__ == '__main__' :
         cli:
         
             healthcare-io.py    --<[signup|init]> <email> --store <sqlite|mongo> [--batch <value>]
-            healthcare-io.py    --parse claims --folder <path> [--batch <value>]
-            healthcare-io.py    --parse remits --folder <path> [--batch <value>] [--resume]
-        
+            healthcare-io.py    --parse --folder <path> [--batch <value>] [--resume]
+            healthcare-io.py    --check-update
+        action :
+            --signup|init   signup user and get configuration file
+            --parse         starts parsing
+            --check         checks for updates
         parameters :
             --<[signup|init]>   signup or get a configuration file from a parsing server
+            --folder            location of the files (the program will recursively traverse it)
             --store             data store mongo or sqlite or mongodb
             --resume            will attempt to resume if there was an interruption
         """

+ 10 - 10
healthcareio/server/__init__.py

@@ -11,11 +11,11 @@ import numpy as np
 from healthcareio import x12
 
 from multiprocessing import Process
-from flask_socketio import SocketIO, emit, disconnect,send
+# from flask_socketio import SocketIO, emit, disconnect,send
 from healthcareio.server import proxy
 PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
 app = Flask(__name__)
-socket_ = SocketIO(app)
+# socket_ = SocketIO(app)
 
 def resume (files):
     _args = SYS_ARGS['config']['store'].copy()
@@ -30,7 +30,7 @@ def resume (files):
         _files = [item['name'] for item in _files]
     except Exception as e :
         pass
-    print (["found ",len(files),"\tProcessed  ",len(_files)])
+    
     return list(set(files) - set(_files))
         
     
@@ -66,14 +66,14 @@ def push() :
     _args = {"aggregate":"logs","cursor":{},"allowDiskUse":True,"pipeline":pipeline}
     r = pd.DataFrame(reader.read(mongo=_args))
     r = healthcareio.analytics.Apex.apply({"chart":{"type":"donut","axis":{"x":"count","y":"type"}},"data":r})
-    emit("update",r,json=True)
+    # emit("update",r,json=True)
     return r
-@socket_.on('connect')
-def client_connect(**r):
-    print ('Connection received')
-    print (r)
-    push()
-    pass
+# @socket_.on('connect')
+# def client_connect(**r):
+#     print ('Connection received')
+#     print (r)
+#     push()
+#     pass
     
 @app.route("/favicon.ico")
 def _icon():

+ 1 - 1
healthcareio/server/proxy.py

@@ -32,7 +32,7 @@ class get :
             _files = [item['name'] for item in _files]
         except Exception as e :
             pass
-        print (["found ",len(files),"\tProcessed  ",len(_files)])
+        print ( [len(list(set(files) - set(_files))),' files to be processed'])
         return list(set(files) - set(_files))
 
     @staticmethod

+ 70 - 13
healthcareio/x12/__init__.py

@@ -24,6 +24,7 @@ import sys
 from itertools import islice
 from multiprocessing import Process
 import transport
+import jsonmerge
 class void :
     pass
 class Formatters :
@@ -70,7 +71,7 @@ class Formatters :
         else:
             
             value =  [ [prefix]+ self.split(item,'>') for item in row.replace('~','').split(sep)[1:] ]
-
+        
         return value if type(value) == list and type(value[0]) != list else value[0]
     def get_config(self,config,row):
         """
@@ -95,7 +96,6 @@ class Formatters :
             if _row[0] in config['SIMILAR']    :
                 key = config['SIMILAR'][_row[0]]
                 _info = config[key]
-        
         return _info
     
     def hash(self,value):
@@ -181,13 +181,16 @@ class Formatters :
         return x
 class Parser (Process):
     def __init__(self,path):
+        """
+            :path       path of the configuration file (it can be absolute)
+        """
         Process.__init__(self)
         self.utils  = Formatters()
         self.get    = void()
         self.get.value = self.get_map
         self.get.default_value = self.get_default_value
         _config = json.loads(open(path).read())
-        
+        self._custom_config = self.get_custom(path)
         self.config = _config['parser']
         self.store  = _config['store']        
         
@@ -197,6 +200,27 @@ class Parser (Process):
         self.emit = void()
         self.emit.pre =  None
         self.emit.post = None
+    def get_custom(self,path) :
+        """
+        :path   path of the configuration file (it can be absolute)
+        """
+        #
+        #
+        _path = path.replace('config.json','')
+        if _path.endswith(os.sep) :
+            _path = _path[:-1]
+        
+        _config = {}
+        _path = os.sep.join([_path,'custom'])
+        if os.path.exists(_path) :
+            
+            files = os.listdir(_path)
+            if files :
+                fullname = os.sep.join([_path,files[0]])
+                
+                _config = json.loads ( (open(fullname)).read() )
+        return _config
+
     def set_files(self,files):
         self.files = files
     def get_map(self,row,config,version=None):
@@ -247,15 +271,18 @@ class Parser (Process):
                         
                         value = {key:value} if key not  in value else value
                         
+                        
                     else:
                         if 'syn' in config and value in config['syn'] :
                             value = config['syn'][value]
                     if type(value) == dict :
                         
                         object_value = dict(object_value, **value) 
+                        
                     else:
                         
                         object_value[key] = value
+                        
         else:
             #
             # we are dealing with a complex object
@@ -275,26 +302,35 @@ class Parser (Process):
         return object_value
     def apply(self,content,_code) :
         """
-            :file content i.e a segment with the envelope
-            :_code  837 or 835 (helps get the appropriate configuration)
+        :content    content of a file i.e a segment with the envelope
+        :_code  837 or 835 (helps get the appropriate configuration)
         """
         util   = Formatters()
         # header       = default_value.copy()
         value = {}
+        
         for row in content[:] :
             
+            
             row     = util.split(row.replace('\n','').replace('~',''))
             _info   = util.get.config(self.config[_code][0],row)
+            if self._custom_config and _code in self._custom_config:
+                _cinfo   = util.get.config(self._custom_config[_code],row)
+            else:
+                _cinfo = {}
+            # _info   = self.consolidate(row=row,type=_code,config=_info,util=util)
+            # print ([row[0],_info])
+            # print ()
+            # continue
+            # _cinfo   = util.get.config(self._custom_config[_code],row)
+            
             
             if _info :
+
                 try:
-                    
+                    _info = jsonmerge.merge(_info,_cinfo)
                     tmp = self.get.value(row,_info)
                     
-                    # if 'P1080351470' in content[0] and 'PLB' in row:
-                    #     print (_info)
-                    #     print (row)
-                    #     print (tmp)
                     if not tmp :
                         continue 
                     if 'label' in _info :
@@ -326,7 +362,9 @@ class Parser (Process):
                     elif 'field' in _info :
                         
                         name = _info['field']
-                        value[name] = tmp
+                        # value[name] = tmp
+                        value = jsonmerge.merge(value,{name:tmp})
+                      
                     else:
                         
 
@@ -341,6 +379,7 @@ class Parser (Process):
         return value if value else {}
 
     def get_default_value(self,content,_code):
+        
         util = Formatters()
         TOP_ROW = content[1].split('*')
         CATEGORY= content[2].split('*')[1].strip()
@@ -359,6 +398,8 @@ class Parser (Process):
             value['payer_id'] = SENDER_ID               
         else:
             value['provider_id'] = SENDER_ID
+        #
+        # Let's parse this for default values            
         return value
 
     def read(self,filename) :
@@ -381,8 +422,14 @@ class Parser (Process):
                 INITIAL_ROWS = file[:4]
             if len(INITIAL_ROWS) < 3 :
                 return None,[{"name":filename,"completed":False}],None
-            section = 'HL' if INITIAL_ROWS[1].split('*')[1] == 'HC' else 'CLP'            
-            _code   = '837' if section == 'HL' else '835'
+            # section = 'HL' if INITIAL_ROWS[1].split('*')[1] == 'HC' else 'CLP'       
+            # _code   = '837' if section == 'HL' else '835'
+            # print ([_code,section])
+            _code = INITIAL_ROWS[2].split('*')[1].strip()
+            # section = 'CLP' if _code == '835' else 'HL'
+            section  = self.config[_code][0]['SECTION'].strip()
+            #
+            # adjusting the 
             DEFAULT_VALUE = self.get.default_value(INITIAL_ROWS,_code)
             DEFAULT_VALUE['name'] = filename.strip()
             #
@@ -390,22 +437,30 @@ class Parser (Process):
             #   index 1 identifies file type i.e CLM for claim and CLP for remittance
             segment = []
             index = 0;
+            _toprows = []
             for row in file :
+                row = row.replace('\r','')
+                if not segment and not row.startswith(section):
+                    _toprows += [row]
                 if row.startswith(section) and not segment:
                     
                     segment = [row]
+
                     continue
                     
                 elif segment and not row.startswith(section):
                     
                     segment.append(row)
+                
                     
                 if len(segment) > 1 and row.startswith(section):
                     #
                     # process the segment somewhere (create a thread maybe?)
                     # 
                     # default_claim = dict({"index":index},**DEFAULT_VALUE)
+                    # print (_toprows)
                     _claim = self.apply(segment,_code)
+                    
                     # if _claim['claim_id'] == 'P1080351470' :
                     #     print (_claim)
                         # _claim = dict(DEFAULT_VALUE,**_claim)
@@ -425,12 +480,14 @@ class Parser (Process):
                 claim = self.apply(segment,_code)
                 if claim :
                     claim['index'] = len(claims)
+                    claim = jsonmerge.merge(claim,self.apply(_toprows,_code))
                     claims.append(dict(DEFAULT_VALUE,**claim))
             if type(file) != list :
                 file.close()
 
             # x12_file = open(filename.strip(),errors='ignore').read().split('\n')
         except Exception as e:
+           
             logs.append ({"parse":_code,"completed":False,"name":filename,"msg":e.args[0]})
             return [],logs,None