healthcare-io.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451
  1. #!/usr/bin/env python3
  2. """
  3. (c) 2019 Claims Toolkit,
  4. Health Information Privacy Lab, Vanderbilt University Medical Center
  5. Steve L. Nyemba <steve.l.nyemba@vanderbilt.edu>
  6. Khanhly Nguyen <khanhly.t.nguyen@gmail.com>
  7. This code is intended to process and parse healthcare x12 837 (claims) and x12 835 (remittances) into human readable JSON format.
  8. The claims/output can be forwarded to a NoSQL data store like couchdb and mongodb
  9. Usage :
  10. Commandline :
  11. python edi-parser --scope --config <path> --folder <path> --store <[mongo|disk|couch]> --<[db|path]> <id|path>
  12. with :
  13. --scope <claims|remits>
  14. --config path of the x12 to be parsed i.e it could be 835, or 837
  15. --folder location of the files (they must be decompressed)
  16. --store data store could be disk, mongodb, couchdb
  17. --db|path name of the folder to store the output or the database name
  18. Embedded in Code :
  19. import edi.parser
  20. import json
  21. filename = '/data/claim_1.x12'
  22. conf = json.loads(open('config/837.json').read())
  23. edi.parser.get_content(filename,conf)
  24. """
  25. from healthcareio.params import SYS_ARGS
  26. from transport import factory
  27. import requests
  28. from healthcareio import analytics
  29. from healthcareio import server
  30. from healthcareio.parser import get_content
  31. import os
  32. import json
  33. import sys
  34. import numpy as np
  35. from multiprocessing import Process
  36. import time
  37. from healthcareio import x12
  38. import smart
  39. import pandas as pd
  40. PATH = os.sep.join([os.environ['HOME'],'.healthcareio'])
  41. OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io'])
  42. INFO = None
  43. URL = "https://healthcareio.the-phi.com"
  44. if not os.path.exists(PATH) :
  45. os.mkdir(PATH)
  46. import platform
  47. import sqlite3 as lite
  48. # PATH = os.sep.join([os.environ['HOME'],'.edi-parser'])
  49. def register (**args) :
  50. """
  51. :email user's email address
  52. :url url of the provider to register
  53. """
  54. email = args['email']
  55. url = args['url'] if 'url' in args else URL
  56. folders = [PATH,OUTPUT_FOLDER]
  57. for path in folders :
  58. if not os.path.exists(path) :
  59. os.mkdir(path)
  60. #
  61. #
  62. store = args['store'] if 'store' in args else 'sqlite'
  63. headers = {"email":email,"client":platform.node(),"store":store,"db":args['db']}
  64. http = requests.session()
  65. r = http.post(url,headers=headers)
  66. #
  67. # store = {"type":"disk.DiskWriter","args":{"path":OUTPUT_FOLDER}}
  68. # if 'store' in args :
  69. # store = args['store']
  70. filename = (os.sep.join([PATH,'config.json']))
  71. info = r.json() #{"parser":r.json(),"store":store}
  72. info = dict({"owner":email},**info)
  73. info['store']['args']['path'] =os.sep.join([OUTPUT_FOLDER,'healthcare-io.db3']) #-- sql
  74. info['out-folder'] = OUTPUT_FOLDER
  75. file = open( filename,'w')
  76. file.write( json.dumps(info))
  77. file.close()
  78. #
  79. # Create the sqlite3 database to
  80. def log(**args):
  81. """
  82. This function will perform a log of anything provided to it
  83. """
  84. pass
  85. def init():
  86. """
  87. read all the configuration from the
  88. """
  89. filename = os.sep.join([PATH,'config.json'])
  90. info = None
  91. if os.path.exists(filename):
  92. file = open(filename)
  93. info = json.loads(file.read())
  94. if not os.path.exists(info['out-folder']) :
  95. os.mkdir(info['out-folder'])
  96. if info['store']['type'] == 'disk.SQLiteWriter' and not os.path.exists(info['store']['args']['path']) :
  97. conn = lite.connect(info['store']['args']['path'],isolation_level=None)
  98. for key in info['schema'] :
  99. _sql = info['schema'][key]['create']
  100. # r = conn.execute("select * from sqlite_master where name in ('claims','remits')")
  101. conn.execute(_sql)
  102. conn.commit()
  103. conn.close()
  104. return info
  105. #
  106. # Global variables that load the configuration files
  107. def parse(**args):
  108. """
  109. This function will parse the content of a claim or remittance (x12 format) give the following parameters
  110. :filename absolute path of the file to be parsed
  111. :type claims|remits in x12 format
  112. """
  113. global INFO
  114. if not INFO :
  115. INFO = init()
  116. if args['type'] == 'claims' :
  117. CONFIG = INFO['parser']['837']
  118. elif args['type'] == 'remits' :
  119. CONFIG = INFO['parser']['835']
  120. else:
  121. CONFIG = None
  122. if CONFIG :
  123. # CONFIG = CONFIG[-1] if 'version' not in args and (args['version'] < len(CONFIG)) else CONFIG[0]
  124. CONFIG = CONFIG[int(args['version'])-1] if 'version' in SYS_ARGS and int(SYS_ARGS['version']) < len(CONFIG) else CONFIG[-1]
  125. SECTION = CONFIG['SECTION']
  126. os.environ['HEALTHCAREIO_SALT'] = INFO['owner']
  127. return get_content(args['filename'],CONFIG,SECTION)
  128. def resume (files,id,config):
  129. _args = config['store'].copy()
  130. if 'mongo' in config['store']['type'] :
  131. _args['type'] = 'mongo.MongoReader'
  132. reader = factory.instance(**_args)
  133. _files = []
  134. if 'resume' in config['analytics'] :
  135. _args = config['analytics']['resume'][id]
  136. _files = reader.read(**_args)
  137. _files = [item['name'] for item in _files if item['name'] != None]
  138. return list(set(files) - set(_files))
  139. return files
  140. pass
  141. def apply(files,store_info,logger_info=None):
  142. """
  143. :files list of files to be processed in this given thread/process
  144. :store_info information about data-store, for now disk isn't thread safe
  145. :logger_info information about where to store the logs
  146. """
  147. if not logger_info :
  148. logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
  149. else:
  150. logger = factory.instance(**logger_info)
  151. writer = factory.instance(**store_info)
  152. for filename in files :
  153. if filename.strip() == '':
  154. continue
  155. # content,logs = get_content(filename,CONFIG,CONFIG['SECTION'])
  156. #
  157. try:
  158. content,logs = parse(filename = filename,type=SYS_ARGS['parse'])
  159. if content :
  160. writer.write(content)
  161. if logs :
  162. [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs]
  163. else:
  164. logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)})
  165. except Exception as e:
  166. logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]})
  167. # print ([filename,len(content)])
  168. #
  169. # @TODO: forward this data to the writer and log engine
  170. #
  171. def upgrade(**args):
  172. """
  173. :email provide us with who you are
  174. :key upgrade key provided by the server for a given email
  175. """
  176. url = args['url'] if 'url' in args else URL+"/upgrade"
  177. headers = {"key":args['key'],"email":args["email"],"url":url}
  178. if __name__ == '__main__' :
  179. info = init()
  180. if 'out-folder' in SYS_ARGS :
  181. OUTPUT_FOLDER = SYS_ARGS['out-folder']
  182. if set(list(SYS_ARGS.keys())) & set(['signup','init']):
  183. #
  184. # This command will essentially get a new copy of the configurations
  185. # @TODO: Tie the request to a version ?
  186. #
  187. email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init']
  188. url = SYS_ARGS['url'] if 'url' in SYS_ARGS else 'https://healthcareio.the-phi.com'
  189. store = SYS_ARGS['store'] if 'store' in SYS_ARGS else 'sqlite'
  190. db='healthcareio' if 'db' not in SYS_ARGS else SYS_ARGS['db']
  191. register(email=email,url=url,store=store,db=db)
  192. # else:
  193. # m = """
  194. # usage:
  195. # healthcareio --signup --email myemail@provider.com [--url <host>]
  196. # """
  197. # print (m)
  198. elif 'upgrade' in SYS_ARGS :
  199. #
  200. # perform an upgrade i.e some code or new parsers information will be provided
  201. #
  202. pass
  203. elif 'parse' in SYS_ARGS and info:
  204. """
  205. In this section of the code we are expecting the user to provide :
  206. :folder location of the files to process or file to process
  207. :
  208. """
  209. files = []
  210. if 'file' in SYS_ARGS :
  211. files = [SYS_ARGS['file']] if not os.path.isdir(SYS_ARGS['file']) else []
  212. if 'folder' in SYS_ARGS and os.path.exists(SYS_ARGS['folder']):
  213. names = os.listdir(SYS_ARGS['folder'])
  214. files += [os.sep.join([SYS_ARGS['folder'],name]) for name in names if not os.path.isdir(os.sep.join([SYS_ARGS['folder'],name]))]
  215. else:
  216. #
  217. # raise an erro
  218. pass
  219. #
  220. # if the user has specified to resume, we should look into the logs and pull the files processed and those that haven't
  221. #
  222. if 'resume' in SYS_ARGS :
  223. files = resume(files,SYS_ARGS['parse'],info)
  224. print (["Found ",len(files)," files unprocessed"])
  225. #
  226. # @TODO: Log this here so we know what is being processed or not
  227. SCOPE = None
  228. if files : #and ('claims' in SYS_ARGS['parse'] or 'remits' in SYS_ARGS['parse']):
  229. # logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
  230. # if info['store']['type'] == 'disk.DiskWriter' :
  231. # info['store']['args']['path'] += (os.sep + 'healthcare-io.json')
  232. # elif info['store']['type'] == 'disk.SQLiteWriter' :
  233. # # info['store']['args']['path'] += (os.sep + 'healthcare-io.db3')
  234. # pass
  235. # if info['store']['type'] == 'disk.SQLiteWriter' :
  236. # info['store']['args']['table'] = SYS_ARGS['parse'].strip().lower()
  237. # _info = json.loads(json.dumps(info['store']))
  238. # _info['args']['table']='logs'
  239. # else:
  240. # #
  241. # # if we are working with no-sql we will put the logs in it (performance )?
  242. # info['store']['args']['doc'] = SYS_ARGS['parse'].strip().lower()
  243. # _info = json.loads(json.dumps(info['store']))
  244. # _info['args']['doc'] = 'logs'
  245. # logger = factory.instance(**_info)
  246. # writer = factory.instance(**info['store'])
  247. #
  248. # we need to have batches ready for this in order to run some of these queries in parallel
  249. # @TODO: Make sure it is with a persistence storage (not disk .. not thread/process safe yet)
  250. # - Make sure we can leverage this on n-cores later on, for now the assumption is a single core
  251. #
  252. BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int (SYS_ARGS['batch'])
  253. files = np.array_split(files,BATCH_COUNT)
  254. procs = []
  255. index = 0
  256. for row in files :
  257. row = row.tolist()
  258. # logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)})
  259. # proc = Process(target=apply,args=(row,info['store'],_info,))
  260. parser = x12.Parser(os.sep.join([PATH,'config.json']))
  261. parser.set.files(row)
  262. parser.start()
  263. procs.append(parser)
  264. # index = index + 1
  265. while len(procs) > 0 :
  266. procs = [proc for proc in procs if proc.is_alive()]
  267. time.sleep(2)
  268. # for filename in files :
  269. # if filename.strip() == '':
  270. # continue
  271. # # content,logs = get_content(filename,CONFIG,CONFIG['SECTION'])
  272. # #
  273. # try:
  274. # content,logs = parse(filename = filename,type=SYS_ARGS['parse'])
  275. # if content :
  276. # writer.write(content)
  277. # if logs :
  278. # [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs]
  279. # else:
  280. # logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)})
  281. # except Exception as e:
  282. # logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]})
  283. # # print ([filename,len(content)])
  284. # #
  285. # # @TODO: forward this data to the writer and log engine
  286. # #
  287. pass
  288. elif 'analytics' in SYS_ARGS :
  289. PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
  290. DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
  291. SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''
  292. #
  293. #
  294. # PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
  295. if os.path.exists(os.sep.join([PATH,'config.json'])) :
  296. e = analytics.engine(os.sep.join([PATH,'config.json'])) #--@TODO: make the configuration file globally accessible
  297. e.apply(type='claims',serialize=True)
  298. SYS_ARGS['engine'] = e
  299. SYS_ARGS['config'] = json.loads(open(os.sep.join([PATH,'config.json'])).read())
  300. else:
  301. SYS_ARGS['config'] = {"owner":None,"store":None}
  302. if 'args' not in SYS_ARGS['config'] :
  303. SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"}
  304. me = pd.DataFrame(smart.top.read(name='healthcare-io.py')).args.unique().tolist()
  305. SYS_ARGS['me'] = me[0] #-- This key will identify the current process
  306. pointer = lambda : server.app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=False)
  307. pthread = Process(target=pointer,args=())
  308. pthread.start()
  309. elif 'export' in SYS_ARGS:
  310. #
  311. # this function is designed to export the data to csv
  312. #
  313. format = SYS_ARGS['format'] if 'format' in SYS_ARGS else 'csv'
  314. format = format.lower()
  315. if set([format]) not in ['xls','csv'] :
  316. format = 'csv'
  317. else:
  318. msg = """
  319. cli:
  320. healthcare-io.py --<[signup|init]> <email> --store <sqlite|mongo> [--batch <value>]
  321. healthcare-io.py --parse claims --folder <path> [--batch <value>]
  322. healthcare-io.py --parse remits --folder <path> [--batch <value>] [--resume]
  323. parameters :
  324. --<[signup|init]> signup or get a configuration file from a parsing server
  325. --store data store mongo or sqlite or mongodb
  326. --resume will attempt to resume if there was an interruption
  327. """
  328. print(msg)
  329. pass
  330. # """
  331. # The program was called from the command line thus we are expecting
  332. # parse in [claims,remits]
  333. # config os.sep.path.exists(path)
  334. # folder os.sep.path.exists(path)
  335. # store store ()
  336. # """
  337. # p = len( set(['store','config','folder']) & set(SYS_ARGS.keys())) == 3 and ('db' in SYS_ARGS or 'path' in SYS_ARGS)
  338. # TYPE = {
  339. # 'mongo':'mongo.MongoWriter',
  340. # 'couch':'couch.CouchWriter',
  341. # 'disk':'disk.DiskWriter'
  342. # }
  343. # INFO = {
  344. # '837':{'scope':'claims','section':'HL'},
  345. # '835':{'scope':'remits','section':'CLP'}
  346. # }
  347. # if p :
  348. # args = {}
  349. # scope = SYS_ARGS['config'][:-5].split(os.sep)[-1]
  350. # CONTEXT = INFO[scope]['scope']
  351. # #
  352. # # @NOTE:
  353. # # improve how database and data stores are handled.
  354. # if SYS_ARGS['store'] == 'couch' :
  355. # args = {'url': SYS_ARGS['url'] if 'url' in SYS_ARGS else 'http://localhost:5984'}
  356. # args['dbname'] = SYS_ARGS['db']
  357. # elif SYS_ARGS ['store'] == 'mongo':
  358. # args = {'host':SYS_ARGS['host']if 'host' in SYS_ARGS else 'localhost:27017'}
  359. # if SYS_ARGS['store'] in ['mongo','couch']:
  360. # args['dbname'] = SYS_ARGS['db'] if 'db' in SYS_ARGS else 'claims_outcomes'
  361. # args['doc'] = CONTEXT
  362. # TYPE = TYPE[SYS_ARGS['store']]
  363. # writer = factory.instance(type=TYPE,args=args)
  364. # if SYS_ARGS['store'] == 'disk':
  365. # writer.init(path = 'output-claims.json')
  366. # logger = factory.instance(type=TYPE,args= dict(args,**{"doc":"logs"}))
  367. # files = os.listdir(SYS_ARGS['folder'])
  368. # CONFIG = json.loads(open(SYS_ARGS['config']).read())
  369. # SECTION = INFO[scope]['section']
  370. # for file in files :
  371. # if 'limit' in SYS_ARGS and files.index(file) == int(SYS_ARGS['limit']) :
  372. # break
  373. # else:
  374. # filename = os.sep.join([SYS_ARGS['folder'],file])
  375. # try:
  376. # content,logs = get_content(filename,CONFIG,SECTION)
  377. # except Exception as e:
  378. # if sys.version_info[0] > 2 :
  379. # logs = [{"filename":filename,"msg":e.args[0]}]
  380. # else:
  381. # logs = [{"filename":filename,"msg":e.message}]
  382. # content = None
  383. # if content :
  384. # writer.write(content)
  385. # if logs:
  386. # logger.write(logs)
  387. # pass
  388. # else:
  389. # print (__doc__)