__init__.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678
  1. """
  2. (c) 2019 Healthcare/IO 1.0
  3. Vanderbilt University Medical Center, Health Information Privacy Laboratory
  4. https://hiplab.mc.vanderbilt.edu/healthcareio
  5. Authors:
  6. Khanhly Nguyen,
  7. Steve L. Nyemba<steve.l.nyemba@vanderbilt.edu>
  8. License:
  9. MIT, terms are available at https://opensource.org/licenses/MIT
  10. This parser was originally written by Khanhly Nguyen for her internship and is intended to parse x12 835,837 and others provided the appropriate configuration
  11. USAGE :
  12. - COMMAND LINE
  13. - EMBEDDED
  14. """
  15. import hashlib
  16. import json
  17. import os
  18. import sys
  19. from itertools import islice
  20. from multiprocessing import Process
  21. import transport
  22. import jsonmerge
  23. import copy
  24. class void :
  25. pass
  26. class Formatters :
  27. def __init__(self):
  28. # self.config = config
  29. self.get = void()
  30. self.get.config = self.get_config
  31. self.parse = void()
  32. self.parse.sv3 = self.sv3
  33. self.parse.sv2 = self.sv2
  34. self.sv2_parser = self.sv2
  35. self.sv3_parser = self.sv3
  36. self.sv3_parse = self.sv3
  37. self.format_proc = self.procedure
  38. self.format_diag = self.diagnosis
  39. self.parse.procedure = self.procedure
  40. self.parse.diagnosis = self.diagnosis
  41. self.parse.date = self.date
  42. self.format_date = self.date
  43. self.format_pos = self.pos
  44. self.format_time = self.time
  45. def split(self,row,sep='*',prefix='HI') :
  46. """
  47. This function is designed to split an x12 row and
  48. """
  49. value = []
  50. if row.startswith(prefix) is False:
  51. for row_value in row.replace('~','').split(sep) :
  52. if '>' in row_value and not row_value.startswith('HC'):
  53. # if row_value.startswith('HC') or row_value.startswith('AD'):
  54. if row_value.startswith('AD'):
  55. value += row_value.split('>')[:2]
  56. pass
  57. else:
  58. value += [row_value]
  59. # value += row_value.split('>') if row.startswith('CLM') is False else [row_value]
  60. else :
  61. value.append(row_value.replace('\n',''))
  62. value = [xchar.replace('\r','') for xchar in value] #row.replace('~','').split(sep)
  63. else:
  64. value = [ [prefix]+ self.split(item,'>') for item in row.replace('~','').split(sep)[1:] ]
  65. return value if type(value) == list and type(value[0]) != list else value[0]
  66. def get_config(self,config,row):
  67. """
  68. This function will return the meaningfull parts of the configuration for a given item
  69. """
  70. _row = list(row) if type(row[0]) == str else list(row[0])
  71. _info = config[_row[0]] if _row[0] in config else {}
  72. _rinfo = {}
  73. key = None
  74. if '@ref' in _info:
  75. keys = list(set(_row) & set(_info['@ref'].keys()))
  76. if keys :
  77. _rinfo = {}
  78. for key in keys :
  79. _rinfo = jsonmerge.merge(_rinfo,_info['@ref'][key])
  80. return _rinfo
  81. # key = key[0]
  82. # return _info['@ref'][key]
  83. else:
  84. return {}
  85. if not _info and 'SIMILAR' in config:
  86. #
  87. # Let's look for the nearest key using the edit distance
  88. if _row[0] in config['SIMILAR'] :
  89. key = config['SIMILAR'][_row[0]]
  90. _info = config[key]
  91. return _info
  92. def hash(self,value):
  93. salt = os.environ['HEALTHCAREIO_SALT'] if 'HEALTHCAREIO_SALT' in os.environ else ''
  94. _value = str(value)+ salt
  95. if sys.version_info[0] > 2 :
  96. return hashlib.md5(_value.encode('utf-8')).hexdigest()
  97. else:
  98. return hashlib.md5(_value).hexdigest()
  99. def suppress (self,value):
  100. return 'N/A'
  101. def date(self,value):
  102. if len(value) > 8 or '-' in value:
  103. value = value.split('-')[0]
  104. if len(value) == 8 :
  105. year = value[:4]
  106. month = value[4:6]
  107. day = value[6:]
  108. return "-".join([year,month,day])[:10] #{"year":year,"month":month,"day":day}
  109. elif len(value) == 6 :
  110. year = '20' + value[:2]
  111. month = value[2:4]
  112. day = value[4:]
  113. elif value.isnumeric() and len(value) >= 10:
  114. #
  115. # Here I a will assume we have a numeric vale
  116. year = value[:4]
  117. month= value[4:6]
  118. day = value[6:8]
  119. else:
  120. #
  121. # We have a date formatting issue
  122. return value
  123. return "-".join([year,month,day])
  124. def time(self,value):
  125. pass
  126. def sv3(self,value):
  127. if '>' in value [1]:
  128. terms = value[1].split('>')
  129. return {'type':terms[0],'code':terms[1],"amount":float(value[2])}
  130. else:
  131. return {"code":value[2],"type":value[1],"amount":float(value[3])}
  132. def sv2(self,value):
  133. #
  134. # @TODO: Sometimes there's a suffix (need to inventory all the variations)
  135. #
  136. if '>' in value or ':' in value:
  137. xchar = '>' if '>' in value else ':'
  138. _values = value.split(xchar)
  139. modifier = {}
  140. if len(_values) > 2 :
  141. modifier= {"code":_values[2]}
  142. if len(_values) > 3 :
  143. modifier['type'] = _values[3]
  144. _value = {"code":_values[1],"type":_values[0]}
  145. if modifier :
  146. _value['modifier'] = modifier
  147. return _value
  148. else:
  149. return value
  150. def procedure(self,value):
  151. for xchar in [':','<','|','>'] :
  152. if xchar in value and len(value.split(xchar)) > 1 :
  153. #_value = {"type":value.split(':')[0].strip(),"code":value.split(':')[1].strip()}
  154. _value = {"type":value.split(xchar)[0].strip(),"code":value.split(xchar)[1].strip()}
  155. if len(value.split(xchar)) >2 :
  156. index = 1;
  157. for modifier in value.split(xchar)[2:] :
  158. _value['modifier_'+str(index)] = modifier
  159. index += 1
  160. break
  161. else:
  162. _value = str(value)
  163. return _value
  164. def diagnosis(self,value):
  165. return [ {"code":item[2], "type":item[1]} for item in value if len(item) > 1]
  166. def pos(self,value):
  167. """
  168. formatting place of service information within a segment (REF)
  169. @TODO: In order to accomodate the other elements they need to be specified in the configuration
  170. Otherwise it causes problems on export
  171. """
  172. xchar = '>' if '>' in value else ':'
  173. x = value.split(xchar)
  174. x = {"place_of_service":x[0],"indicator":x[1],"frequency":x[2]} if len(x) == 3 else {"place_of_service":x[0],"indicator":None,"frequency":None}
  175. return x
  176. class Parser (Process):
  177. def __init__(self,path):
  178. """
  179. :path path of the configuration file (it can be absolute)
  180. """
  181. Process.__init__(self)
  182. self.utils = Formatters()
  183. self.get = void()
  184. self.get.value = self.get_map
  185. self.get.default_value = self.get_default_value
  186. _config = json.loads(open(path).read())
  187. self._custom_config = self.get_custom(path)
  188. self.config = _config['parser']
  189. self.store = _config['store']
  190. self.cache = {}
  191. self.files = []
  192. self.set = void()
  193. self.set.files = self.set_files
  194. self.emit = void()
  195. self.emit.pre = None
  196. self.emit.post = None
  197. def get_custom(self,path) :
  198. """
  199. :path path of the configuration file (it can be absolute)
  200. """
  201. #
  202. #
  203. _path = path.replace('config.json','')
  204. if _path.endswith(os.sep) :
  205. _path = _path[:-1]
  206. _config = {}
  207. _path = os.sep.join([_path,'custom'])
  208. if os.path.exists(_path) :
  209. files = os.listdir(_path)
  210. if files :
  211. fullname = os.sep.join([_path,files[0]])
  212. _config = json.loads ( (open(fullname)).read() )
  213. return _config
  214. def set_files(self,files):
  215. self.files = files
  216. def get_map(self,row,config,version=None):
  217. # label = config['label'] if 'label' in config else None
  218. handler = Formatters()
  219. if 'map' not in config and hasattr(handler,config['apply']):
  220. pointer = getattr(handler,config['apply'])
  221. object_value = pointer(row)
  222. return object_value
  223. #
  224. # Pull the goto configuration that skips rows
  225. #
  226. omap = config['map'] if not version or version not in config else config[version]
  227. anchors = config['anchors'] if 'anchors' in config else []
  228. rewrite = config['rewrite'] if 'rewrite' in config else {}
  229. if type(row[0]) == str:
  230. object_value = {}
  231. for key in omap :
  232. index = omap[key]
  233. if anchors and set(anchors) & set(row):
  234. _key = list(set(anchors) & set(row))[0]
  235. aindex = row.index(_key)
  236. index = aindex + index
  237. if index < len(row) :
  238. value = row[index]
  239. if 'cast' in config and key in config['cast'] and value.strip() != '' :
  240. if config['cast'][key] in ['float','int'] :
  241. value = eval(config['cast'][key])(value)
  242. elif hasattr(handler,config['cast'][key]):
  243. pointer = getattr(handler,config['cast'][key])
  244. value = pointer(value)
  245. else:
  246. print ("Missing Pointer ",key,config['cast'])
  247. if type(value) == dict :
  248. for objkey in value :
  249. if type(value[objkey]) == dict :
  250. continue
  251. if 'syn' in config and value[objkey] in config['syn'] :
  252. # value[objkey] = config['syn'][ value[objkey]]
  253. pass
  254. if key in rewrite :
  255. _key = rewrite[key]
  256. if _key in value :
  257. value = value[_key]
  258. else:
  259. value = ""
  260. value = {key:value} if key not in value else value
  261. else:
  262. if 'syn' in config and value in config['syn'] :
  263. # value = config['syn'][value]
  264. pass
  265. if type(value) == dict :
  266. # object_value = dict(object_value, **value)
  267. object_value = jsonmerge.merge(object_value, value)
  268. else:
  269. object_value[key] = value
  270. else:
  271. #
  272. # we are dealing with a complex object
  273. object_value = []
  274. for row_item in row :
  275. value = self.get.value(row_item,config,version)
  276. object_value.append(value)
  277. #
  278. # We need to add the index of the object it matters in determining the claim types
  279. #
  280. # object_value.append( list(get_map(row_item,config,version)))
  281. # object_value = {label:object_value}
  282. return object_value
  283. def set_cache(self,tmp,_info) :
  284. """
  285. insert into cache a value that the, these are in reference to a loop
  286. """
  287. if 'cache' in _info :
  288. key = _info['cache']['key']
  289. value=_info['cache']['value']
  290. field = _info['cache']['field']
  291. if value in tmp :
  292. self.cache [key] = {field:tmp[value]}
  293. pass
  294. def get_cache(self,row) :
  295. """
  296. retrieve cache element for a current
  297. """
  298. key = row[0]
  299. return self.cache[key] if key in self.cache else {}
  300. def apply(self,content,_code) :
  301. """
  302. :content content of a file i.e a segment with the envelope
  303. :_code 837 or 835 (helps get the appropriate configuration)
  304. """
  305. util = Formatters()
  306. # header = default_value.copy()
  307. value = {}
  308. for row in content[:] :
  309. row = util.split(row.replace('\n','').replace('~',''))
  310. _info = util.get.config(self.config[_code][0],row)
  311. if self._custom_config and _code in self._custom_config:
  312. _cinfo = util.get.config(self._custom_config[_code],row)
  313. else:
  314. _cinfo = {}
  315. if _info or _cinfo:
  316. try:
  317. _info = jsonmerge.merge(_info,_cinfo)
  318. tmp = self.get.value(row,_info)
  319. if not tmp :
  320. continue
  321. #
  322. # At this point we have the configuration and the row parsed into values
  323. # We should check to see if we don't have anything in the cache to be added to it
  324. #
  325. if row[0] in self.cache :
  326. tmp = jsonmerge.merge(tmp,self.get_cache(row))
  327. if 'label' in _info :
  328. label = _info['label']
  329. if type(tmp) == list :
  330. value[label] = tmp if label not in value else value[label] + tmp
  331. else:
  332. # if 'DTM' in row :
  333. # print ([label,tmp,label in value])
  334. if label not in value :
  335. value[label] = []
  336. value[label].append(tmp)
  337. # if label not in value:
  338. # value[label] = [tmp]
  339. # else:
  340. # value[label].append(tmp)
  341. if '_index' not in tmp :
  342. #
  343. # In case we asked it to be overriden, then this will not apply
  344. # X12 occasionally requires references to other elements in a loop (alas)
  345. #
  346. tmp['_index'] = len(value[label]) -1
  347. elif 'field' in _info :
  348. name = _info['field']
  349. # value[name] = tmp
  350. # value = jsonmerge.merge(value,{name:tmp})
  351. value = dict(value,**{name:tmp})
  352. else:
  353. value = dict(value,**tmp)
  354. pass
  355. except Exception as e :
  356. print (e.args[0])
  357. # print ('__',(dir(e.args)))
  358. pass
  359. #
  360. # At this point the object is completely built,
  361. # if there ar any attributes to be cached it will be done here
  362. #
  363. if 'cache' in _info :
  364. self.set_cache(tmp,_info)
  365. return value if value else {}
  366. def get_default_value(self,content,_code):
  367. util = Formatters()
  368. TOP_ROW = content[1].split('*')
  369. SUBMITTED_DATE = util.parse.date(TOP_ROW[4])
  370. CATEGORY= content[2].split('*')[1].strip()
  371. VERSION = content[1].split('*')[-1].replace('~','').replace('\n','')
  372. SENDER_ID = TOP_ROW[2]
  373. row = util.split(content[3])
  374. _info = util.get_config(self.config[_code][0],row)
  375. value = self.get.value(row,_info,VERSION) if _info else {}
  376. value['category'] = {"setid": _code,"version":'X'+VERSION.split('X')[1],"id":VERSION.split('X')[0].strip()}
  377. value["submitted"] = SUBMITTED_DATE
  378. value['sender_id'] = SENDER_ID
  379. value = dict(value,**self.apply(content,_code))
  380. # Let's parse this for default values
  381. return value #jsonmerge.merge(value,self.apply(content,_code))
  382. def read(self,filename) :
  383. """
  384. :formerly get_content
  385. This function returns the of the EDI file parsed given the configuration specified. it is capable of identifying a file given the content
  386. :section loop prefix (HL, CLP)
  387. :config configuration with formatting rules, labels ...
  388. :filename location of the file
  389. """
  390. # section = section if section else config['SECTION']
  391. logs = []
  392. claims = []
  393. try:
  394. self.cache = {}
  395. file = open(filename.strip())
  396. file = file.read().split('CLP')
  397. _code = '835'
  398. section = 'CLP'
  399. if len(file) == 1 :
  400. file = file[0].split('CLM') #.split('HL')
  401. _code = '837'
  402. section = 'CLM' #'HL'
  403. INITIAL_ROWS = file[0].split(section)[0].split('\n')
  404. if len(INITIAL_ROWS) == 1 :
  405. INITIAL_ROWS = INITIAL_ROWS[0].split('~')
  406. # for item in file[1:] :
  407. # item = item.replace('~','\n')
  408. # print (INITIAL_ROWS)
  409. DEFAULT_VALUE = self.get.default_value(INITIAL_ROWS,_code)
  410. DEFAULT_VALUE['name'] = filename.strip()
  411. file = section.join(file).split('\n')
  412. if len(file) == 1:
  413. file = file[0].split('~')
  414. #
  415. # In the initial rows, there's redundant information (so much for x12 standard)
  416. # index 1 identifies file type i.e CLM for claim and CLP for remittance
  417. segment = []
  418. index = 0;
  419. _toprows = []
  420. _default = None
  421. for row in file :
  422. row = row.replace('\r','')
  423. # if not segment and not row.startswith(section):
  424. # _toprows += [row]
  425. if row.startswith(section) and not segment:
  426. segment = [row]
  427. continue
  428. elif segment and not row.startswith(section):
  429. segment.append(row)
  430. if len(segment) > 1 and row.startswith(section):
  431. #
  432. # process the segment somewhere (create a thread maybe?)
  433. #
  434. _claim = self.apply(segment,_code)
  435. if _claim :
  436. _claim['index'] = index #len(claims)
  437. # claims.append(dict(DEFAULT_VALUE,**_claim))
  438. #
  439. # schema = [ {key:{"mergeStrategy":"append" if list( type(_claim[key])) else "overwrite"}} for key in _claim.keys()] # if type(_claim[key]) == list]
  440. # _schema = set(DEFAULT_VALUE.keys()) - schema
  441. # if schema :
  442. # schema = {"properties":dict.fromkeys(schema,{"mergeStrategy":"append"})}
  443. # else:
  444. # schema = {"properties":{}}
  445. # schema = jsonmerge.merge(schema['properties'],dict.fromkeys(_schema,{"mergeStrategy":"overwrite"}))
  446. schema = {"properties":{}}
  447. for attr in _claim.keys() :
  448. schema['properties'][attr] = {"mergeStrategy": "append" if type(_claim[attr]) == list else "overwrite" }
  449. merger = jsonmerge.Merger(schema)
  450. _baseclaim = None
  451. _baseclaim = merger.merge(_baseclaim,copy.deepcopy(DEFAULT_VALUE))
  452. _claim = merger.merge(_baseclaim,_claim)
  453. # _claim = merger.merge(DEFAULT_VALUE.copy(),_claim)
  454. claims.append( _claim)
  455. segment = [row]
  456. index += 1
  457. pass
  458. #
  459. # Handling the last claim found
  460. if segment and segment[0].startswith(section) :
  461. # default_claim = dict({"name":index},**DEFAULT_VALUE)
  462. claim = self.apply(segment,_code)
  463. if claim :
  464. claim['index'] = len(claims)
  465. # schema = [key for key in claim.keys() if type(claim[key]) == list]
  466. # if schema :
  467. # schema = {"properties":dict.fromkeys(schema,{"mergeStrategy":"append"})}
  468. # else:
  469. # print (claim.keys())
  470. # schema = {}
  471. #
  472. # @TODO: Fix merger related to schema (drops certain fields ... NOT cool)
  473. # merger = jsonmerge.Merger(schema)
  474. # top_row_claim = self.apply(_toprows,_code)
  475. # claim = merger.merge(claim,self.apply(_toprows,_code))
  476. # claims.append(dict(DEFAULT_VALUE,**claim))
  477. schema = {"properties":{}}
  478. for attr in claim.keys() :
  479. schema['properties'][attr] = {"mergeStrategy": "append" if type(claim[attr]) == list else "overwrite" }
  480. merger = jsonmerge.Merger(schema)
  481. _baseclaim = None
  482. _baseclaim = merger.merge(_baseclaim,copy.deepcopy(DEFAULT_VALUE))
  483. claim = merger.merge(_baseclaim,claim)
  484. claims.append(claim)
  485. # claims.append(merger.merge(DEFAULT_VALUE.copy(),claim))
  486. if type(file) != list :
  487. file.close()
  488. # x12_file = open(filename.strip(),errors='ignore').read().split('\n')
  489. except Exception as e:
  490. logs.append ({"parse":_code,"completed":False,"name":filename,"msg":e.args[0]})
  491. return [],logs,None
  492. rate = 0 if len(claims) == 0 else (1 + index)/len(claims)
  493. logs.append ({"parse":"claims" if _code == '837' else 'remits',"completed":True,"name":filename,"rate":rate})
  494. # self.finish(claims,logs,_code)
  495. return claims,logs,_code
  496. def run(self):
  497. if self.emit.pre :
  498. self.emit.pre()
  499. for filename in self.files :
  500. content,logs,_code = self.read(filename)
  501. self.finish(content,logs,_code)
  502. def finish(self,content,logs,_code) :
  503. args = self.store
  504. _args = json.loads(json.dumps(self.store))
  505. if args['type'] == 'mongo.MongoWriter' :
  506. args['args']['doc'] = 'claims' if _code == '837' else 'remits'
  507. _args['args']['doc'] = 'logs'
  508. else:
  509. args['args']['table'] = 'claims' if _code == '837' else 'remits'
  510. _args['args']['table'] = 'logs'
  511. if content :
  512. writer = transport.factory.instance(**args)
  513. writer.write(content)
  514. writer.close()
  515. if logs :
  516. logger = transport.factory.instance(**_args)
  517. logger.write(logs)
  518. logger.close()
  519. if self.emit.post :
  520. self.emit.post(content,logs)