healthcare-io.py

#!/usr/bin/env python3
"""
(c) 2019 Claims Toolkit,
Health Information Privacy Lab, Vanderbilt University Medical Center

Steve L. Nyemba <steve.l.nyemba@vanderbilt.edu>
Khanhly Nguyen <khanhly.t.nguyen@gmail.com>

This code is intended to process and parse healthcare x12 837 (claims) and x12 835 (remittances) into human-readable JSON format.
The claims/output can be forwarded to a NoSQL data store such as CouchDB or MongoDB.

Usage :
    Command line :

        python edi-parser --scope <claims|remits> --config <path> --folder <path> --store <mongo|disk|couch> --<db|path> <id|path>

        with :
            --scope     claims or remits
            --config    path of the x12 configuration (835 or 837) to be used for parsing
            --folder    location of the files (they must be decompressed)
            --store     data store, one of disk, mongodb or couchdb
            --db|path   name of the database, or the folder in which to store the output

    Embedded in code :

        import edi.parser
        import json

        filename = '/data/claim_1.x12'
        conf = json.loads(open('config/837.json').read())
        edi.parser.get_content(filename, conf)
"""
from healthcareio.params import SYS_ARGS
from transport import factory
import requests

from healthcareio import analytics
from healthcareio import server
from healthcareio.parser import get_content

import os
import json
import sys
import platform
import sqlite3 as lite
import numpy as np
from multiprocessing import Process
import time

PATH = os.sep.join([os.environ['HOME'], '.healthcareio'])           # configuration folder
OUTPUT_FOLDER = os.sep.join([os.environ['HOME'], 'healthcare-io'])  # default output folder
INFO = None                                                         # cached configuration, loaded by init()
URL = "https://healthcareio.the-phi.com"

if not os.path.exists(PATH):
    os.mkdir(PATH)

# PATH = os.sep.join([os.environ['HOME'],'.edi-parser'])
def register(**args):
    """
    Register this client with the parsing server and write the returned
    configuration to ~/.healthcareio/config.json

    :email  user's email address
    :url    url of the provider to register against
    """
    email = args['email']
    url = args['url'] if 'url' in args else URL
    folders = [PATH, OUTPUT_FOLDER]
    for path in folders:
        if not os.path.exists(path):
            os.mkdir(path)

    store = args['store'] if 'store' in args else 'sqlite'
    headers = {"email": email, "client": platform.node(), "store": store, "db": args['db']}
    http = requests.session()
    r = http.post(url, headers=headers)
    #
    # store = {"type":"disk.DiskWriter","args":{"path":OUTPUT_FOLDER}}
    # if 'store' in args :
    #     store = args['store']
    filename = os.sep.join([PATH, 'config.json'])
    info = r.json()  # {"parser":r.json(),"store":store}
    info = dict({"owner": email}, **info)
    info['store']['args']['path'] = os.sep.join([OUTPUT_FOLDER, 'healthcare-io.db3'])  # -- sqlite database file
    info['out-folder'] = OUTPUT_FOLDER

    file = open(filename, 'w')
    file.write(json.dumps(info))
    file.close()
    #
    # Create the sqlite3 database to
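#
# A minimal usage sketch (illustration only, not part of the module's control
# flow): the email and db values below are placeholders; register() expects
# the server at `url` to answer with the parser configuration as JSON.
#
#   register(email='jane.doe@example.com', url=URL, store='sqlite', db='healthcareio')
#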
def log(**args):
    """
    This function will perform a log of anything provided to it
    """
    pass
def init():
    """
    Read the configuration from ~/.healthcareio/config.json, create the output
    folder if needed and, for a sqlite store, create the database schema.
    """
    filename = os.sep.join([PATH, 'config.json'])
    info = None
    if os.path.exists(filename):
        file = open(filename)
        info = json.loads(file.read())
        if not os.path.exists(info['out-folder']):
            os.mkdir(info['out-folder'])
        if info['store']['type'] == 'disk.SQLiteWriter' and not os.path.exists(info['store']['args']['path']):
            conn = lite.connect(info['store']['args']['path'], isolation_level=None)
            for key in info['schema']:
                _sql = info['schema'][key]['create']
                # r = conn.execute("select * from sqlite_master where name in ('claims','remits')")
                conn.execute(_sql)
            conn.commit()
            conn.close()
    return info
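#
# For reference, a sketch of the configuration this module assumes in
# ~/.healthcareio/config.json (field names inferred from the code in this
# file; the values shown are placeholders):
#
#   {
#     "owner": "jane.doe@example.com",
#     "out-folder": "/home/jane/healthcare-io",
#     "store": {"type": "disk.SQLiteWriter", "args": {"path": "/home/jane/healthcare-io/healthcare-io.db3"}},
#     "parser": {"837": [{"SECTION": "HL", ...}], "835": [{"SECTION": "CLP", ...}]},
#     "schema": {"claims": {"create": "CREATE TABLE ..."}, "remits": {"create": "CREATE TABLE ..."}}
#   }
#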
#
# Global variables that load the configuration files
def parse(**args):
    """
    This function will parse the content of a claim or remittance (x12 format) given the following parameters:
    :filename   absolute path of the file to be parsed
    :type       claims|remits in x12 format
    """
    global INFO
    if not INFO:
        INFO = init()
    if args['type'] == 'claims':
        CONFIG = INFO['parser']['837']
    elif args['type'] == 'remits':
        CONFIG = INFO['parser']['835']
    else:
        CONFIG = None
    if CONFIG:
        # CONFIG = CONFIG[-1] if 'version' not in args and (args['version'] < len(CONFIG)) else CONFIG[0]
        CONFIG = CONFIG[int(SYS_ARGS['version']) - 1] if 'version' in SYS_ARGS and int(SYS_ARGS['version']) < len(CONFIG) else CONFIG[-1]
        SECTION = CONFIG['SECTION']
        os.environ['HEALTHCAREIO_SALT'] = INFO['owner']
        return get_content(args['filename'], CONFIG, SECTION)
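#
# A minimal sketch of parsing a single file in code (the path is a placeholder);
# parse() returns the extracted records and the parsing logs produced by
# healthcareio.parser.get_content :
#
#   content, logs = parse(filename='/data/claim_1.x12', type='claims')
#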
def resume(files, id, config):
    """
    Return the subset of `files` that has not yet been processed, based on the
    processing logs found in the data store.
    """
    _args = config['store'].copy()
    if 'mongo' in config['store']['type']:
        _args['type'] = 'mongo.MongoReader'
    reader = factory.instance(**_args)
    _files = []
    if 'resume' in config['analytics']:
        _args = config['analytics']['resume'][id]
        _files = reader.read(**_args)
        _files = [item['name'] for item in _files if item['name'] is not None]
        return list(set(files) - set(_files))
    return files
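#
# Sketch of how resume() is invoked below when --resume is passed on the
# command line (`info` is the configuration returned by init()):
#
#   files = resume(files, SYS_ARGS['parse'], info)
#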
def apply(files, store_info, logger_info=None):
    """
    :files        list of files to be processed by this given thread/process
    :store_info   information about the data store (for now disk isn't thread safe)
    :logger_info  information about where to store the logs
    """
    if not logger_info:
        logger = factory.instance(type='disk.DiskWriter', args={'path': os.sep.join([info['out-folder'], SYS_ARGS['parse'] + '.log'])})
    else:
        logger = factory.instance(**logger_info)

    writer = factory.instance(**store_info)
    for filename in files:
        if filename.strip() == '':
            continue
        # content,logs = get_content(filename,CONFIG,CONFIG['SECTION'])
        #
        try:
            content, logs = parse(filename=filename, type=SYS_ARGS['parse'])
            if content:
                writer.write(content)
            if logs:
                [logger.write(dict(_row, **{"parse": SYS_ARGS['parse']})) for _row in logs]
            else:
                logger.write({"parse": SYS_ARGS['parse'], "name": filename, "completed": True, "rows": len(content)})
        except Exception as e:
            logger.write({"parse": SYS_ARGS['parse'], "filename": filename, "completed": False, "rows": -1, "msg": e.args[0]})
        # print ([filename,len(content)])
        #
        # @TODO: forward this data to the writer and log engine
        #
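#
# Sketch of how apply() is dispatched in the --parse flow below: each batch of
# file names is handed to its own process together with the store settings and
# the logger settings (the variable names mirror the __main__ section):
#
#   proc = Process(target=apply, args=(batch, info['store'], _info,))
#   proc.start()
#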
def upgrade(**args):
    """
    :email  provide us with who you are
    :key    upgrade key provided by the server for a given email
    """
    url = args['url'] if 'url' in args else URL + "/upgrade"
    headers = {"key": args['key'], "email": args["email"], "url": url}
if __name__ == '__main__':
    info = init()
    if 'out-folder' in SYS_ARGS:
        OUTPUT_FOLDER = SYS_ARGS['out-folder']

    if set(list(SYS_ARGS.keys())) & set(['signup', 'init']):
        #
        # This command will essentially get a new copy of the configurations
        # @TODO: Tie the request to a version ?
        #
        email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init']
        url = SYS_ARGS['url'] if 'url' in SYS_ARGS else 'https://healthcareio.the-phi.com'
        store = SYS_ARGS['store'] if 'store' in SYS_ARGS else 'sqlite'
        db = 'healthcareio' if 'db' not in SYS_ARGS else SYS_ARGS['db']
        register(email=email, url=url, store=store, db=db)
        # else:
        #     m = """
        #     usage:
        #         healthcareio --signup --email myemail@provider.com [--url <host>]
        #     """
        #     print (m)
    elif 'upgrade' in SYS_ARGS:
        #
        # perform an upgrade i.e. some code or new parser information will be provided
        #
        pass
    elif 'parse' in SYS_ARGS and info:
        """
        In this section of the code we are expecting the user to provide :
            :folder location of the files to process, or :file for a single file
        """
        files = []
        if 'file' in SYS_ARGS:
            files = [SYS_ARGS['file']] if not os.path.isdir(SYS_ARGS['file']) else []
        if 'folder' in SYS_ARGS and os.path.exists(SYS_ARGS['folder']):
            names = os.listdir(SYS_ARGS['folder'])
            files += [os.sep.join([SYS_ARGS['folder'], name]) for name in names if not os.path.isdir(os.sep.join([SYS_ARGS['folder'], name]))]
        else:
            #
            # raise an error
            pass
        #
        # if the user has specified to resume, we should look into the logs and pull the files processed and those that haven't
        #
        if 'resume' in SYS_ARGS:
            files = resume(files, SYS_ARGS['parse'], info)
            print(["Found ", len(files), " files unprocessed"])
        #
        # @TODO: Log this here so we know what is being processed or not
        SCOPE = None

        if files and ('claims' in SYS_ARGS['parse'] or 'remits' in SYS_ARGS['parse']):
            # _map = {'claims':'837','remits':'835'}
            # key = _map[SYS_ARGS['parse']]
            # CONFIG = info['parser'][key]
            # if 'version' in SYS_ARGS and int(SYS_ARGS['version']) < len(CONFIG) :
            #     CONFIG = CONFIG[ int(SYS_ARGS['version'])]
            # else:
            #     CONFIG = CONFIG[-1]
            logger = factory.instance(type='disk.DiskWriter', args={'path': os.sep.join([info['out-folder'], SYS_ARGS['parse'] + '.log'])})

            if info['store']['type'] == 'disk.DiskWriter':
                info['store']['args']['path'] += (os.sep + 'healthcare-io.json')
            elif info['store']['type'] == 'disk.SQLiteWriter':
                # info['store']['args']['path'] += (os.sep + 'healthcare-io.db3')
                pass

            if info['store']['type'] == 'disk.SQLiteWriter':
                info['store']['args']['table'] = SYS_ARGS['parse'].strip().lower()
            else:
                #
                # if we are working with no-sql we will put the logs in it (performance)?
                info['store']['args']['doc'] = SYS_ARGS['parse'].strip().lower()

            _info = json.loads(json.dumps(info['store']))
            _info['args']['doc'] = 'logs'
            logger = factory.instance(**_info)
            writer = factory.instance(**info['store'])
            #
            # we need to have batches ready in order to run some of these queries in parallel
            # @TODO: Make sure it is with a persistent storage (not disk .. not thread/process safe yet)
            #   - Make sure we can leverage this on n cores later on; for now the assumption is a single core
            #
            BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int(SYS_ARGS['batch'])

            # logger = factory.instance(type='mongo.MongoWriter',args={'db':'healthcareio','doc':SYS_ARGS['parse']+'_logs'})
            # schema = info['schema']
            # for key in schema :
            #     sql = schema[key]['create']
            #     writer.write(sql)

            files = np.array_split(files, BATCH_COUNT)
            procs = []
            index = 0
            for row in files:
                row = row.tolist()
                logger.write({"process": index, "parse": SYS_ARGS['parse'], "file_count": len(row)})
                proc = Process(target=apply, args=(row, info['store'], _info,))
                proc.start()
                procs.append(proc)
                index = index + 1
            while len(procs) > 0:
                procs = [proc for proc in procs if proc.is_alive()]
                time.sleep(2)
        pass
    elif 'analytics' in SYS_ARGS:
        PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
        DEBUG = int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
        SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''
        #
        #
        # PATH = SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
        e = analytics.engine(os.sep.join([PATH, 'config.json']))  # --@TODO: make the configuration file globally accessible
        e.apply(type='claims', serialize=True)
        SYS_ARGS['engine'] = e

        pointer = lambda: server.app.run(host='0.0.0.0', port=PORT, debug=DEBUG, threaded=False)
        pthread = Process(target=pointer, args=())
        pthread.start()
    elif 'export' in SYS_ARGS:
        #
        # this function is designed to export the data to csv
        #
        format = SYS_ARGS['format'] if 'format' in SYS_ARGS else 'csv'
        format = format.lower()
        if format not in ['xls', 'csv']:
            format = 'csv'
    else:
        msg = """
        cli :
            healthcare-io.py --<[signup|init]> <email> --store <sqlite|mongo> [--batch <value>]
            healthcare-io.py --parse claims --folder <path> [--batch <value>]
            healthcare-io.py --parse remits --folder <path> [--batch <value>] [--resume]
        parameters :
            --<[signup|init]>   signup or get a configuration file from a parsing server
            --store             data store, sqlite or mongo
            --batch             number of parallel processes to use
            --resume            will attempt to resume if there was an interruption
        """
        print(msg)
        pass
  327. # """
  328. # The program was called from the command line thus we are expecting
  329. # parse in [claims,remits]
  330. # config os.sep.path.exists(path)
  331. # folder os.sep.path.exists(path)
  332. # store store ()
  333. # """
  334. # p = len( set(['store','config','folder']) & set(SYS_ARGS.keys())) == 3 and ('db' in SYS_ARGS or 'path' in SYS_ARGS)
  335. # TYPE = {
  336. # 'mongo':'mongo.MongoWriter',
  337. # 'couch':'couch.CouchWriter',
  338. # 'disk':'disk.DiskWriter'
  339. # }
  340. # INFO = {
  341. # '837':{'scope':'claims','section':'HL'},
  342. # '835':{'scope':'remits','section':'CLP'}
  343. # }
  344. # if p :
  345. # args = {}
  346. # scope = SYS_ARGS['config'][:-5].split(os.sep)[-1]
  347. # CONTEXT = INFO[scope]['scope']
  348. # #
  349. # # @NOTE:
  350. # # improve how database and data stores are handled.
  351. # if SYS_ARGS['store'] == 'couch' :
  352. # args = {'url': SYS_ARGS['url'] if 'url' in SYS_ARGS else 'http://localhost:5984'}
  353. # args['dbname'] = SYS_ARGS['db']
  354. # elif SYS_ARGS ['store'] == 'mongo':
  355. # args = {'host':SYS_ARGS['host']if 'host' in SYS_ARGS else 'localhost:27017'}
  356. # if SYS_ARGS['store'] in ['mongo','couch']:
  357. # args['dbname'] = SYS_ARGS['db'] if 'db' in SYS_ARGS else 'claims_outcomes'
  358. # args['doc'] = CONTEXT
  359. # TYPE = TYPE[SYS_ARGS['store']]
  360. # writer = factory.instance(type=TYPE,args=args)
  361. # if SYS_ARGS['store'] == 'disk':
  362. # writer.init(path = 'output-claims.json')
  363. # logger = factory.instance(type=TYPE,args= dict(args,**{"doc":"logs"}))
  364. # files = os.listdir(SYS_ARGS['folder'])
  365. # CONFIG = json.loads(open(SYS_ARGS['config']).read())
  366. # SECTION = INFO[scope]['section']
  367. # for file in files :
  368. # if 'limit' in SYS_ARGS and files.index(file) == int(SYS_ARGS['limit']) :
  369. # break
  370. # else:
  371. # filename = os.sep.join([SYS_ARGS['folder'],file])
  372. # try:
  373. # content,logs = get_content(filename,CONFIG,SECTION)
  374. # except Exception as e:
  375. # if sys.version_info[0] > 2 :
  376. # logs = [{"filename":filename,"msg":e.args[0]}]
  377. # else:
  378. # logs = [{"filename":filename,"msg":e.message}]
  379. # content = None
  380. # if content :
  381. # writer.write(content)
  382. # if logs:
  383. # logger.write(logs)
  384. # pass
  385. # else:
  386. # print (__doc__)