12 KB

  1. """
  2. (c) 2019 EDI-Parser 1.0
  3. Vanderbilt University Medical Center, Health Information Privacy Laboratory
  5. Authors:
  6. Khanhly Nguyen,
  7. Steve L. Nyemba<>
  8. License:
  9. MIT, terms are available at
  10. This parser was originally written by Khanhly Nguyen for her internship and is intended to parse x12 835,837 and others provided the appropriate configuration
  11. USAGE :
  13. - EMBEDDED
  14. """
  15. import os
  16. import sys
  17. import hashlib
  18. import json
  19. class X12 :
  20. def split(self,row,sep='*',prefix='HI') :
  21. pass
  22. def get_config(self,config,row):
  23. pass
  24. def hash(self,value):
  25. pass
  26. def suppress (self,value):
  27. pass
  28. def format_date(self,value):
  29. pass
  30. def split(row,sep='*',prefix='HI'):
  31. """
  32. This function is designed to split an x12 row and
  33. """
  34. if row.startswith(prefix) is False:
  35. value = []
  36. for row_value in row.replace('~','').split(sep) :
  37. if '>' in row_value :
  38. if row_value.startswith('HC') or row_value.startswith('AD'):
  39. value += row_value.split('>')[:2]
  40. else:
  41. value += row_value.split('>') if row.startswith('CLM') is False else [row_value]
  42. else :
  43. value.append(row_value)
  44. return [xchar.replace('\r','') for xchar in value] #row.replace('~','').split(sep)
  45. else:
  46. return [ [prefix]+ split(item,'>') for item in row.replace('~','').split(sep)[1:] ]
  47. def get_config(config,row):
  48. """
  49. This function will return the meaningfull parts of the configuration for a given item
  50. """
  51. _row = list(row) if type(row[0]) == str else list(row[0])
  52. _info = config[_row[0]] if _row[0] in config else {}
  53. key = None
  54. if '@ref' in _info:
  55. key = list(set(_row) & set(_info['@ref'].keys()))
  56. if key :
  57. key = key[0]
  58. return _info['@ref'][key]
  59. else:
  60. return {}
  61. if not _info and 'SIMILAR' in config:
  62. #
  63. # Let's look for the nearest key using the edit distance
  64. if _row[0] in config['SIMILAR'] :
  65. key = config['SIMILAR'][_row[0]]
  66. _info = config[key]
  67. return _info
  68. def hash(value):
  69. salt = os.environ['HEALTHCAREIO_SALT'] if 'HEALTHCAREIO_SALT' in os.environ else ''
  70. _value = str(value)+ salt
  71. if sys.version_info[0] > 2 :
  72. return hashlib.md5(_value.encode('utf-8')).hexdigest()
  73. else:
  74. return hashlib.md5(_value).hexdigest()
  75. def suppress(value):
  76. return 'N/A'
  77. def format_date(value) :
  78. if len(value) == 8 :
  79. year = value[:4]
  80. month = value[4:6]
  81. day = value[6:]
  82. return "-".join([year,month,day])[:10] #{"year":year,"month":month,"day":day}
  83. elif len(value) == 6 :
  84. year = '20' + value[:2]
  85. month = value[2:4]
  86. day = value[4:]
  87. return "-".join([year,month,day])
  88. def format_time(value):
  89. return ":".join([value[:2],value[2:] ])[:5]
  90. def sv3_parse(value):
  91. if '>' in value :
  92. terms = value.split('>')
  93. return {'type':terms[0],'code':terms[1]}
  94. pass
  95. def sv2_parse(value):
  96. #
  97. # @TODO: Sometimes there's a suffix (need to inventory all the variations)
  98. #
  99. if '>' in value or ':' in value:
  100. xchar = '>' if '>' in value else ':'
  101. _values = value.split(xchar)
  102. modifier = {}
  103. if len(_values) > 2 :
  104. modifier= {"code":_values[2]}
  105. if len(_values) > 3 :
  106. modifier['type'] = _values[3]
  107. _value = {"code":_values[1],"type":_values[0]}
  108. if modifier :
  109. _value['modifier'] = modifier
  110. return _value
  111. else:
  112. return value
  113. def format_proc(value):
  114. for xchar in [':','<'] :
  115. if xchar in value and len(value.split(xchar)) > 1 :
  116. #_value = {"type":value.split(':')[0].strip(),"code":value.split(':')[1].strip()}
  117. _value = {"type":value.split(xchar)[0].strip(),"code":value.split(xchar)[1].strip()}
  118. break
  119. else:
  120. _value = str(value)
  121. return _value
  122. def format_diag(value):
  123. return [ {"code":item[2], "type":item[1]} for item in value if len(item) > 1]
  124. def format_pos(value):
  125. xchar = '>' if '>' in value else ':'
  126. x = value.split(xchar)
  127. x = {"code":x[0],"indicator":x[1],"frequency":x[2]} if len(x) == 3 else {"code":x[0],"indicator":None,"frequency":None}
  128. return x
def get_map(row,config,version=None):
    """
    Map the elements of a parsed row onto named fields using a configuration entry.

    :row     a list of strings, or a list of such lists (handled recursively)
    :config  configuration entry; the keys read here are 'map' (or the key named by
             version), and optionally 'label', 'anchors', 'cast' and 'syn'
    :version when truthy and present as a key of config, config[version] is used
             as the field map instead of config['map']

    Returns a dict of field values for a flat row, or a list with one mapped
    result per item when the first item of row is not a string.
    """
    # label is read here but not used by any live statement in this function
    label = config['label'] if 'label' in config else None
    omap = config['map'] if not version or version not in config else config[version]
    anchors = config['anchors'] if 'anchors' in config else []
    if type(row[0]) == str:
        object_value = {}
        for key in omap :
            index = omap[key]
            if anchors and set(anchors) & set(row):
                # The mapped index is relative to the position of an anchor found in the row.
                # When several anchors match, which one is picked depends on set ordering.
                _key = list(set(anchors) & set(row))[0]
                aindex = row.index(_key)
                index = aindex + index
            # fields whose index falls outside the row are left out of the result
            if index < len(row) :
                value = row[index]
                if 'cast' in config and key in config['cast'] and value.strip() != '' :
                    # NOTE(review): the cast is a string from the configuration evaluated with
                    # eval and then called on the value - the configuration must be trusted
                    value = eval(config['cast'][key])(value)
                    if type(value) == dict :
                        # replace the non-dict members that have a synonym configured
                        for objkey in value :
                            if type(value[objkey]) == dict :
                                continue
                            if 'syn' in config and value[objkey] in config['syn'] :
                                value[objkey] = config['syn'][ value[objkey]]
                        # nest the dict under the field name unless it already carries that key
                        value = {key:value} if key not in value else value
                    else:
                        if 'syn' in config and value in config['syn'] :
                            value = config['syn'][value]
                if type(value) == dict :
                    # dict values are merged into the result rather than stored under key
                    object_value = dict(object_value, **value)
                else:
                    object_value[key] = value
    else:
        #
        # we are dealing with a complex object
        object_value = []
        for row_item in row :
            value = get_map(row_item,config,version)
            object_value.append(value)
            #
            # We need to add the index of the object it matters in determining the claim types
            #
            # object_value.append( list(get_map(row_item,config,version)))
        # object_value = {label:object_value}
    return object_value
  172. def get_locations(x12_file,section='HL') :
  173. locations = []
  174. for line in x12_file :
  175. if line.strip().startswith(section) :
  176. i = x12_file.index(line)
  177. locations.append(i)
  178. return locations
  179. #def get_claims(filename,config,section) :
  180. def get_content(filename,config,section=None) :
  181. """
  182. This function returns the of the EDI file parsed given the configuration specified
  183. :section loop prefix (HL, CLP)
  184. :config configuration with formatting rules, labels ...
  185. :filename location of the file
  186. """
  187. section = section if section else config['SECTION']
  188. logs = []
  189. try:
  190. x12_file = open(filename.strip(),errors='ignore').read().split('\n')
  191. except Exception as e:
  192. #
  193. # We have an error here that should be logged
  194. if sys.version_info[0] > 2 :
  195. # logs.append ({"version":VERSION,"filename":filename,"msg":e.args[0],"X12":x12_file[beg:end]})
  196. logs.append ({"version":"unknown","filename":filename,"msg":e.args[0]})
  197. else:
  198. # logs.append ({"version":VERSION,"filename":filename,"msg":e.message,"X12":x12_file[beg:end]})
  199. logs.append ({"version":"unknown","filename":filename,"msg":e.message})
  200. return [],logs
  201. pass
  202. if len(x12_file) == 1 :
  203. x12_file = x12_file[0].split('~')
  204. #partitions = '\n'.join(x12_file).split(section+'*')
  205. locations = get_locations(x12_file,section)
  206. claims = []
  207. #
  208. # given locations it is possible to build up the partitions (made of segments)
  209. beg = locations [0]
  210. partitions = []
  211. for end in locations[1:] :
  212. partitions.append ("\n".join(x12_file[beg:end]))
  213. beg = end
  214. # VERSION = x12_file[2].split('*')[3].replace('~','')
  215. TOP_ROW = x12_file[1].split('*')
  216. CATEGORY= x12_file[2].split('*')[1].strip()
  217. VERSION = x12_file[1].split('*')[-1].replace('~','')
  218. SUBMITTED_DATE = format_date(TOP_ROW[4])
  219. SENDER_ID = TOP_ROW[2]
  220. row = split(x12_file[3])
  221. _info = get_config(config,row)
  222. _default_value = get_map(row,_info,VERSION) if _info else {}
  223. N = len(locations)
  224. # for index in range(0,N-1):
  225. # beg = locations[index]
  226. # end = locations[index+1]
  227. # claim = {}
  228. for segment in partitions :
  229. claim = {}
  230. # for row in x12_file[beg:end] :
  231. segment = segment.replace('\n','').split('~')
  232. for row in segment :
  233. row = split(row)
  234. _info = get_config(config,row)
  235. if _info :
  236. try:
  237. # tmp = get_map(row,_info,VERSION)
  238. # if 'parser' in _info :
  239. # pointer = eval(_info['parser'])
  240. tmp = get_map(row,_info,VERSION)
  241. except Exception as e:
  242. if sys.version_info[0] > 2 :
  243. # logs.append ({"version":VERSION,"filename":filename,"msg":e.args[0],"X12":x12_file[beg:end]})
  244. logs.append ({"version":VERSION,"filename":filename,"msg":e.args[0],"X12":row,"completed":False,"rows":len(row)})
  245. else:
  246. # logs.append ({"version":VERSION,"filename":filename,"msg":e.message,"X12":x12_file[beg:end]})
  247. logs.append ({"version":VERSION,"filename":filename,"msg":e.message,"X12":row,"rows":len(row),"completed":False})
  248. claim = {}
  249. break
  250. if 'label' not in _info :
  251. tmp['version'] = VERSION
  252. tmp['submitted'] = SUBMITTED_DATE
  253. if TOP_ROW[1] == 'HP' :
  254. tmp['payer_id'] = SENDER_ID
  255. elif TOP_ROW[1] == 'HC':
  256. tmp['provider_id'] = SENDER_ID
  257. tmp['category'] = {"setid": CATEGORY,"version":'X'+VERSION.split('X')[1],"id":VERSION.split('X')[0].strip()}
  258. claim = dict(claim, **tmp)
  259. else:
  260. label = _info['label']
  261. if type(tmp) == list :
  262. claim[label] = tmp if label not in claim else claim[label] + tmp
  263. else:
  264. if label not in claim:
  265. claim[label] = [tmp]
  266. elif len(list(tmp.keys())) == 1 :
  267. index = len(claim[label]) -1
  268. claim[label][index] = dict(claim[label][index],**tmp)
  269. else:
  270. claim[label].append(tmp)
  271. if len(claim[label]) > 0 :
  272. labels = []
  273. for item in claim[label] :
  274. item['_index'] = len(labels)
  275. if item not in labels :
  276. labels.append(item)
  277. claim[label] = labels
  278. # claim[label] = list( set(claim[label])) #-- removing redundancies
  279. if claim and 'claim_id' in claim:
  280. claim = dict(claim,**_default_value)
  281. claim['name'] = filename.split(os.sep)[-1] #.replace(ROOT,'')
  282. claim['index'] = len(claims) if len(claims) > 0 else 0
  283. claims.append(claim)
  284. else:
  285. #
  286. # Could not find claim identifier associated with data
  287. #
  288. pass
  289. return claims,logs