"""
    (c) 2019 EDI-Parser 1.0
    Vanderbilt University Medical Center, Health Information Privacy Laboratory
    https://hiplab.mc.vanderbilt.edu/tools

    Authors:
        Khanhly Nguyen,
        Steve L. Nyemba<steve.l.nyemba@vanderbilt.edu>

    License:
        MIT, terms are available at https://opensource.org/licenses/MIT

    This parser was originally written by Khanhly Nguyen for her internship and is intended to parse
    X12 835, 837 and other transaction sets, provided the appropriate configuration is supplied.

    USAGE :
        - COMMAND LINE

        - EMBEDDED
"""
- import os
- import sys
- import hashlib
- import json
class X12:
    """
    Placeholder for an object-oriented interface to the parser.

    Every method below is a stub: it ignores its arguments and returns None.
    The working implementations are the module-level functions that carry the
    same names.
    """

    def split(self, row, sep='*', prefix='HI'):
        """Stub, not implemented: returns None."""
        return None

    def get_config(self, config, row):
        """Stub, not implemented: returns None."""
        return None

    def hash(self, value):
        """Stub, not implemented: returns None."""
        return None

    def suppress(self, value):
        """Stub, not implemented: returns None."""
        return None

    def format_date(self, value):
        """Stub, not implemented: returns None."""
        return None
-
def split(row, sep='*', prefix='HI'):
    """
    This function splits an x12 row (segment) into its elements.

    :row        text of the segment; every '~' is removed before splitting
    :sep        element separator
    :prefix     identifier of the segments that are returned as a list of lists

    A row that does not start with the prefix is returned as a flat list of strings :
        - an element without '>' is kept as is
        - an element with '>' that starts with 'HC' or 'AD' contributes its first two components only
        - any other element with '>' contributes all its components, except in a row that
          starts with 'CLM' where the element is kept whole
        - carriage returns are removed from every returned string
    A row that starts with the prefix is returned as a list with one entry per element after
    the first one; each entry is [prefix] followed by the element split on '>'.
    """
    elements = row.replace('~', '').split(sep)
    if row.startswith(prefix):
        # The recursive call deliberately relies on the default prefix, as the original did
        return [[prefix] + split(item, '>') for item in elements[1:]]

    # The row does not change while looping, so the CLM test is evaluated once
    keep_composite = row.startswith('CLM')
    value = []
    for element in elements:
        if '>' not in element:
            value.append(element)
        elif element.startswith(('HC', 'AD')):
            value += element.split('>')[:2]
        elif keep_composite:
            value.append(element)
        else:
            value += element.split('>')
    return [item.replace('\r', '') for item in value]
def get_config(config, row):
    """
    This function returns the meaningful part of the configuration for a given row.

    :config     configuration, indexed by the first element of a row; it may hold a 'SIMILAR'
                entry that maps identifiers without an entry of their own onto configured ones
    :row        list of strings, or list of lists of strings (the first list is then used)

    If the entry found has an '@ref' table, the result is the '@ref' item associated with the
    first element of the row (in row order) that is a key of that table, or {} if there is none.
    Otherwise the entry itself is returned, {} when nothing applies.
    """
    if not row:
        return {}
    _row = list(row) if isinstance(row[0], str) else list(row[0])
    if not _row:
        return {}

    _info = config[_row[0]] if _row[0] in config else {}
    if '@ref' in _info:
        refs = _info['@ref']
        #
        # Walking the row in order keeps the choice deterministic when several elements
        # of the row are keys of the '@ref' table
        for item in _row:
            if item in refs:
                return refs[item]
        return {}

    if not _info and 'SIMILAR' in config:
        #
        # No entry for this identifier, let's see if it is declared as similar to another one
        similar = config['SIMILAR']
        if _row[0] in similar:
            _info = config[similar[_row[0]]]

    return _info
def hash(value):
    """
    This function returns the hexadecimal MD5 digest of a value.

    :value  anything that can be converted with str()

    The content of the environment variable HEALTHCAREIO_SALT, when it is set, is appended to
    the text of the value before hashing.
    """
    salt = os.environ.get('HEALTHCAREIO_SALT', '')
    _value = str(value) + salt
    if sys.version_info[0] > 2:
        # hashlib works on bytes with python 3
        _value = _value.encode('utf-8')
    #
    # NOTE(review): MD5 is not collision resistant; it is kept so that the digests remain
    # identical to the ones this function has always produced
    return hashlib.md5(_value).hexdigest()
def suppress(value):
    """
    This function masks a value: the input is ignored and the text 'N/A' is returned.
    """
    return 'N/A'
-
def format_date(value):
    """
    This function formats a compact date as year-month-day separated by dashes.

    :value  text of 8 characters (year on 4 characters, month, day) or
            text of 6 characters (year on 2 characters, month, day)

    Any other length returns None.
    """
    if len(value) == 8:
        return "-".join([value[:4], value[4:6], value[6:]])
    if len(value) == 6:
        #
        # NOTE(review): two-digit years are always prefixed with '20'
        return "-".join(['20' + value[:2], value[2:4], value[4:]])
    return None
def format_time(value):
    """
    This function inserts ':' after the first two characters of a value and keeps at most
    the first five characters of the result.
    """
    return (value[:2] + ":" + value[2:])[:5]
def sv3_parse(value):
    """
    This function splits a value on '>' and returns its first two components.

    :value  text, e.g. two components separated by '>'

    The first component is returned as 'type', the second as 'code' and any further component
    is ignored. None is returned if the value has no '>'.
    """
    if '>' not in value:
        return None
    terms = value.split('>')
    return {'type': terms[0], 'code': terms[1]}
def sv2_parse(value):
    """
    This function splits a composite value into a type, a code and an optional modifier.

    :value  text whose components are separated by '>' or, if there is no '>', by ':'

    A value without any of the two delimiters is returned unchanged. Otherwise the result is
    {"code": 2nd component, "type": 1st component}, with in addition a "modifier" made of the
    3rd component ("code") and of the 4th component ("type") when they are present.
    """
    #
    # @TODO: Sometimes there's a suffix (need to inventory all the variations)
    #
    if '>' in value:
        delimiter = '>'
    elif ':' in value:
        delimiter = ':'
    else:
        return value

    parts = value.split(delimiter)
    parsed = {"code": parts[1], "type": parts[0]}
    modifier = {}
    if len(parts) > 2:
        modifier["code"] = parts[2]
    if len(parts) > 3:
        modifier["type"] = parts[3]
    if modifier:
        parsed["modifier"] = modifier
    return parsed
def format_proc(value):
    """
    This function splits a value into a type and a code.

    :value  text whose components are separated by ':' or, if there is no ':', by '<'

    The result is {"type": 1st component, "code": 2nd component}, both stripped of surrounding
    spaces; a value without delimiter is returned as str(value).
    """
    #
    # NOTE(review): the other helpers of this file split on '>', so '<' might be a typo;
    # it is left untouched
    for xchar in (':', '<'):
        if xchar in value:
            parts = value.split(xchar)
            return {"type": parts[0].strip(), "code": parts[1].strip()}
    return str(value)
def format_diag(value):
    """
    This function returns a {"code", "type"} object for every item of a list.

    :value  list of indexable items; the element at index 1 is used as the type and the
            element at index 2 as the code

    Items with fewer than three elements are skipped, they have no element at index 2.
    """
    return [{"code": item[2], "type": item[1]} for item in value if len(item) > 2]
def format_pos(value):
    """
    This function splits a value into a code, an indicator and a frequency.

    :value  text whose components are separated by '>' or, if there is no '>', by ':'

    The indicator and the frequency are only filled when the value has exactly three
    components; otherwise they are None and the code is the first component.
    """
    xchar = '>' if '>' in value else ':'
    parts = value.split(xchar)
    if len(parts) == 3:
        code, indicator, frequency = parts
    else:
        code, indicator, frequency = parts[0], None, None
    return {"code": code, "indicator": indicator, "frequency": frequency}
-
def get_map(row, config, version=None):
    """
    This function builds an object out of a row, given the configuration of that row.

    :row        list of strings, or list of such lists (every item is then mapped on its own
                and the list of the results is returned)
    :config     configuration of the row :
                    map         attribute name -> index of the element in the row
                    <version>   optional map, used instead of 'map' when version is given and is a key
                    anchors     optional values; the position in the row of the first element that
                                is an anchor is added to every index of the map
                    cast        optional attribute name -> expression, evaluated with eval(), that
                                yields a function applied to the (non blank) element
                    syn         optional value -> replacement value
    :version    optional key of the configuration that holds an alternative map

    An index that falls beyond the end of the row is skipped. A value that is an object is merged
    into the result (under the attribute name unless it already has that attribute); any other
    value is stored under the attribute name.
    """
    omap = config['map'] if not version or version not in config else config[version]
    anchors = config['anchors'] if 'anchors' in config else []

    if not isinstance(row[0], str):
        #
        # we are dealing with a complex object, every item is a row in itself
        return [get_map(row_item, config, version) for row_item in row]

    #
    # The offset does not depend on the attribute, so it is computed once; the first anchor
    # found in row order is used, which keeps the result deterministic
    offset = 0
    if anchors:
        anchor_set = set(anchors)
        offset = next((position for position, item in enumerate(row) if item in anchor_set), 0)

    cast = config['cast'] if 'cast' in config else {}
    syn = config['syn'] if 'syn' in config else {}

    object_value = {}
    for key in omap:
        index = omap[key] + offset
        if index >= len(row):
            continue
        value = row[index]

        if key in cast and value.strip() != '':
            #
            # NOTE(review): eval() executes whatever text the configuration holds, the
            # configuration must therefore come from a trusted source
            value = eval(cast[key])(value)

        if isinstance(value, dict):
            for objkey in value:
                if isinstance(value[objkey], dict):
                    continue
                if value[objkey] in syn:
                    value[objkey] = syn[value[objkey]]
            if key not in value:
                value = {key: value}
        elif value in syn:
            value = syn[value]

        # A synonym may itself be an object, hence the test after the substitution
        if isinstance(value, dict):
            object_value.update(value)
        else:
            object_value[key] = value
    return object_value
def get_locations(x12_file, section='HL'):
    """
    This function returns the positions of the lines that start with a given prefix.

    :x12_file   list of lines
    :section    prefix looked for, leading and trailing spaces of a line are ignored

    The positions are the actual positions of the lines, including when the same line is
    found several times in the list.
    """
    return [index for index, line in enumerate(x12_file) if line.strip().startswith(section)]
- #def get_claims(filename,config,section) :
def _error_message(error):
    """
    This function returns the first argument of an exception, or its text if it has no argument.
    """
    return error.args[0] if error.args else str(error)


def get_content(filename, config, section=None):
    """
    This function returns the content of the EDI file parsed given the configuration specified
    :section    loop prefix (HL, CLP), defaults to config['SECTION']
    :config     configuration with formatting rules, labels ...
    :filename   location of the file

    The result is a pair (claims, logs) :
        claims  list of objects, one per partition of the file in which a 'claim_id' was found;
                a partition starts at a line that starts with the section prefix and ends before
                the next one (the last partition runs to the end of the file)
        logs    list of objects that describe what could not be read or parsed

    A file that cannot be read, that has no line starting with the section prefix or that has an
    incomplete header yields no claim and one entry in the logs.
    """
    section = section if section else config['SECTION']
    logs = []
    try:
        #
        # The file is closed as soon as it has been read, undecodable bytes are ignored
        with open(filename.strip(), errors='ignore') as stream:
            x12_file = stream.read().split('\n')
    except Exception as error:
        #
        # We have an error here that should be logged
        logs.append({"version": "unknown", "filename": filename, "msg": _error_message(error)})
        return [], logs

    #
    # A final line break leaves empty entries at the end of the list; without them a file
    # written on a single line is recognized as such even if it ends with a line break
    while x12_file and not x12_file[-1].strip():
        x12_file.pop()

    joiner = "\n"
    if len(x12_file) == 1:
        #
        # Everything is on one line: the segments are obtained by splitting on the terminator,
        # which must be put back when the segments of a partition are assembled (below)
        x12_file = x12_file[0].split('~')
        joiner = "~"

    locations = get_locations(x12_file, section)
    if not locations:
        logs.append({"version": "unknown", "filename": filename, "msg": "no segment starts with " + str(section)})
        return [], logs

    #
    # given locations it is possible to build up the partitions (made of segments)
    # the end of the file closes the last partition
    boundaries = locations + [len(x12_file)]
    partitions = [joiner.join(x12_file[beg:end]) for beg, end in zip(boundaries, boundaries[1:])]

    try:
        top_row = x12_file[1].split('*')
        category = x12_file[2].split('*')[1].strip()
        version = top_row[-1].replace('~', '')
        submitted_date = format_date(top_row[4])
        sender_id = top_row[2]
        row = split(x12_file[3])
    except IndexError as error:
        #
        # The first lines of the file do not have the expected number of elements
        logs.append({"version": "unknown", "filename": filename, "msg": _error_message(error)})
        return [], logs

    _info = get_config(config, row)
    _default_value = get_map(row, _info, version) if _info else {}

    claims = []
    for segment in partitions:
        claim = {}
        for row in segment.replace('\n', '').split('~'):
            row = split(row)
            _info = get_config(config, row)
            if not _info:
                continue
            try:
                tmp = get_map(row, _info, version)
            except Exception as error:
                logs.append({"version": version, "filename": filename, "msg": _error_message(error), "X12": row, "completed": False, "rows": len(row)})
                #
                # The whole partition is given up when one of its rows can not be mapped
                claim = {}
                break

            if 'label' not in _info:
                tmp['version'] = version
                tmp['submitted'] = submitted_date
                if top_row[1] == 'HP':
                    tmp['payer_id'] = sender_id
                elif top_row[1] == 'HC':
                    tmp['provider_id'] = sender_id
                tmp['category'] = {"setid": category, "version": 'X' + version.split('X')[1], "id": version.split('X')[0].strip()}
                claim = dict(claim, **tmp)
                continue

            label = _info['label']
            if isinstance(tmp, list):
                claim[label] = tmp if label not in claim else claim[label] + tmp
            elif label not in claim:
                claim[label] = [tmp]
            elif len(tmp) == 1:
                #
                # An object with a single attribute completes the last object of the list
                index = len(claim[label]) - 1
                claim[label][index] = dict(claim[label][index], **tmp)
            else:
                claim[label].append(tmp)

            if len(claim[label]) > 0:
                #
                # Every item is numbered, then kept unless it is equal to an item already kept
                labels = []
                for item in claim[label]:
                    item['_index'] = len(labels)
                    if item not in labels:
                        labels.append(item)
                claim[label] = labels

        if claim and 'claim_id' in claim:
            claim = dict(claim, **_default_value)
            claim['name'] = filename.split(os.sep)[-1]
            claim['index'] = len(claims)
            claims.append(claim)
        #
        # otherwise no claim identifier could be associated with the data, nothing is kept

    return claims, logs
|