123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424 |
- #!/usr/bin/env python3
- """
- (c) 2019 Claims Toolkit,
- Health Information Privacy Lab, Vanderbilt University Medical Center
- Steve L. Nyemba <steve.l.nyemba@vanderbilt.edu>
- Khanhly Nguyen <khanhly.t.nguyen@gmail.com>
- This code is intended to process and parse healthcare x12 837 (claims) and x12 835 (remittances) into human readable JSON format.
- The claims/output can be forwarded to a NoSQL data store such as couchdb or mongodb
- Usage :
- Commandline :
- python edi-parser --scope <claims|remits> --config <path> --folder <path> --store <mongo|disk|couch> --<db|path> <id|path>
- with :
- --scope <claims|remits>
- --config path of the configuration file for the x12 format to be parsed, i.e. 835 or 837
- --folder location of the files (they must be decompressed)
- --store data store could be disk, mongodb, couchdb
- --db|path name of the folder to store the output or the database name
-
- Embedded in Code :
- import edi.parser
- import json
- filename = '/data/claim_1.x12'
- conf = json.loads(open('config/837.json').read())
- edi.parser.get_content(filename,conf)
- """
- from healthcareio.params import SYS_ARGS
- from transport import factory
- import requests
- from healthcareio import analytics
- from healthcareio import server
- from healthcareio.parser import get_content
- import os
- import json
- import sys
- import numpy as np
- from multiprocessing import Process
- import time
# Per-user configuration folder; register() writes config.json here and init() reads it back.
PATH = os.sep.join([os.environ['HOME'], '.healthcareio'])
# Default folder for the parsed output and the logs.
OUTPUT_FOLDER = os.sep.join([os.environ['HOME'], 'healthcare-io'])
# Cached configuration, loaded lazily by parse() through init().
INFO = None
# Default registration server.
URL = "https://healthcareio.the-phi.com"
# exist_ok avoids the check-then-create race when several processes start at the same time.
os.makedirs(PATH, exist_ok=True)
- import platform
- import sqlite3 as lite
- # PATH = os.sep.join([os.environ['HOME'],'.edi-parser'])
def register(**args):
    """
    Register this client with the parsing server and save the configuration it returns.

    :email  user's email address (required)
    :url    url of the provider to register with (defaults to URL)
    :store  kind of data store reported to the server (defaults to 'sqlite')
    :db     database name reported to the server (defaults to 'healthcareio')

    The JSON reply of the server, completed with the owner, the sqlite path and the output
    folder, is written to <PATH>/config.json.
    Raises requests.HTTPError when the server answers with an error status.
    """
    email = args['email']
    url = args['url'] if 'url' in args else URL
    store = args['store'] if 'store' in args else 'sqlite'
    # same default as the command line entry point, so programmatic callers may omit it
    db = args['db'] if 'db' in args else 'healthcareio'

    for path in (PATH, OUTPUT_FOLDER):
        os.makedirs(path, exist_ok=True)

    headers = {"email": email, "client": platform.node(), "store": store, "db": db}
    with requests.session() as http:
        r = http.post(url, headers=headers)
    # fail loudly instead of saving an error reply as the configuration
    r.raise_for_status()

    info = dict({"owner": email}, **r.json())
    info['store']['args']['path'] = os.sep.join([OUTPUT_FOLDER, 'healthcare-io.db3'])  # -- sql
    info['out-folder'] = OUTPUT_FOLDER

    filename = os.sep.join([PATH, 'config.json'])
    # the context manager guarantees the file is flushed and closed even if the write fails
    with open(filename, 'w') as file:
        file.write(json.dumps(info))
- #
- # Create the sqlite3 database to
def log(**args):
    """
    Placeholder for a logging hook.

    Accepts arbitrary keyword arguments and currently does nothing with them.
    """
    return None
def init():
    """
    Read the configuration saved in <PATH>/config.json and prepare the storage it describes.

    When the file exists, the output folder is created if it is missing. For a
    'disk.SQLiteWriter' store whose database file does not exist yet, the database is created
    by running every 'create' statement found in the 'schema' section.

    :return the configuration as a dictionary, or None when there is no configuration file
    """
    filename = os.sep.join([PATH, 'config.json'])
    if not os.path.exists(filename):
        return None

    # close the handle as soon as the content has been read
    with open(filename) as file:
        info = json.loads(file.read())

    # exist_ok: several worker processes may initialize at the same time
    os.makedirs(info['out-folder'], exist_ok=True)

    store = info['store']
    if store['type'] == 'disk.SQLiteWriter' and not os.path.exists(store['args']['path']):
        conn = lite.connect(store['args']['path'], isolation_level=None)
        try:
            for key in info['schema']:
                conn.execute(info['schema'][key]['create'])
            conn.commit()
        finally:
            # release the connection even when one of the statements fails
            conn.close()
    return info
- #
- # Global variables that load the configuration files
def parse(**args):
    """
    Parse the content of a claim or remittance (x12 format) given the following parameters

    :filename   absolute path of the file to be parsed
    :type       claims|remits in x12 format
    :version    (optional) 1-based version of the parser configuration to use; when omitted the
                command line 'version' argument is used, otherwise the latest configuration
    :return     the value returned by get_content for the file and the selected configuration

    Raises ValueError when there is no configuration or the type is not supported.
    """
    global INFO
    if not INFO:
        INFO = init()
    if not INFO:
        raise ValueError("No configuration found, please signup or init first")

    _map = {'claims': '837', 'remits': '835'}
    if args['type'] not in _map:
        raise ValueError("Unsupported type %s, expecting claims or remits" % args['type'])
    CONFIG = INFO['parser'][_map[args['type']]]
    if not CONFIG:
        raise ValueError("No parser configuration available for %s" % args['type'])

    # an explicit keyword wins over the command line; apply() never forwards a version
    if 'version' in args:
        version = args['version']
    elif 'version' in SYS_ARGS:
        version = SYS_ARGS['version']
    else:
        version = None
    index = int(version) - 1 if version is not None else -1
    if not 0 <= index < len(CONFIG):
        # unknown or missing version: fall back to the latest configuration
        index = -1
    CONFIG = CONFIG[index]
    SECTION = CONFIG['SECTION']
    os.environ['HEALTHCAREIO_SALT'] = INFO['owner']

    return get_content(args['filename'], CONFIG, SECTION)
def apply(files, store_info, logger_info=None):
    """
    Parse a batch of files, sending the parsed content to the data store and the outcome to the logger.

    :files          list of files to be processed in this given thread/process
    :store_info     information about data-store, for now disk isn't thread safe
    :logger_info    information about where to store the logs; when omitted the logs go to
                    <out-folder>/<parse type>.log on disk
    """
    scope = SYS_ARGS['parse']
    if not logger_info:
        # use the saved configuration: a script-level variable is not available when imported
        config = INFO if INFO else init()
        logger_info = {'type': 'disk.DiskWriter', 'args': {'path': os.sep.join([config['out-folder'], scope + '.log'])}}
    logger = factory.instance(**logger_info)
    writer = factory.instance(**store_info)

    for filename in files:
        if filename.strip() == '':
            continue
        try:
            content, logs = parse(filename=filename, type=scope)
            if content:
                writer.write(content)
            if logs:
                for _row in logs:
                    logger.write(dict(_row, **{"parse": scope}))
            else:
                # a file without content is a completed parse with zero rows, not a failure
                rows = len(content) if content else 0
                logger.write({"parse": scope, "name": filename, "completed": True, "rows": rows})
        except Exception as e:
            # one bad file must not stop the batch; exceptions may carry no arguments
            msg = e.args[0] if e.args else str(e)
            logger.write({"parse": scope, "filename": filename, "completed": False, "rows": -1, "msg": msg})
    #
    # @TODO: forward this data to the writer and log engine
    #
def upgrade(**args):
    """
    Assemble the parameters of an upgrade request; nothing is sent by this function.

    :email  provide us with who you are
    :key    upgrade key provided by the server for a given email
    :url    (optional) upgrade endpoint, defaults to URL + "/upgrade"
    """
    if 'url' in args:
        url = args['url']
    else:
        url = URL + "/upgrade"
    headers = dict(key=args['key'], email=args["email"], url=url)
-
if __name__ == '__main__':
    #
    # Command line entry point, the action is selected by the flags found in SYS_ARGS :
    #   signup|init, upgrade, parse, analytics, export
    #
    info = init()

    if 'out-folder' in SYS_ARGS:
        OUTPUT_FOLDER = SYS_ARGS['out-folder']

    if set(SYS_ARGS.keys()) & {'signup', 'init'}:
        #
        # This command will essentially get a new copy of the configurations
        # @TODO: Tie the request to a version ?
        #
        email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init']
        url = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL
        store = SYS_ARGS['store'] if 'store' in SYS_ARGS else 'sqlite'
        db = 'healthcareio' if 'db' not in SYS_ARGS else SYS_ARGS['db']
        register(email=email, url=url, store=store, db=db)
    elif 'upgrade' in SYS_ARGS:
        #
        # perform an upgrade i.e some code or new parsers information will be provided
        #
        pass
    elif 'parse' in SYS_ARGS and info:
        #
        # In this section of the code we are expecting the user to provide :
        #   :folder location of the files to process or file to process
        #
        files = []
        if 'file' in SYS_ARGS:
            files = [SYS_ARGS['file']] if not os.path.isdir(SYS_ARGS['file']) else []
        if 'folder' in SYS_ARGS and os.path.exists(SYS_ARGS['folder']):
            names = os.listdir(SYS_ARGS['folder'])
            files += [os.sep.join([SYS_ARGS['folder'], name]) for name in names if not os.path.isdir(os.sep.join([SYS_ARGS['folder'], name]))]
        #
        # @TODO: Log this here so we know what is being processed or not

        if files and ('claims' in SYS_ARGS['parse'] or 'remits' in SYS_ARGS['parse']):
            scope = SYS_ARGS['parse'].strip().lower()
            # by default the logs are written on disk next to the output
            log_info = {'type': 'disk.DiskWriter', 'args': {'path': os.sep.join([info['out-folder'], SYS_ARGS['parse'] + '.log'])}}
            if info['store']['type'] == 'disk.DiskWriter':
                info['store']['args']['path'] += (os.sep + 'healthcare-io.json')

            if info['store']['type'] == 'disk.SQLiteWriter':
                info['store']['args']['table'] = scope
            else:
                #
                # if we are working with no-sql we will put the logs in it (performance )?
                info['store']['args']['doc'] = scope
                log_info = json.loads(json.dumps(info['store']))
                log_info['args']['doc'] = 'logs'
            # log_info is always defined at this point, so the workers never receive an undefined name
            logger = factory.instance(**log_info)
            # NOTE(review): not used below, every worker creates its own writer -- confirm this instance is still needed
            writer = factory.instance(**info['store'])

            #
            # we need to have batches ready for this in order to run some of these queries in parallel
            # @TODO: Make sure it is with a persistence storage (not disk .. not thread/process safe yet)
            #   - Make sure we can leverage this on n-cores later on, for now the assumption is a single core
            #
            BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int(SYS_ARGS['batch'])
            # np.array_split rejects a non-positive number of sections
            BATCH_COUNT = max(1, BATCH_COUNT)

            procs = []
            for index, batch in enumerate(np.array_split(files, BATCH_COUNT)):
                batch = batch.tolist()
                if not batch:
                    # more batches than files: nothing to hand to a worker
                    continue
                logger.write({"process": index, "parse": SYS_ARGS['parse'], "file_count": len(batch)})
                proc = Process(target=apply, args=(batch, info['store'], log_info,))
                proc.start()
                procs.append(proc)
            # wait for every worker; join returns as soon as the process is done
            for proc in procs:
                proc.join()
    elif 'analytics' in SYS_ARGS:
        PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
        DEBUG = int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
        SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''

        e = analytics.engine(os.sep.join([PATH, 'config.json']))  # --@TODO: make the configuration file globally accessible
        e.apply(type='claims', serialize=True)
        SYS_ARGS['engine'] = e

        pointer = lambda: server.app.run(host='0.0.0.0', port=PORT, debug=DEBUG, threaded=False)
        pthread = Process(target=pointer, args=())
        pthread.start()
    elif 'export' in SYS_ARGS:
        #
        # this function is designed to export the data to csv
        #
        export_format = SYS_ARGS['format'] if 'format' in SYS_ARGS else 'csv'
        export_format = export_format.lower()
        # compare the string itself: a set is never a member of a list of strings
        if export_format not in ['xls', 'csv']:
            export_format = 'csv'
    else:
        msg = """
        CLI Usage
            healthcare-io.py    --<[signup|init]> <email> --store <sqlite|mongo>
            healthcare-io.py    --parse claims --folder <path> [--batch <value>]
            healthcare-io.py    --parse remits --folder <path> [--batch <value>]
        parameters :
            --<[signup|init]>   signup or get a configuration file from a parsing server
            --store             data store mongo or sqlite
        """
        print(msg)
- # """
- # The program was called from the command line thus we are expecting
- # parse in [claims,remits]
- # config os.sep.path.exists(path)
- # folder os.sep.path.exists(path)
- # store store ()
- # """
- # p = len( set(['store','config','folder']) & set(SYS_ARGS.keys())) == 3 and ('db' in SYS_ARGS or 'path' in SYS_ARGS)
- # TYPE = {
- # 'mongo':'mongo.MongoWriter',
- # 'couch':'couch.CouchWriter',
- # 'disk':'disk.DiskWriter'
- # }
- # INFO = {
- # '837':{'scope':'claims','section':'HL'},
- # '835':{'scope':'remits','section':'CLP'}
- # }
- # if p :
- # args = {}
- # scope = SYS_ARGS['config'][:-5].split(os.sep)[-1]
- # CONTEXT = INFO[scope]['scope']
- # #
- # # @NOTE:
- # # improve how database and data stores are handled.
- # if SYS_ARGS['store'] == 'couch' :
- # args = {'url': SYS_ARGS['url'] if 'url' in SYS_ARGS else 'http://localhost:5984'}
- # args['dbname'] = SYS_ARGS['db']
-
- # elif SYS_ARGS ['store'] == 'mongo':
- # args = {'host':SYS_ARGS['host']if 'host' in SYS_ARGS else 'localhost:27017'}
- # if SYS_ARGS['store'] in ['mongo','couch']:
- # args['dbname'] = SYS_ARGS['db'] if 'db' in SYS_ARGS else 'claims_outcomes'
- # args['doc'] = CONTEXT
- # TYPE = TYPE[SYS_ARGS['store']]
- # writer = factory.instance(type=TYPE,args=args)
- # if SYS_ARGS['store'] == 'disk':
- # writer.init(path = 'output-claims.json')
- # logger = factory.instance(type=TYPE,args= dict(args,**{"doc":"logs"}))
- # files = os.listdir(SYS_ARGS['folder'])
- # CONFIG = json.loads(open(SYS_ARGS['config']).read())
- # SECTION = INFO[scope]['section']
-
- # for file in files :
- # if 'limit' in SYS_ARGS and files.index(file) == int(SYS_ARGS['limit']) :
- # break
- # else:
- # filename = os.sep.join([SYS_ARGS['folder'],file])
-
- # try:
- # content,logs = get_content(filename,CONFIG,SECTION)
- # except Exception as e:
- # if sys.version_info[0] > 2 :
- # logs = [{"filename":filename,"msg":e.args[0]}]
- # else:
- # logs = [{"filename":filename,"msg":e.message}]
- # content = None
- # if content :
-
- # writer.write(content)
- # if logs:
-
- # logger.write(logs)
-
-
- # pass
- # else:
- # print (__doc__)
|