healthcare-io.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454
  1. #!/usr/bin/env python3
  2. """
  3. (c) 2019 Claims Toolkit,
  4. Health Information Privacy Lab, Vanderbilt University Medical Center
  5. Steve L. Nyemba <steve.l.nyemba@vanderbilt.edu>
  6. Khanhly Nguyen <khanhly.t.nguyen@gmail.com>
  7. This code is intended to process and parse healthcare x12 837 (claims) and x12 835 (remittances) into human readable JSON format.
  8. The claims/output can be forwarded to a NoSQL data store like couchdb and mongodb
  9. Usage :
  10. Commandline :
  11. python edi-parser --scope --config <path> --folder <path> --store <[mongo|disk|couch]> --<[db|path]> <id|path>
  12. with :
  13. --scope <claims|remits>
  14. --config path of the x12 to be parsed i.e it could be 835, or 837
  15. --folder location of the files (they must be decompressed)
  16. --store data store could be disk, mongodb, couchdb
  17. --db|path name of the folder to store the output or the database name
  18. Embedded in Code :
  19. import edi.parser
  20. import json
  21. filename = '/data/claim_1.x12'
  22. conf = json.loads(open('config/837.json').read())
  23. edi.parser.get_content(filename,conf)
  24. """
  25. from healthcareio.params import SYS_ARGS
  26. from transport import factory
  27. import requests
  28. from healthcareio import analytics
  29. from healthcareio import server
  30. from healthcareio.parser import get_content
  31. import os
  32. import json
  33. import sys
  34. import numpy as np
  35. from multiprocessing import Process
  36. import time
  37. from healthcareio import x12
  38. import smart
  39. from healthcareio.server import proxy
  40. import pandas as pd
  41. PATH = os.sep.join([os.environ['HOME'],'.healthcareio'])
  42. OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io'])
  43. INFO = None
  44. URL = "https://healthcareio.the-phi.com"
  45. if not os.path.exists(PATH) :
  46. os.mkdir(PATH)
  47. import platform
  48. import sqlite3 as lite
  49. # PATH = os.sep.join([os.environ['HOME'],'.edi-parser'])
  50. def signup (**args) :
  51. """
  52. :email user's email address
  53. :url url of the provider to signup
  54. """
  55. email = args['email']
  56. url = args['url'] if 'url' in args else URL
  57. folders = [PATH,OUTPUT_FOLDER]
  58. for path in folders :
  59. if not os.path.exists(path) :
  60. os.mkdir(path)
  61. #
  62. #
  63. store = args['store'] if 'store' in args else 'sqlite'
  64. headers = {"email":email,"client":platform.node(),"store":store,"db":args['db']}
  65. http = requests.session()
  66. r = http.post(url,headers=headers)
  67. #
  68. # store = {"type":"disk.DiskWriter","args":{"path":OUTPUT_FOLDER}}
  69. # if 'store' in args :
  70. # store = args['store']
  71. filename = (os.sep.join([PATH,'config.json']))
  72. info = r.json() #{"parser":r.json(),"store":store}
  73. info = dict({"owner":email},**info)
  74. info['store']['args']['path'] =os.sep.join([OUTPUT_FOLDER,'healthcare-io.db3']) #-- sql
  75. info['out-folder'] = OUTPUT_FOLDER
  76. file = open( filename,'w')
  77. file.write( json.dumps(info))
  78. file.close()
  79. #
  80. # Create the sqlite3 database to
  81. def log(**args):
  82. """
  83. This function will perform a log of anything provided to it
  84. """
  85. pass
  86. def init():
  87. """
  88. read all the configuration from the
  89. """
  90. filename = os.sep.join([PATH,'config.json'])
  91. info = None
  92. if os.path.exists(filename):
  93. file = open(filename)
  94. info = json.loads(file.read())
  95. if not os.path.exists(info['out-folder']) :
  96. os.mkdir(info['out-folder'])
  97. if info['store']['type'] == 'disk.SQLiteWriter' and not os.path.exists(info['store']['args']['path']) :
  98. conn = lite.connect(info['store']['args']['path'],isolation_level=None)
  99. for key in info['schema'] :
  100. _sql = info['schema'][key]['create']
  101. # r = conn.execute("select * from sqlite_master where name in ('claims','remits')")
  102. conn.execute(_sql)
  103. conn.commit()
  104. conn.close()
  105. return info
  106. #
  107. # Global variables that load the configuration files
  108. def parse(**args):
  109. """
  110. This function will parse the content of a claim or remittance (x12 format) give the following parameters
  111. :filename absolute path of the file to be parsed
  112. :type claims|remits in x12 format
  113. """
  114. global INFO
  115. if not INFO :
  116. INFO = init()
  117. if args['type'] == 'claims' :
  118. CONFIG = INFO['parser']['837']
  119. elif args['type'] == 'remits' :
  120. CONFIG = INFO['parser']['835']
  121. else:
  122. CONFIG = None
  123. if CONFIG :
  124. # CONFIG = CONFIG[-1] if 'version' not in args and (args['version'] < len(CONFIG)) else CONFIG[0]
  125. CONFIG = CONFIG[int(args['version'])-1] if 'version' in SYS_ARGS and int(SYS_ARGS['version']) < len(CONFIG) else CONFIG[-1]
  126. SECTION = CONFIG['SECTION']
  127. os.environ['HEALTHCAREIO_SALT'] = INFO['owner']
  128. return get_content(args['filename'],CONFIG,SECTION)
  129. def resume (files,id,config):
  130. _args = config['store'].copy()
  131. if 'mongo' in config['store']['type'] :
  132. _args['type'] = 'mongo.MongoReader'
  133. reader = factory.instance(**_args)
  134. _files = []
  135. if 'resume' in config['analytics'] :
  136. _args = config['analytics']['resume'][id]
  137. _files = reader.read(**_args)
  138. _files = [item['name'] for item in _files if item['name'] != None]
  139. return list(set(files) - set(_files))
  140. return files
  141. pass
  142. def apply(files,store_info,logger_info=None):
  143. """
  144. :files list of files to be processed in this given thread/process
  145. :store_info information about data-store, for now disk isn't thread safe
  146. :logger_info information about where to store the logs
  147. """
  148. if not logger_info :
  149. logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
  150. else:
  151. logger = factory.instance(**logger_info)
  152. writer = factory.instance(**store_info)
  153. for filename in files :
  154. if filename.strip() == '':
  155. continue
  156. # content,logs = get_content(filename,CONFIG,CONFIG['SECTION'])
  157. #
  158. try:
  159. content,logs = parse(filename = filename,type=SYS_ARGS['parse'])
  160. if content :
  161. writer.write(content)
  162. if logs :
  163. [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs]
  164. else:
  165. logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)})
  166. except Exception as e:
  167. logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]})
  168. # print ([filename,len(content)])
  169. #
  170. # @TODO: forward this data to the writer and log engine
  171. #
  172. def upgrade(**args):
  173. """
  174. :email provide us with who you are
  175. :key upgrade key provided by the server for a given email
  176. """
  177. url = args['url'] if 'url' in args else URL+"/upgrade"
  178. headers = {"key":args['key'],"email":args["email"],"url":url}
  179. def check(**_args):
  180. """
  181. This function will check if there is an update available (versions are in the configuration file)
  182. :param url
  183. """
  184. url = _args['url'][:-1] if _args['url'].endswith('/') else _args['url']
  185. url = url + "/version"
  186. if 'version' not in _args :
  187. version = {"_id":"version","current":0.0}
  188. else:
  189. version = _args['version']
  190. http = requests.session()
  191. r = http.get(url)
  192. return r.json()
  193. if __name__ == '__main__' :
  194. info = init()
  195. if 'out-folder' in SYS_ARGS :
  196. OUTPUT_FOLDER = SYS_ARGS['out-folder']
  197. SYS_ARGS['url'] = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL
  198. if set(list(SYS_ARGS.keys())) & set(['signup','init']):
  199. #
  200. # This command will essentially get a new copy of the configurations
  201. # @TODO: Tie the request to a version ?
  202. #
  203. email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init']
  204. url = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL
  205. store = SYS_ARGS['store'] if 'store' in SYS_ARGS else 'sqlite'
  206. db='healthcareio' if 'db' not in SYS_ARGS else SYS_ARGS['db']
  207. signup(email=email,url=url,store=store,db=db)
  208. # else:
  209. # m = """
  210. # usage:
  211. # healthcareio --signup --email myemail@provider.com [--url <host>]
  212. # """
  213. # print (m)
  214. elif 'upgrade' in SYS_ARGS :
  215. #
  216. # perform an upgrade i.e some code or new parsers information will be provided
  217. #
  218. pass
  219. elif 'parse' in SYS_ARGS and info:
  220. """
  221. In this section of the code we are expecting the user to provide :
  222. :folder location of the files to process or file to process
  223. :
  224. """
  225. files = []
  226. if 'file' in SYS_ARGS :
  227. files = [SYS_ARGS['file']] if not os.path.isdir(SYS_ARGS['file']) else []
  228. if 'folder' in SYS_ARGS and os.path.exists(SYS_ARGS['folder']):
  229. for root,_dir,f in os.walk(SYS_ARGS['folder']) :
  230. if f :
  231. files += [os.sep.join([root,name]) for name in f]
  232. # names = os.listdir(SYS_ARGS['folder'])
  233. # files += [os.sep.join([SYS_ARGS['folder'],name]) for name in names if not os.path.isdir(os.sep.join([SYS_ARGS['folder'],name]))]
  234. else:
  235. #
  236. # raise an error
  237. pass
  238. #
  239. # if the user has specified to resume, we should look into the logs and pull the files processed and those that haven't
  240. #
  241. if 'resume' in SYS_ARGS :
  242. store_config = json.loads( (open(os.sep.join([PATH,'config.json']))).read() )
  243. files = proxy.get.resume(files,store_config )
  244. # print (["Found ",len(files)," files unprocessed"])
  245. #
  246. # @TODO: Log this here so we know what is being processed or not
  247. SCOPE = None
  248. if files : #and ('claims' in SYS_ARGS['parse'] or 'remits' in SYS_ARGS['parse']):
  249. BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int (SYS_ARGS['batch'])
  250. files = np.array_split(files,BATCH_COUNT)
  251. procs = []
  252. index = 0
  253. for row in files :
  254. row = row.tolist()
  255. # logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)})
  256. # proc = Process(target=apply,args=(row,info['store'],_info,))
  257. parser = x12.Parser(os.sep.join([PATH,'config.json']))
  258. parser.set.files(row)
  259. parser.start()
  260. procs.append(parser)
  261. # index = index + 1
  262. while len(procs) > 0 :
  263. procs = [proc for proc in procs if proc.is_alive()]
  264. time.sleep(2)
  265. pass
  266. elif 'analytics' in SYS_ARGS :
  267. PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
  268. DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
  269. SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''
  270. #
  271. #
  272. # PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
  273. if os.path.exists(os.sep.join([PATH,'config.json'])) :
  274. e = analytics.engine(os.sep.join([PATH,'config.json'])) #--@TODO: make the configuration file globally accessible
  275. e.apply(type='claims',serialize=True)
  276. SYS_ARGS['engine'] = e
  277. SYS_ARGS['config'] = json.loads(open(os.sep.join([PATH,'config.json'])).read())
  278. else:
  279. SYS_ARGS['config'] = {"owner":None,"store":None}
  280. if 'args' not in SYS_ARGS['config'] :
  281. SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"}
  282. me = pd.DataFrame(smart.top.read(name='healthcare-io.py')).args.unique().tolist()
  283. SYS_ARGS['me'] = me[0] #-- This key will identify the current process
  284. pointer = lambda : server.app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=False)
  285. pthread = Process(target=pointer,args=())
  286. pthread.start()
  287. elif 'check-update' in SYS_ARGS :
  288. _args = {"url":SYS_ARGS['url']}
  289. try:
  290. if os.path.exists(os.sep.join([PATH,'config.json'])) :
  291. SYS_ARGS['config'] = json.loads((open(os.sep.join([PATH,'config.json']))).read())
  292. else:
  293. SYS_ARGS['config'] = {}
  294. if 'version' in SYS_ARGS['config'] :
  295. _args['version'] = SYS_ARGS['config']['version']
  296. version = check(**_args)
  297. _version = {"current":0.0}if 'version' not in SYS_ARGS['config'] else SYS_ARGS['config']['version']
  298. if _version['current'] != version['current'] :
  299. print ()
  300. print ("You need to upgrade your system to version to ",version['current'])
  301. print ("\t- signup (for new configuration)")
  302. print ("\t- use pip to upgrade the codebase")
  303. else:
  304. print ()
  305. print ("You are running the current configuraiton version ",_version['current'])
  306. except Exception as e:
  307. print (e)
  308. pass
  309. elif 'export' in SYS_ARGS:
  310. #
  311. # this function is designed to export the data to csv
  312. #
  313. format = SYS_ARGS['format'] if 'format' in SYS_ARGS else 'csv'
  314. format = format.lower()
  315. if set([format]) not in ['xls','csv'] :
  316. format = 'csv'
  317. else:
  318. msg = """
  319. cli:
  320. healthcare-io.py --<[signup|init]> <email> --store <sqlite|mongo> [--batch <value>]
  321. healthcare-io.py --parse --folder <path> [--batch <value>] [--resume]
  322. healthcare-io.py --check-update
  323. action :
  324. --signup|init signup user and get configuration file
  325. --parse starts parsing
  326. --check checks for updates
  327. parameters :
  328. --<[signup|init]> signup or get a configuration file from a parsing server
  329. --folder location of the files (the program will recursively traverse it)
  330. --store data store mongo or sqlite or mongodb
  331. --resume will attempt to resume if there was an interruption
  332. """
  333. print(msg)
  334. pass
  335. # """
  336. # The program was called from the command line thus we are expecting
  337. # parse in [claims,remits]
  338. # config os.sep.path.exists(path)
  339. # folder os.sep.path.exists(path)
  340. # store store ()
  341. # """
  342. # p = len( set(['store','config','folder']) & set(SYS_ARGS.keys())) == 3 and ('db' in SYS_ARGS or 'path' in SYS_ARGS)
  343. # TYPE = {
  344. # 'mongo':'mongo.MongoWriter',
  345. # 'couch':'couch.CouchWriter',
  346. # 'disk':'disk.DiskWriter'
  347. # }
  348. # INFO = {
  349. # '837':{'scope':'claims','section':'HL'},
  350. # '835':{'scope':'remits','section':'CLP'}
  351. # }
  352. # if p :
  353. # args = {}
  354. # scope = SYS_ARGS['config'][:-5].split(os.sep)[-1]
  355. # CONTEXT = INFO[scope]['scope']
  356. # #
  357. # # @NOTE:
  358. # # improve how database and data stores are handled.
  359. # if SYS_ARGS['store'] == 'couch' :
  360. # args = {'url': SYS_ARGS['url'] if 'url' in SYS_ARGS else 'http://localhost:5984'}
  361. # args['dbname'] = SYS_ARGS['db']
  362. # elif SYS_ARGS ['store'] == 'mongo':
  363. # args = {'host':SYS_ARGS['host']if 'host' in SYS_ARGS else 'localhost:27017'}
  364. # if SYS_ARGS['store'] in ['mongo','couch']:
  365. # args['dbname'] = SYS_ARGS['db'] if 'db' in SYS_ARGS else 'claims_outcomes'
  366. # args['doc'] = CONTEXT
  367. # TYPE = TYPE[SYS_ARGS['store']]
  368. # writer = factory.instance(type=TYPE,args=args)
  369. # if SYS_ARGS['store'] == 'disk':
  370. # writer.init(path = 'output-claims.json')
  371. # logger = factory.instance(type=TYPE,args= dict(args,**{"doc":"logs"}))
  372. # files = os.listdir(SYS_ARGS['folder'])
  373. # CONFIG = json.loads(open(SYS_ARGS['config']).read())
  374. # SECTION = INFO[scope]['section']
  375. # for file in files :
  376. # if 'limit' in SYS_ARGS and files.index(file) == int(SYS_ARGS['limit']) :
  377. # break
  378. # else:
  379. # filename = os.sep.join([SYS_ARGS['folder'],file])
  380. # try:
  381. # content,logs = get_content(filename,CONFIG,SECTION)
  382. # except Exception as e:
  383. # if sys.version_info[0] > 2 :
  384. # logs = [{"filename":filename,"msg":e.args[0]}]
  385. # else:
  386. # logs = [{"filename":filename,"msg":e.message}]
  387. # content = None
  388. # if content :
  389. # writer.write(content)
  390. # if logs:
  391. # logger.write(logs)
  392. # pass
  393. # else:
  394. # print (__doc__)