Browse Source

new: added export function (hopefully it works)

Steve Nyemba 4 years ago
parent
commit
abac60786e
4 changed files with 116 additions and 155 deletions
  1. 11 1
      README.md
  2. 103 152
      healthcareio/healthcare-io.py
  3. 1 1
      healthcareio/server/__init__.py
  4. 1 1
      setup.py

+ 11 - 1
README.md

@@ -27,7 +27,7 @@ We wrote this frame to be used in both command line or as a library within in yo
 
 1. signup to get parsing configuration
 
-    The parser is driven by a configuration, file you need by signing up. 
+    The parser is driven by a configuration file that specifies which fields to parse and how to parse them. You need to sign up to get a copy of the configuration file.
 
         healthcare-io.py --signup <email> [--store <mongo|sqlite>]
 
@@ -49,7 +49,17 @@ We wrote this frame to be used in both command line or as a library within in yo
             --batch     number of processes to spawn to parse the files
             --resume    tells the parser to resume parsing 
                         if all files weren't processed or new files were added into the folder
+
+4. export data to a relational data-store
+
+    The parser can export data into other data-stores as relational tables, allowing users to construct views that support a variety of studies.
+
+        healthcare-io.py --export <835|837> --config <path-export.json>
+
+        with:
+            --config    configuration to support data-store
         
+
 **Embedded in Code   :**
 
 The Healthcare/IO **parser** can be used within your code base as a library and handle storing data in a data store of choice

+ 103 - 152
healthcareio/healthcare-io.py

@@ -44,6 +44,7 @@ import numpy as np
 from multiprocessing import Process
 import time
 from healthcareio import x12
+from healthcareio.export import export
 import smart
 from healthcareio.server import proxy
 import pandas as pd
@@ -57,6 +58,24 @@ if not os.path.exists(PATH) :
 import platform
 import sqlite3 as lite
 # PATH = os.sep.join([os.environ['HOME'],'.edi-parser'])
+HELP_MESSAGE = """
+        cli:
+        
+            healthcare-io.py    --<[signup|init]> <email> --store <sqlite|mongo> [--batch <value>]
+            healthcare-io.py    --parse --folder <path> [--batch <value>] [--resume]
+            healthcare-io.py    --check-update
+            healthcare-io.py    --export <835|837> --config <config-path>
+        action :
+            --signup|init   signup user and get configuration file
+            --parse         starts parsing
+            --check         checks for updates
+            --export        export data of a 835 or 837 into another database
+        parameters :
+            --<[signup|init]>   signup or get a configuration file from a parsing server
+            --folder            location of the files (the program will recursively traverse it)
+            --store             data store mongo or sqlite or mongodb
+            --resume            will attempt to resume if there was an interruption
+        """
 def signup (**args) :
     """
     :email  user's email address
@@ -125,77 +144,77 @@ def init():
 #
 # Global variables that load the configuration files
 
-def parse(**args):
-    """
-    This function will parse the content of a claim or remittance (x12 format) give the following parameters
-    :filename   absolute path of the file to be parsed
-    :type       claims|remits in x12 format
-    """
-    global INFO
-    if not INFO :
-        INFO = init()
-    if args['type'] == 'claims' :
-        CONFIG = INFO['parser']['837']
-    elif args['type'] == 'remits' :
-        CONFIG = INFO['parser']['835']
-    else:
-        CONFIG = None
-    if CONFIG :
-        # CONFIG = CONFIG[-1] if 'version' not in args and (args['version'] < len(CONFIG)) else CONFIG[0]
-        CONFIG = CONFIG[int(args['version'])-1] if 'version' in SYS_ARGS and int(SYS_ARGS['version']) < len(CONFIG) else CONFIG[-1]
-        SECTION = CONFIG['SECTION']
-        os.environ['HEALTHCAREIO_SALT'] = INFO['owner']
+# def parse(**args):
+#     """
+#     This function will parse the content of a claim or remittance (x12 format) give the following parameters
+#     :filename   absolute path of the file to be parsed
+#     :type       claims|remits in x12 format
+#     """
+#     global INFO
+#     if not INFO :
+#         INFO = init()
+#     if args['type'] == 'claims' :
+#         CONFIG = INFO['parser']['837']
+#     elif args['type'] == 'remits' :
+#         CONFIG = INFO['parser']['835']
+#     else:
+#         CONFIG = None
+#     if CONFIG :
+#         # CONFIG = CONFIG[-1] if 'version' not in args and (args['version'] < len(CONFIG)) else CONFIG[0]
+#         CONFIG = CONFIG[int(args['version'])-1] if 'version' in SYS_ARGS and int(SYS_ARGS['version']) < len(CONFIG) else CONFIG[-1]
+#         SECTION = CONFIG['SECTION']
+#         os.environ['HEALTHCAREIO_SALT'] = INFO['owner']
 
         
-        return get_content(args['filename'],CONFIG,SECTION)
-def resume (files,id,config):
-    _args = config['store'].copy()
-    if 'mongo' in config['store']['type'] :
-        _args['type'] = 'mongo.MongoReader'
-        reader = factory.instance(**_args)
-    _files = []
-    if 'resume' in config['analytics'] :
-        _args = config['analytics']['resume'][id]
-        _files = reader.read(**_args)
-        _files = [item['name'] for item in _files if item['name'] != None]
-        return list(set(files) - set(_files))
+#         return get_content(args['filename'],CONFIG,SECTION)
+# def resume (files,id,config):
+#     _args = config['store'].copy()
+#     if 'mongo' in config['store']['type'] :
+#         _args['type'] = 'mongo.MongoReader'
+#         reader = factory.instance(**_args)
+#     _files = []
+#     if 'resume' in config['analytics'] :
+#         _args = config['analytics']['resume'][id]
+#         _files = reader.read(**_args)
+#         _files = [item['name'] for item in _files if item['name'] != None]
+#         return list(set(files) - set(_files))
         
-    return files
-    pass
-def apply(files,store_info,logger_info=None):
-    """
-        :files          list of files to be processed in this given thread/process
-        :store_info     information about data-store, for now disk isn't thread safe
-        :logger_info    information about where to store the logs
-    """
+    # return files
+    # pass
+# def apply(files,store_info,logger_info=None):
+#     """
+#         :files          list of files to be processed in this given thread/process
+#         :store_info     information about data-store, for now disk isn't thread safe
+#         :logger_info    information about where to store the logs
+#     """
 
-    if not logger_info :
-        logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
-    else:
-        logger = factory.instance(**logger_info)
+#     if not logger_info :
+#         logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
+#     else:
+#         logger = factory.instance(**logger_info)
 
-    writer = factory.instance(**store_info)
-    for filename in files :
+#     writer = factory.instance(**store_info)
+#     for filename in files :
         
-        if filename.strip() == '':
-            continue
-        # content,logs = get_content(filename,CONFIG,CONFIG['SECTION'])  
-        # 
-        try:              
-            content,logs = parse(filename = filename,type=SYS_ARGS['parse'])
+#         if filename.strip() == '':
+#             continue
+#         # content,logs = get_content(filename,CONFIG,CONFIG['SECTION'])  
+#         # 
+#         try:              
+#             content,logs = parse(filename = filename,type=SYS_ARGS['parse'])
             
-            if content :                        
-                writer.write(content)
-            if logs :
-                [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs]
-            else:
-                logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)})
-        except Exception as e:
+#             if content :                        
+#                 writer.write(content)
+#             if logs :
+#                 [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs]
+#             else:
+#                 logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)})
+#         except Exception as e:
             
-            logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]})
-        # print ([filename,len(content)])
-        #
-        # @TODO: forward this data to the writer and log engine 
+#             logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]})
+#         # print ([filename,len(content)])
+#         #
+#         # @TODO: forward this data to the writer and log engine 
         #
 def upgrade(**args):
     """
@@ -360,95 +379,27 @@ if __name__ == '__main__' :
         #
         # this function is designed to export the data to csv
         #
-        format = SYS_ARGS['format'] if 'format' in SYS_ARGS else 'csv'
-        format = format.lower()
-        if set([format]) not in ['xls','csv'] :
-            format = 'csv'
-        
-    else:
-        msg = """
-        cli:
-        
-            healthcare-io.py    --<[signup|init]> <email> --store <sqlite|mongo> [--batch <value>]
-            healthcare-io.py    --parse --folder <path> [--batch <value>] [--resume]
-            healthcare-io.py    --check-update
-        action :
-            --signup|init   signup user and get configuration file
-            --parse         starts parsing
-            --check         checks for updates
-        parameters :
-            --<[signup|init]>   signup or get a configuration file from a parsing server
-            --folder            location of the files (the program will recursively traverse it)
-            --store             data store mongo or sqlite or mongodb
-            --resume            will attempt to resume if there was an interruption
-        """
-        print(msg)
-        pass
-    # """
-    # The program was called from the command line thus we are expecting 
-    #     parse   in [claims,remits]
-    #     config  os.sep.path.exists(path)
-    #     folder    os.sep.path.exists(path)
-    #     store   store ()
-    # """
-    # p = len( set(['store','config','folder']) & set(SYS_ARGS.keys())) == 3 and ('db' in SYS_ARGS or 'path' in SYS_ARGS)
-    # TYPE = {
-    #     'mongo':'mongo.MongoWriter',
-    #     'couch':'couch.CouchWriter',
-    #     'disk':'disk.DiskWriter'
-    # }
-    # INFO = {
-    #     '837':{'scope':'claims','section':'HL'},
-    #     '835':{'scope':'remits','section':'CLP'}
-    # }
-    # if p :
-    #     args = {}
-    #     scope = SYS_ARGS['config'][:-5].split(os.sep)[-1]
-    #     CONTEXT = INFO[scope]['scope']
-    #     #
-    #     # @NOTE:
-    #     # improve how database and data stores are handled.
-    #     if SYS_ARGS['store'] == 'couch' :
-    #         args = {'url': SYS_ARGS['url'] if 'url' in SYS_ARGS else 'http://localhost:5984'}
-    #         args['dbname'] = SYS_ARGS['db']
+        path = SYS_ARGS['config']
+        TYPE = SYS_ARGS['export'] if 'export' in SYS_ARGS else '835'
+        if not os.path.exists(path) or TYPE not in ['835','837']:
+            print (HELP_MESSAGE)
+        else:
+            #
+            # Run the export function: this will push files into a data-store of choice (Redshift, PostgreSQL, MySQL, ...)
+            #
+            _store = json.loads( (open(path) ).read())
             
-    #     elif SYS_ARGS ['store'] == 'mongo':
-    #         args = {'host':SYS_ARGS['host']if 'host' in SYS_ARGS else 'localhost:27017'}
-    #     if SYS_ARGS['store'] in ['mongo','couch']:
-    #         args['dbname'] = SYS_ARGS['db'] if 'db' in SYS_ARGS else 'claims_outcomes'
-    #         args['doc'] = CONTEXT
+            pipes = export.Factory.instance(type=TYPE,write_store={"type":"sql.SQLWriter","args":{"provider":"postgresql","db":"sample",}}) #"inspect":0,"cast":0}})  
+            # pipes[0].run()
+            for thread in pipes:
+                thread.start()
+                time.sleep(1)
+            while pipes :
+                pipes = [thread for thread in pipes if thread.is_alive()]     
+                time.sleep(1)       
+
 
-    #     TYPE    = TYPE[SYS_ARGS['store']] 
-    #     writer  = factory.instance(type=TYPE,args=args)
-    #     if SYS_ARGS['store'] == 'disk':
-    #         writer.init(path = 'output-claims.json')
-    #     logger  = factory.instance(type=TYPE,args= dict(args,**{"doc":"logs"}))
-    #     files   = os.listdir(SYS_ARGS['folder'])
-    #     CONFIG  = json.loads(open(SYS_ARGS['config']).read())
-    #     SECTION = INFO[scope]['section']
         
-    #     for file in files :
-    #         if 'limit' in SYS_ARGS and files.index(file) == int(SYS_ARGS['limit']) :
-    #             break
-    #         else:
-    #             filename = os.sep.join([SYS_ARGS['folder'],file])
-                
-    #             try:
-    #                 content,logs = get_content(filename,CONFIG,SECTION)
-    #             except Exception as e:
-    #                 if sys.version_info[0] > 2 :
-    #                     logs = [{"filename":filename,"msg":e.args[0]}]
-    #                 else:
-    #                     logs = [{"filename":filename,"msg":e.message}]
-    #                 content = None
-    #             if content :
-                    
-    #                 writer.write(content)
-    #             if logs:
-                    
-    #                 logger.write(logs)
-            
-                
-    #     pass
-    # else:
-    #     print (__doc__)
+    else:
+        
+        print(HELP_MESSAGE)

+ 1 - 1
healthcareio/server/__init__.py

@@ -9,7 +9,7 @@ import transport
 import pandas as pd
 import numpy as np
 from healthcareio import x12
-
+from healthcareio.export import export
 from multiprocessing import Process
 # from flask_socketio import SocketIO, emit, disconnect,send
 from healthcareio.server import proxy

+ 1 - 1
setup.py

@@ -8,7 +8,7 @@ import sys
 def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read() 
 args = {
-    "name":"healthcareio","version":"1.4.6",
+    "name":"healthcareio","version":"1.4.8",
     "author":"Vanderbilt University Medical Center",
     "author_email":"steve.l.nyemba@vumc.org",
     "include_package_data":True,