healthcare-io.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. #!/usr/bin/env python3
  2. """
  3. (c) 2019 Claims Toolkit,
  4. Health Information Privacy Lab, Vanderbilt University Medical Center
  5. Steve L. Nyemba <steve.l.nyemba@vanderbilt.edu>
  6. Khanhly Nguyen <khanhly.t.nguyen@gmail.com>
This code is intended to process and parse healthcare x12 837 (claims) and x12 835 (remittances) into human readable JSON format.
The claims/output can be forwarded to a NoSQL data store like couchdb or mongodb
  9. Usage :
  10. Commandline :
        python edi-parser --scope --config <path> --folder <path> --store <[mongo|disk|couch]> --<[db|path]> <id|path>
  12. with :
  13. --scope <claims|remits>
  14. --config path of the x12 to be parsed i.e it could be 835, or 837
  15. --folder location of the files (they must be decompressed)
  16. --store data store could be disk, mongodb, couchdb
  17. --db|path name of the folder to store the output or the database name
  18. Embedded in Code :
  19. import edi.parser
  20. import json
        file = '/data/claim_1.x12'
        conf = json.loads(open('config/837.json').read())
        edi.parser.get_content(file,conf)
  24. """
  25. from healthcareio.params import SYS_ARGS
  26. from transport import factory
  27. import requests
  28. from healthcareio import analytics
  29. from healthcareio import server
  30. from healthcareio.parser import get_content
  31. import os
  32. import json
  33. import sys
  34. import numpy as np
  35. from multiprocessing import Process
  36. import time
  37. from healthcareio import x12
  38. from healthcareio.export import export
  39. import smart
  40. import transport
  41. from healthcareio.server import proxy
  42. import pandas as pd
  43. PATH = os.sep.join([os.environ['HOME'],'.healthcareio'])
  44. OUTPUT_FOLDER = os.sep.join([os.environ['HOME'],'healthcare-io'])
  45. INFO = None
  46. URL = "https://healthcareio.the-phi.com"
  47. if not os.path.exists(PATH) :
  48. os.mkdir(PATH)
  49. import platform
  50. import sqlite3 as lite
  51. # PATH = os.sep.join([os.environ['HOME'],'.edi-parser'])
  52. CONFIG_FILE = os.sep.join([PATH,'config.json']) if 'config' not in SYS_ARGS else SYS_ARGS['config']
  53. HELP_MESSAGE = """
  54. cli:
  55. #
  56. # Signup, allows parsing configuration to be downloaded
  57. #
  58. # Support for SQLite3
  59. healthcare-io.py --signup steve@the-phi.com --store sqlite
  60. #or support for mongodb
  61. healthcare-io.py --signup steve@the-phi.com --store mongo
  62. healthcare-io.py --<[signup|init]> <email> --store <sqlite|mongo> [--batch <value>]
  63. healthcare-io.py --parse --folder <path> [--batch <value>] [--resume]
  64. healthcare-io.py --check-update
  65. healthcare-io.py --export <835|837> --config <config-path>
  66. action :
  67. --signup|init signup user and get configuration file
  68. --parse starts parsing
  69. --check-update checks for updates
  70. --export export data of a 835 or 837 into another database
  71. parameters :
  72. --<[signup|init]> signup or get a configuration file from a parsing server
  73. --folder location of the files (the program will recursively traverse it)
  74. --store data store mongo or sqlite or mongodb
  75. --resume will attempt to resume if there was an interruption
  76. """
  77. def signup (**args) :
  78. """
  79. :email user's email address
  80. :url url of the provider to signup
  81. """
  82. email = args['email']
  83. url = args['url'] if 'url' in args else URL
  84. folders = [PATH,OUTPUT_FOLDER]
  85. for path in folders :
  86. if not os.path.exists(path) :
  87. os.mkdir(path)
  88. #
  89. #
  90. store = args['store'] if 'store' in args else 'sqlite'
  91. headers = {"email":email,"client":platform.node(),"store":store,"db":args['db']}
  92. http = requests.session()
  93. r = http.post(url,headers=headers)
  94. #
  95. # store = {"type":"disk.DiskWriter","args":{"path":OUTPUT_FOLDER}}
  96. # if 'store' in args :
  97. # store = args['store']
  98. # filename = (os.sep.join([PATH,'config.json']))
  99. filename = CONFIG_FILE
  100. info = r.json() #{"parser":r.json(),"store":store}
  101. info = dict({"owner":email},**info)
  102. info['store']['args']['path'] =os.sep.join([OUTPUT_FOLDER,'healthcare-io.db3']) #-- sql
  103. info['out-folder'] = OUTPUT_FOLDER
  104. file = open( filename,'w')
  105. file.write( json.dumps(info))
  106. file.close()
  107. _m = """
  108. Thank you for signingup!!
  109. Your configuration file is store in :path,
  110. - More information visit https://healthcareio.the-phi.com/parser
  111. - Access the source https://healthcareio.the-phi.com/git/code/parser
  112. """.replace(":path",CONFIG_FILE)
  113. print (_m)
  114. #
  115. # Create the sqlite3 database to
  116. def log(**args):
  117. """
  118. This function will perform a log of anything provided to it
  119. """
  120. pass
  121. def init():
  122. """
  123. read all the configuration from disk.
  124. Requirements for configuration file :
  125. {out-folder,store,837,835 }
  126. """
  127. # filename = os.sep.join([PATH,'config.json'])
  128. filename = CONFIG_FILE
  129. info = None
  130. if os.path.exists(filename):
  131. #
  132. # Loading the configuration file (JSON format)
  133. file = open(filename)
  134. info = json.loads(file.read())
  135. if 'output-folder' not in info and not os.path.exists(OUTPUT_FOLDER) :
  136. os.mkdir(OUTPUT_FOLDER)
  137. elif 'output-folder' in info and not os.path.exists(info['out-folder']) :
  138. os.mkdir(info['out-folder'])
  139. # if 'type' in info['store'] :
  140. lwriter = None
  141. is_sqlite = False
  142. if'type' in info['store'] and info['store']['type'] == 'disk.SQLiteWriter' :
  143. lwriter = transport.factory.instance(**info['store'])
  144. is_sqlite = True
  145. elif 'provider' in info['store'] and info['store']['provider'] == 'sqlite' :
  146. lwriter = transport.instance(**info['store']) ;
  147. is_sqlite = True
  148. if lwriter and is_sqlite:
  149. for key in info['schema'] :
  150. if key != 'logs' :
  151. _id = 'claims' if key == '837' else 'remits'
  152. else:
  153. _id = key
  154. if not lwriter.has(table=_id) :
  155. lwriter.apply(info['schema'][key]['create'])
  156. # [lwriter.apply( info['schema'][key]['create']) for key in info['schema'] if not lwriter.has(table=key)]
  157. lwriter.close()
  158. return info
  159. def upgrade(**args):
  160. """
  161. :email provide us with who you are
  162. :key upgrade key provided by the server for a given email
  163. """
  164. url = args['url'] if 'url' in args else URL+"/upgrade"
  165. headers = {"key":args['key'],"email":args["email"],"url":url}
  166. def check(**_args):
  167. """
  168. This function will check if there is an update available (versions are in the configuration file)
  169. :param url
  170. """
  171. url = _args['url'][:-1] if _args['url'].endswith('/') else _args['url']
  172. url = url + "/version"
  173. if 'version' not in _args :
  174. version = {"_id":"version","current":0.0}
  175. else:
  176. version = _args['version']
  177. http = requests.session()
  178. r = http.get(url)
  179. return r.json()
  180. if __name__ == '__main__' :
  181. info = init()
  182. if 'out-folder' in SYS_ARGS :
  183. OUTPUT_FOLDER = SYS_ARGS['out-folder']
  184. SYS_ARGS['url'] = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL
  185. if set(list(SYS_ARGS.keys())) & set(['signup','init']):
  186. #
  187. # This command will essentially get a new copy of the configurations
  188. # @TODO: Tie the request to a version ?
  189. #
  190. email = SYS_ARGS['signup'].strip() if 'signup' in SYS_ARGS else SYS_ARGS['init']
  191. url = SYS_ARGS['url'] if 'url' in SYS_ARGS else URL
  192. store = SYS_ARGS['store'] if 'store' in SYS_ARGS else 'sqlite'
  193. db='healthcareio' if 'db' not in SYS_ARGS else SYS_ARGS['db']
  194. signup(email=email,url=url,store=store,db=db)
  195. # else:
  196. # m = """
  197. # usage:
  198. # healthcareio --signup --email myemail@provider.com [--url <host>]
  199. # """
  200. # print (m)
  201. elif 'upgrade' in SYS_ARGS :
  202. #
  203. # perform an upgrade i.e some code or new parsers information will be provided
  204. #
  205. pass
  206. elif 'parse' in SYS_ARGS and info:
  207. """
  208. In this section of the code we are expecting the user to provide :
  209. :folder location of the files to process or file to process
  210. :
  211. """
  212. files = []
  213. if 'file' in SYS_ARGS :
  214. files = [SYS_ARGS['file']] if not os.path.isdir(SYS_ARGS['file']) else []
  215. if 'folder' in SYS_ARGS and os.path.exists(SYS_ARGS['folder']):
  216. for root,_dir,f in os.walk(SYS_ARGS['folder']) :
  217. if f :
  218. files += [os.sep.join([root,name]) for name in f]
  219. # names = os.listdir(SYS_ARGS['folder'])
  220. # files += [os.sep.join([SYS_ARGS['folder'],name]) for name in names if not os.path.isdir(os.sep.join([SYS_ARGS['folder'],name]))]
  221. else:
  222. #
  223. # raise an error
  224. pass
  225. #
  226. # if the user has specified to resume, we should look into the logs and pull the files processed and those that haven't
  227. #
  228. if 'resume' in SYS_ARGS :
  229. store_config = json.loads( (open(CONFIG_FILE)).read() )
  230. files = proxy.get.resume(files,store_config )
  231. # print (["Found ",len(files)," files unprocessed"])
  232. #
  233. # @TODO: Log this here so we know what is being processed or not
  234. SCOPE = None
  235. if files : #and ('claims' in SYS_ARGS['parse'] or 'remits' in SYS_ARGS['parse']):
  236. BATCH_COUNT = 1 if 'batch' not in SYS_ARGS else int (SYS_ARGS['batch'])
  237. files = np.array_split(files,BATCH_COUNT)
  238. procs = []
  239. index = 0
  240. for row in files :
  241. row = row.tolist()
  242. # logger.write({"process":index,"parse":SYS_ARGS['parse'],"file_count":len(row)})
  243. # proc = Process(target=apply,args=(row,info['store'],_info,))
  244. # parser = x12.Parser(os.sep.join([PATH,'config.json']))
  245. parser = x12.Parser(CONFIG_FILE)
  246. parser.set.files(row)
  247. parser.start()
  248. procs.append(parser)
  249. # index = index + 1
  250. while len(procs) > 0 :
  251. procs = [proc for proc in procs if proc.is_alive()]
  252. time.sleep(2)
  253. uri = OUTPUT_FOLDER
  254. store_config = json.loads( (open(CONFIG_FILE)).read() )['store']
  255. if 'type' in store_config :
  256. uri = store_config['args']['host'] if 'host' in store_config['args'] else ( store_config['args']['path'] if 'path' in store_config['args'] else store_config['args']['database'])
  257. if 'SQLite' in store_config['type']:
  258. provider = 'sqlite'
  259. elif 'sql' in store_config['type'] :
  260. provider = 'SQL'
  261. else:
  262. provider = 'mongo'
  263. else:
  264. provider = store_config['provider']
  265. _msg = """
  266. Completed Parsing, The data is available in :provider database at :uri
  267. Logs are equally available for errors and summary statistics to be compiled
  268. """.replace(":provider",provider).replace(":uri",uri)
  269. print (_msg)
  270. pass
  271. elif 'analytics' in SYS_ARGS :
  272. PORT = int(SYS_ARGS['port']) if 'port' in SYS_ARGS else 5500
  273. DEBUG= int(SYS_ARGS['debug']) if 'debug' in SYS_ARGS else 0
  274. SYS_ARGS['context'] = SYS_ARGS['context'] if 'context' in SYS_ARGS else ''
  275. #
  276. #
  277. # PATH= SYS_ARGS['config'] if 'config' in SYS_ARGS else os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
  278. if os.path.exists(CONFIG_FILE) :
  279. e = analytics.engine(CONFIG_FILE) #--@TODO: make the configuration file globally accessible
  280. e.apply(type='claims',serialize=True)
  281. SYS_ARGS['engine'] = e
  282. SYS_ARGS['config'] = json.loads(open(CONFIG_FILE ).read())
  283. else:
  284. SYS_ARGS['config'] = {"owner":None,"store":None}
  285. if 'args' not in SYS_ARGS['config'] :
  286. SYS_ARGS['config']["args"] = {"batch":1,"resume":True,"folder":"/data"}
  287. me = pd.DataFrame(smart.top.read(name='healthcare-io.py')).args.unique().tolist()
  288. SYS_ARGS['me'] = me[0] #-- This key will identify the current process
  289. pointer = lambda : server.app.run(host='0.0.0.0',port=PORT,debug=DEBUG,threaded=False)
  290. pthread = Process(target=pointer,args=())
  291. pthread.start()
  292. elif 'check-update' in SYS_ARGS :
  293. _args = {"url":SYS_ARGS['url']}
  294. try:
  295. if os.path.exists(CONFIG_FILE) :
  296. SYS_ARGS['config'] = json.loads(open(CONFIG_FILE ).read())
  297. else:
  298. SYS_ARGS['config'] = {}
  299. if 'version' in SYS_ARGS['config'] :
  300. _args['version'] = SYS_ARGS['config']['version']
  301. version = check(**_args)
  302. _version = {"current":0.0}if 'version' not in SYS_ARGS['config'] else SYS_ARGS['config']['version']
  303. if _version['current'] != version['current'] :
  304. print ()
  305. print ("You need to upgrade your system to version to ",version['current'])
  306. print ("\t- signup (for new configuration)")
  307. print ("\t- use pip to upgrade the codebase")
  308. else:
  309. print ()
  310. print ("You are running the current configuraiton version ",_version['current'])
  311. except Exception as e:
  312. print (e)
  313. pass
  314. elif 'export' in SYS_ARGS:
  315. #
  316. # this function is designed to export the data to csv
  317. #
  318. path = SYS_ARGS['export-config']
  319. X12_TYPE = SYS_ARGS['export'] if 'export' in SYS_ARGS else '835'
  320. if not os.path.exists(path) or X12_TYPE not in ['835','837']:
  321. print (HELP_MESSAGE)
  322. else:
  323. #
  324. # Let's run the export function ..., This will push files into a data-store of choice Redshift, PostgreSQL, MySQL ...
  325. #
  326. # _store = {"type":"sql.SQLWriter","args":json.loads( (open(path) ).read())}
  327. _store = json.loads( (open(path) ).read())
  328. pipes = export.Factory.instance(type=X12_TYPE,write_store=_store,config = CONFIG_FILE) #"inspect":0,"cast":0}})
  329. # pipes[0].run()
  330. # print (pipes)
  331. for thread in pipes:
  332. if 'table' in SYS_ARGS and SYS_ARGS['table'] != thread.table :
  333. continue
  334. thread.start()
  335. time.sleep(1)
  336. thread.join()
  337. else:
  338. print(HELP_MESSAGE)