123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400 |
- #!/usr/bin/env python3
- """
- (c) 2019 Claims Toolkit,
- Health Information Privacy Lab, Vanderbilt University Medical Center
- Steve L. Nyemba <steve.l.nyemba@vanderbilt.edu>
- Khanhly Nguyen <khanhly.t.nguyen@gmail.com>
- This code is intended to process and parse healthcare x12 837 (claims) and x12 835 (remittances) into human readable JSON format.
- The claims/output can be forwarded to a NoSQL data store like couchdb or mongodb
- Usage :
- Commandline :
- python edi-parser --scope <claims|remits> --config <path> --folder <path> --store <[mongo|disk|couch]> --<[db|path]> <id|path>
- with :
- --scope <claims|remits>
- --config path of the x12 to be parsed i.e it could be 835, or 837
- --folder location of the files (they must be decompressed)
- --store data store could be disk, mongodb, couchdb
- --db|path name of the folder to store the output or the database name
-
- Embedded in Code :
- import edi.parser
- import json
- file = '/data/claim_1.x12'
- conf = json.loads(open('config/837.json').read())
- edi.parser.get_content(file,conf)
- """
- from healthcareio.params import SYS_ARGS
- from transport import factory
- import requests
- from healthcareio import analytics
- from healthcareio import server
- from healthcareio.parser import get_content
- import os
- import json
- import sys
- import numpy as np
- from multiprocessing import Process
- import time
- from healthcareio import x12
- from healthcareio.export import export
- import smart
- import transport
- from healthcareio.server import proxy
- import pandas as pd
# Per-user locations used by the whole script:
#   PATH          holds the configuration downloaded at signup
#   OUTPUT_FOLDER is the default destination of parsed output
# expanduser('~') honours HOME on POSIX and falls back to the platform's own
# notion of a home directory, so importing this module no longer raises
# KeyError when the HOME environment variable is not set.
_HOME = os.path.expanduser('~')
PATH = os.sep.join([_HOME,'.healthcareio'])
OUTPUT_FOLDER = os.sep.join([_HOME,'healthcare-io'])
INFO = None
URL = "https://healthcareio.the-phi.com"
# exist_ok avoids the check-then-create race when two processes start together
os.makedirs(PATH, exist_ok=True)
import platform
import sqlite3 as lite
# An explicit --config on the command line overrides the default configuration file
CONFIG_FILE = os.sep.join([PATH,'config.json']) if 'config' not in SYS_ARGS else SYS_ARGS['config']
# Usage text printed when no known action is given or when --export is
# invoked with unusable arguments. The export line names --export-config
# because that is the key the export action reads from the command line.
HELP_MESSAGE = """
        cli:
            #
            # Signup, allows parsing configuration to be downloaded
            #

            # Support for SQLite3
            healthcare-io.py --signup steve@the-phi.com --store sqlite

            #or support for mongodb
            healthcare-io.py --signup steve@the-phi.com --store mongo

            healthcare-io.py --<[signup|init]> <email> --store <sqlite|mongo> [--batch <value>]
            healthcare-io.py --parse --folder <path> [--batch <value>] [--resume]
            healthcare-io.py --check-update
            healthcare-io.py --export <835|837> --export-config <config-path>
        action :
            --signup|init   signup user and get configuration file
            --parse         starts parsing
            --check-update  checks for updates
            --export        export data of a 835 or 837 into another database
        parameters :
            --<[signup|init]>   signup or get a configuration file from a parsing server
            --folder            location of the files (the program will recursively traverse it)
            --store             data store, either sqlite or mongo
            --batch             number of batches the files are split into (one parser per batch)
            --resume            will attempt to resume if there was an interruption
            --export-config     path of the JSON file describing the data store to export to
        """
def signup (**args) :
    """
    Register with the parsing server and store the configuration it returns.

    :email  user's email address (required)
    :url    url of the provider to signup (defaults to URL)
    :store  data store identifier sent to the server (defaults to 'sqlite')
    :db     database name sent to the server (defaults to 'healthcareio')

    The JSON answer of the server is completed with the owner, the path of the
    SQLite file and the output folder, then written to CONFIG_FILE.
    Raises requests.HTTPError when the server answers with an error status and
    KeyError when the answer has no store/args section.
    """
    email = args['email']
    url = args['url'] if 'url' in args else URL
    store = args['store'] if 'store' in args else 'sqlite'
    # 'db' used to be mandatory while 'store' was optional; the default is the
    # one the command line entry point passes when --db is absent
    db = args['db'] if 'db' in args else 'healthcareio'

    for path in (PATH, OUTPUT_FOLDER) :
        os.makedirs(path, exist_ok=True)

    # The server reads the registration details from the request headers
    headers = {"email":email,"client":platform.node(),"store":store,"db":db}
    with requests.session() as http :
        r = http.post(url,headers=headers)
    # Never persist an error page as if it were a configuration
    r.raise_for_status()

    info = dict({"owner":email},**r.json())
    # The SQLite file location is always recorded, whatever store was requested
    info['store']['args']['path'] = os.sep.join([OUTPUT_FOLDER,'healthcare-io.db3']) #-- sql
    info['out-folder'] = OUTPUT_FOLDER
    with open(CONFIG_FILE,'w') as file :
        file.write( json.dumps(info))

    _m = """
        Thank you for signingup!!
        Your configuration file is store in :path,
            - More information visit https://healthcareio.the-phi.com/parser
            - Access the source https://healthcareio.the-phi.com/git/code/parser
    """.replace(":path",CONFIG_FILE)
    print (_m)
- #
- # Create the sqlite3 database to
def log(**args):
    """
    Placeholder for a logging hook.
    Any keyword arguments are accepted and ignored; nothing is written and
    None is returned.
    """
    return None
def init():
    """
    Read the configuration from disk and prepare what it describes.
    Requirements for configuration file :
        {out-folder,store,schema}

    - creates the output folder ('out-folder' of the configuration, or
      OUTPUT_FOLDER when the key is absent)
    - for an SQLite store, creates every table of the schema that is missing

    :return the configuration (dict), or None when CONFIG_FILE does not exist
    """
    filename = CONFIG_FILE
    if not os.path.exists(filename) :
        return None
    #
    # Loading the configuration file (JSON format)
    with open(filename) as file :
        info = json.loads(file.read())

    # The folder is stored under 'out-folder'. The previous test looked for
    # 'output-folder', so the configured folder was never the one created.
    out_folder = info['out-folder'] if 'out-folder' in info else OUTPUT_FOLDER
    os.makedirs(out_folder, exist_ok=True)

    # Two generations of store description are supported: the legacy one is
    # identified by 'type', the current one by 'provider'
    store = info['store']
    lwriter = None
    if 'type' in store and store['type'] == 'disk.SQLiteWriter' :
        lwriter = transport.factory.instance(**store)
    elif 'provider' in store and store['provider'] == 'sqlite' :
        lwriter = transport.instance(**store)

    if lwriter :
        try:
            for key in info['schema'] :
                # schema keys other than 'logs' are x12 types: 837 maps to the
                # claims table, anything else to the remits table
                if key != 'logs' :
                    _id = 'claims' if key == '837' else 'remits'
                else:
                    _id = key
                if not lwriter.has(table=_id) :
                    lwriter.apply(info['schema'][key]['create'])
        finally:
            # release the writer even when a create statement fails
            lwriter.close()
    return info
def upgrade(**args):
    """
    Assemble the parameters of an upgrade request.
    :email  provide us with who you are
    :key    upgrade key provided by the server for a given email
    :url    upgrade endpoint (defaults to URL + "/upgrade")

    Nothing is sent yet: the values are only put together, and None is
    returned. A missing key or email raises KeyError.
    """
    if 'url' in args :
        endpoint = args['url']
    else:
        endpoint = URL+"/upgrade"
    payload = {"key":args['key'],"email":args["email"],"url":endpoint}
def check(**_args):
    """
    Ask the server which version is current.
    :param url      base url of the server; one trailing '/' is ignored
    :param version  accepted for compatibility with existing callers, not used

    :return the JSON document served at <url>/version
    Raises requests.HTTPError when the server answers with an error status;
    comparing against the local version is left to the caller.
    """
    url = _args['url'][:-1] if _args['url'].endswith('/') else _args['url']
    url = url + "/version"
    # the session is closed as soon as the answer has been received
    with requests.session() as http :
        r = http.get(url)
    r.raise_for_status()
    return r.json()
-
def _load_json(path):
    """Return the parsed content of the JSON file at path, closing the file afterwards."""
    with open(path) as handle :
        return json.loads(handle.read())


if __name__ == '__main__' :
    #
    # Command line entry point, the first matching action wins :
    #   --signup|init, --upgrade, --parse, --analytics, --check-update, --export
    # anything else prints HELP_MESSAGE
    #
    info = init()

    if 'out-folder' in SYS_ARGS :
        OUTPUT_FOLDER = SYS_ARGS['out-folder']
    SYS_ARGS['url'] = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL

    if 'signup' in SYS_ARGS or 'init' in SYS_ARGS :
        #
        # This command will essentially get a new copy of the configurations
        # @TODO: Tie the request to a version ?
        #
        email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init']
        url = SYS_ARGS['url']
        store = SYS_ARGS['store'] if 'store' in SYS_ARGS else 'sqlite'
        db = 'healthcareio' if 'db' not in SYS_ARGS else SYS_ARGS['db']
        signup(email=email,url=url,store=store,db=db)
    elif 'upgrade' in SYS_ARGS :
        #
        # perform an upgrade i.e some code or new parsers information will be provided
        # (not implemented yet)
        pass
    elif 'parse' in SYS_ARGS and info:
        #
        # In this section of the code we are expecting the user to provide :
        #   :file   a single file to process and/or
        #   :folder location of the files to process (traversed recursively)
        #
        files = []
        if 'file' in SYS_ARGS :
            files = [SYS_ARGS['file']] if not os.path.isdir(SYS_ARGS['file']) else []
        if 'folder' in SYS_ARGS and os.path.exists(SYS_ARGS['folder']):
            for root,_dir,f in os.walk(SYS_ARGS['folder']) :
                if f :
                    files += [os.sep.join([root,name]) for name in f]
        #
        # if the user has specified to resume, we should look into the logs and pull the files processed and those that haven't
        #
        if 'resume' in SYS_ARGS :
            files = proxy.get.resume(files,_load_json(CONFIG_FILE))
        #
        # @TODO: Log this here so we know what is being processed or not
        if files :
            BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int (SYS_ARGS['batch'])
            # array_split rejects 0 sections, and more sections than files
            # would only start parsers that have nothing to do
            BATCH_COUNT = min(max(BATCH_COUNT,1),len(files))

            procs = []
            for row in np.array_split(files,BATCH_COUNT) :
                parser = x12.Parser(CONFIG_FILE)
                parser.set.files(row.tolist())
                parser.start()
                procs.append(parser)
            #
            # wait for every parser, polling every 2 seconds while any is alive
            while procs :
                procs = [proc for proc in procs if proc.is_alive()]
                if procs :
                    time.sleep(2)

            uri = OUTPUT_FOLDER
            store_config = _load_json(CONFIG_FILE)['store']
            if 'type' in store_config :
                # legacy store description: the location is the host, else the path, else the database
                _store_args = store_config['args']
                uri = _store_args['host'] if 'host' in _store_args else ( _store_args['path'] if 'path' in _store_args else _store_args['database'])
                if 'SQLite' in store_config['type']:
                    provider = 'sqlite'
                elif 'sql' in store_config['type'] :
                    provider = 'SQL'
                else:
                    provider = 'mongo'
            else:
                provider = store_config['provider']
            _msg = """
                Completed Parsing, The data is available in :provider database at :uri
                Logs are equally available for errors and summary statistics to be compiled
            """.replace(":provider",provider).replace(":uri",uri)
            print (_msg)
    elif 'analytics' in SYS_ARGS :
        PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
        DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
        SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''

        if os.path.exists(CONFIG_FILE) :
            e = analytics.engine(CONFIG_FILE) #--@TODO: make the configuration file globally accessible
            e.apply(type='claims',serialize=True)
            SYS_ARGS['engine'] = e
            SYS_ARGS['config'] = _load_json(CONFIG_FILE)
        else:
            SYS_ARGS['config'] = {"owner":None,"store":None}
        if 'args' not in SYS_ARGS['config'] :
            SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"}

        me = pd.DataFrame(smart.top.read(name='healthcare-io.py')).args.unique().tolist()
        SYS_ARGS['me'] = me[0] #-- This key will identify the current process
        # NOTE(review): a lambda target cannot be pickled, so this presumably
        # only works where multiprocessing forks -- confirm on spawn platforms
        pointer = lambda : server.app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=False)
        pthread = Process(target=pointer,args=())
        pthread.start()
    elif 'check-update' in SYS_ARGS :
        _args = {"url":SYS_ARGS['url']}
        try:
            if os.path.exists(CONFIG_FILE) :
                SYS_ARGS['config'] = _load_json(CONFIG_FILE)
            else:
                SYS_ARGS['config'] = {}
            if 'version' in SYS_ARGS['config'] :
                _args['version'] = SYS_ARGS['config']['version']
            version = check(**_args)
            _version = {"current":0.0} if 'version' not in SYS_ARGS['config'] else SYS_ARGS['config']['version']
            if _version['current'] != version['current'] :
                print ()
                print ("You need to upgrade your system to version to ",version['current'])
                print ("\t- signup (for new configuration)")
                print ("\t- use pip to upgrade the codebase")
            else:
                print ()
                print ("You are running the current configuraiton version ",_version['current'])
        except Exception as e:
            # best effort: a failed check is reported, never fatal
            print (e)
    elif 'export' in SYS_ARGS:
        #
        # this function is designed to export the data into another data-store
        # --export-config is the JSON description of that store; without it
        # (or with an unknown x12 type) the usage is printed instead of failing
        #
        path = SYS_ARGS['export-config'] if 'export-config' in SYS_ARGS else None
        X12_TYPE = SYS_ARGS['export']
        if path is None or not os.path.exists(path) or X12_TYPE not in ['835','837']:
            print (HELP_MESSAGE)
        else:
            #
            # Let's run the export function ..., This will push files into a data-store of choice Redshift, PostgreSQL, MySQL ...
            #
            _store = _load_json(path)
            pipes = export.Factory.instance(type=X12_TYPE,write_store=_store,config = CONFIG_FILE)
            for thread in pipes:
                # --table restricts the export to the pipe of that table
                if 'table' in SYS_ARGS and SYS_ARGS['table'] != thread.table :
                    continue
                # pipes run one after the other: each is joined before the next starts
                thread.start()
                time.sleep(1)
                thread.join()
    else:
        print(HELP_MESSAGE)
|