healthcare-io.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. #!/usr/bin/env python3
  2. """
  3. (c) 2019 Claims Toolkit,
  4. Health Information Privacy Lab, Vanderbilt University Medical Center
  5. Steve L. Nyemba <steve.l.nyemba@vanderbilt.edu>
  6. Khanhly Nguyen <khanhly.t.nguyen@gmail.com>
  7. This code is intended to process and parse healthcare x12 837 (claims) and x12 835 (remittances) into human readable JSON format.
  8. The claims/output can be forwarded to a NoSQL data store like CouchDB or MongoDB.
  9. Usage :
  10. Commandline :
  11. python edi-parser --scope --config <path> --folder <path> --store <[mongo|disk|couch]> --<db|path]> <id|path>
  12. with :
  13. --scope <claims|remits>
  14. --config path of the x12 to be parsed i.e it could be 835, or 837
  15. --folder location of the files (they must be decompressed)
  16. --store data store could be disk, mongodb, couchdb
  17. --db|path name of the folder to store the output or the database name
  18. Embedded in Code :
  19. import edi.parser
  20. import json
  21. filename = '/data/claim_1.x12'
  22. conf = json.loads(open('config/837.json').read())
  23. edi.parser.get_content(filename,conf)
  24. """
from healthcareio.params import SYS_ARGS
from transport import factory
import requests
from healthcareio import analytics
from healthcareio import server
from healthcareio.parser import get_content
import os
import json
import sys
import numpy as np
from multiprocessing import Process
import time
from healthcareio import x12
from healthcareio.export import export
import smart
from healthcareio.server import proxy
import pandas as pd

# Working folders: PATH holds the configuration file, OUTPUT_FOLDER the parsed output.
PATH = os.sep.join([os.environ['HOME'],'.healthcareio'])
OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io'])
INFO = None  # configuration cache, populated by init()
URL = "https://healthcareio.the-phi.com"  # default signup/parsing server
# Make sure the configuration folder exists before anything tries to read/write it.
if not os.path.exists(PATH) :
    os.mkdir(PATH)
import platform
import sqlite3 as lite
# PATH = os.sep.join([os.environ['HOME'],'.edi-parser'])
# Usage text printed when arguments are missing or invalid (runtime string).
HELP_MESSAGE = """
    cli:
        healthcare-io.py --<[signup|init]> <email> --store <sqlite|mongo> [--batch <value>]
        healthcare-io.py --parse --folder <path> [--batch <value>] [--resume]
        healthcare-io.py --check-update
        healthcare-io.py --export <835|837> --config <config-path>
    action :
        --signup|init   signup user and get configuration file
        --parse         starts parsing
        --check         checks for updates
        --export        export data of a 835 or 837 into another database
    parameters :
        --<[signup|init]>   signup or get a configuration file from a parsing server
        --folder            location of the files (the program will recursively traverse it)
        --store             data store mongo or sqlite or mongodb
        --resume            will attempt to resume if there was an interruption
"""
  68. def signup (**args) :
  69. """
  70. :email user's email address
  71. :url url of the provider to signup
  72. """
  73. email = args['email']
  74. url = args['url'] if 'url' in args else URL
  75. folders = [PATH,OUTPUT_FOLDER]
  76. for path in folders :
  77. if not os.path.exists(path) :
  78. os.mkdir(path)
  79. #
  80. #
  81. store = args['store'] if 'store' in args else 'sqlite'
  82. headers = {"email":email,"client":platform.node(),"store":store,"db":args['db']}
  83. http = requests.session()
  84. r = http.post(url,headers=headers)
  85. #
  86. # store = {"type":"disk.DiskWriter","args":{"path":OUTPUT_FOLDER}}
  87. # if 'store' in args :
  88. # store = args['store']
  89. filename = (os.sep.join([PATH,'config.json']))
  90. info = r.json() #{"parser":r.json(),"store":store}
  91. info = dict({"owner":email},**info)
  92. info['store']['args']['path'] =os.sep.join([OUTPUT_FOLDER,'healthcare-io.db3']) #-- sql
  93. info['out-folder'] = OUTPUT_FOLDER
  94. file = open( filename,'w')
  95. file.write( json.dumps(info))
  96. file.close()
  97. #
  98. # Create the sqlite3 database to
  99. def log(**args):
  100. """
  101. This function will perform a log of anything provided to it
  102. """
  103. pass
  104. def init():
  105. """
  106. read all the configuration from the
  107. """
  108. filename = os.sep.join([PATH,'config.json'])
  109. info = None
  110. if os.path.exists(filename):
  111. file = open(filename)
  112. info = json.loads(file.read())
  113. if not os.path.exists(info['out-folder']) :
  114. os.mkdir(info['out-folder'])
  115. if info['store']['type'] == 'disk.SQLiteWriter' and not os.path.exists(info['store']['args']['path']) :
  116. conn = lite.connect(info['store']['args']['path'],isolation_level=None)
  117. for key in info['schema'] :
  118. _sql = info['schema'][key]['create']
  119. # r = conn.execute("select * from sqlite_master where name in ('claims','remits')")
  120. conn.execute(_sql)
  121. conn.commit()
  122. conn.close()
  123. return info
  124. #
  125. # Global variables that load the configuration files
  126. # def parse(**args):
  127. # """
  128. # This function will parse the content of a claim or remittance (x12 format) give the following parameters
  129. # :filename absolute path of the file to be parsed
  130. # :type claims|remits in x12 format
  131. # """
  132. # global INFO
  133. # if not INFO :
  134. # INFO = init()
  135. # if args['type'] == 'claims' :
  136. # CONFIG = INFO['parser']['837']
  137. # elif args['type'] == 'remits' :
  138. # CONFIG = INFO['parser']['835']
  139. # else:
  140. # CONFIG = None
  141. # if CONFIG :
  142. # # CONFIG = CONFIG[-1] if 'version' not in args and (args['version'] < len(CONFIG)) else CONFIG[0]
  143. # CONFIG = CONFIG[int(args['version'])-1] if 'version' in SYS_ARGS and int(SYS_ARGS['version']) < len(CONFIG) else CONFIG[-1]
  144. # SECTION = CONFIG['SECTION']
  145. # os.environ['HEALTHCAREIO_SALT'] = INFO['owner']
  146. # return get_content(args['filename'],CONFIG,SECTION)
  147. # def resume (files,id,config):
  148. # _args = config['store'].copy()
  149. # if 'mongo' in config['store']['type'] :
  150. # _args['type'] = 'mongo.MongoReader'
  151. # reader = factory.instance(**_args)
  152. # _files = []
  153. # if 'resume' in config['analytics'] :
  154. # _args = config['analytics']['resume'][id]
  155. # _files = reader.read(**_args)
  156. # _files = [item['name'] for item in _files if item['name'] != None]
  157. # return list(set(files) - set(_files))
  158. # return files
  159. # pass
  160. # def apply(files,store_info,logger_info=None):
  161. # """
  162. # :files list of files to be processed in this given thread/process
  163. # :store_info information about data-store, for now disk isn't thread safe
  164. # :logger_info information about where to store the logs
  165. # """
  166. # if not logger_info :
  167. # logger = factory.instance(type='disk.DiskWriter',args={'path':os.sep.join([info['out-folder'],SYS_ARGS['parse']+'.log'])})
  168. # else:
  169. # logger = factory.instance(**logger_info)
  170. # writer = factory.instance(**store_info)
  171. # for filename in files :
  172. # if filename.strip() == '':
  173. # continue
  174. # # content,logs = get_content(filename,CONFIG,CONFIG['SECTION'])
  175. # #
  176. # try:
  177. # content,logs = parse(filename = filename,type=SYS_ARGS['parse'])
  178. # if content :
  179. # writer.write(content)
  180. # if logs :
  181. # [logger.write(dict(_row,**{"parse":SYS_ARGS['parse']})) for _row in logs]
  182. # else:
  183. # logger.write({"parse":SYS_ARGS['parse'],"name":filename,"completed":True,"rows":len(content)})
  184. # except Exception as e:
  185. # logger.write({"parse":SYS_ARGS['parse'],"filename":filename,"completed":False,"rows":-1,"msg":e.args[0]})
  186. # # print ([filename,len(content)])
  187. # #
  188. # # @TODO: forward this data to the writer and log engine
  189. #
def upgrade(**args):
    """
    :email provide us with who you are
    :key upgrade key provided by the server for a given email
    """
    url = args['url'] if 'url' in args else URL+"/upgrade"
    headers = {"key":args['key'],"email":args["email"],"url":url}
    # NOTE(review): the headers are assembled but no HTTP request is ever
    # issued, so this function currently has no effect -- presumably
    # unfinished; confirm intended behavior before wiring it up.
  197. def check(**_args):
  198. """
  199. This function will check if there is an update available (versions are in the configuration file)
  200. :param url
  201. """
  202. url = _args['url'][:-1] if _args['url'].endswith('/') else _args['url']
  203. url = url + "/version"
  204. if 'version' not in _args :
  205. version = {"_id":"version","current":0.0}
  206. else:
  207. version = _args['version']
  208. http = requests.session()
  209. r = http.get(url)
  210. return r.json()
if __name__ == '__main__' :
    # Load configuration (None on first run) and honor command-line overrides.
    info = init()
    if 'out-folder' in SYS_ARGS :
        OUTPUT_FOLDER = SYS_ARGS['out-folder']
    SYS_ARGS['url'] = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL

    if set(list(SYS_ARGS.keys())) & set(['signup','init']):
        #
        # This command will essentially get a new copy of the configurations
        # @TODO: Tie the request to a version ?
        #
        email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init']
        url = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL
        store = SYS_ARGS['store'] if 'store' in SYS_ARGS else 'sqlite'
        db = 'healthcareio' if 'db' not in SYS_ARGS else SYS_ARGS['db']
        signup(email=email,url=url,store=store,db=db)
    elif 'upgrade' in SYS_ARGS :
        #
        # perform an upgrade i.e some code or new parsers information will be provided
        # NOTE(review): not implemented yet -- this branch is a no-op.
        #
        pass
    elif 'parse' in SYS_ARGS and info:
        """
        In this section of the code we are expecting the user to provide :
        :folder location of the files to process or file to process
        """
        files = []
        if 'file' in SYS_ARGS :
            # a single file, unless the given path is actually a directory
            files = [SYS_ARGS['file']] if not os.path.isdir(SYS_ARGS['file']) else []
        if 'folder' in SYS_ARGS and os.path.exists(SYS_ARGS['folder']):
            # recursively collect every file under --folder
            for root,_dir,f in os.walk(SYS_ARGS['folder']) :
                if f :
                    files += [os.sep.join([root,name]) for name in f]
        else:
            #
            # NOTE(review): a missing/invalid --folder is silently ignored here;
            # raising an error would be clearer.
            pass
        #
        # if the user has specified to resume, look into the logs and pull the
        # files already processed so only the remainder is parsed
        #
        if 'resume' in SYS_ARGS :
            store_config = json.loads( (open(os.sep.join([PATH,'config.json']))).read() )
            files = proxy.get.resume(files,store_config )
        # @TODO: Log this here so we know what is being processed or not
        SCOPE = None  # NOTE(review): assigned but never used below
        if files :
            # split the workload across --batch parallel parser processes
            BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int (SYS_ARGS['batch'])
            files = np.array_split(files,BATCH_COUNT)
            procs = []
            index = 0
            for row in files :
                row = row.tolist()
                parser = x12.Parser(os.sep.join([PATH,'config.json']))
                parser.set.files(row)
                parser.start()
                procs.append(parser)
            # wait (polling every 2s) until every parser process has finished
            while len(procs) > 0 :
                procs = [proc for proc in procs if proc.is_alive()]
                time.sleep(2)
            pass
    elif 'analytics' in SYS_ARGS :
        # Serve the analytics dashboard on --port (default 5500).
        PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
        DEBUG = int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
        SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''
        if os.path.exists(os.sep.join([PATH,'config.json'])) :
            e = analytics.engine(os.sep.join([PATH,'config.json'])) #--@TODO: make the configuration file globally accessible
            e.apply(type='claims',serialize=True)
            SYS_ARGS['engine'] = e
            SYS_ARGS['config'] = json.loads(open(os.sep.join([PATH,'config.json'])).read())
        else:
            SYS_ARGS['config'] = {"owner":None,"store":None}
        if 'args' not in SYS_ARGS['config'] :
            SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"}
        me = pd.DataFrame(smart.top.read(name='healthcare-io.py')).args.unique().tolist()
        SYS_ARGS['me'] = me[0] #-- This key will identify the current process
        # run the web server in a child process so this one can return
        pointer = lambda : server.app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=False)
        pthread = Process(target=pointer,args=())
        pthread.start()
    elif 'check-update' in SYS_ARGS :
        # Compare the local configuration version against the server's.
        _args = {"url":SYS_ARGS['url']}
        try:
            if os.path.exists(os.sep.join([PATH,'config.json'])) :
                SYS_ARGS['config'] = json.loads((open(os.sep.join([PATH,'config.json']))).read())
            else:
                SYS_ARGS['config'] = {}
            if 'version' in SYS_ARGS['config'] :
                _args['version'] = SYS_ARGS['config']['version']
            version = check(**_args)
            _version = {"current":0.0} if 'version' not in SYS_ARGS['config'] else SYS_ARGS['config']['version']
            if _version['current'] != version['current'] :
                print ()
                print ("You need to upgrade your system to version to ",version['current'])
                print ("\t- signup (for new configuration)")
                print ("\t- use pip to upgrade the codebase")
            else:
                print ()
                print ("You are running the current configuraiton version ",_version['current'])
        except Exception as e:
            # best-effort: report the problem and carry on
            print (e)
            pass
    elif 'export' in SYS_ARGS:
        #
        # Export parsed 835/837 data into another data-store of choice
        # (Redshift, PostgreSQL, MySQL ...).
        #
        path = SYS_ARGS['config']
        TYPE = SYS_ARGS['export'] if 'export' in SYS_ARGS else '835'
        if not os.path.exists(path) or TYPE not in ['835','837']:
            print (HELP_MESSAGE)
        else:
            _store = {"type":"sql.SQLWriter","args":json.loads( (open(path) ).read())}
            pipes = export.Factory.instance(type=TYPE,write_store=_store) #"inspect":0,"cast":0}})
            # start the export threads, then poll until they are all done
            for thread in pipes:
                thread.start()
                time.sleep(1)
            while pipes :
                pipes = [thread for thread in pipes if thread.is_alive()]
                time.sleep(1)
    else:
        print(HELP_MESSAGE)