"""
    (c) 2019 EDI-Parser 1.0
    Vanderbilt University Medical Center, Health Information Privacy Laboratory
    https://hiplab.mc.vanderbilt.edu/tools

    Authors:
        Khanhly Nguyen,
        Steve L. Nyemba<steve.l.nyemba@vanderbilt.edu>

    License:
        MIT, terms are available at https://opensource.org/licenses/MIT

    This parser was originally written by Khanhly Nguyen for her internship and is intended to parse x12 835,837 and others provided the appropriate configuration

    USAGE :
        - COMMAND LINE

        - EMBEDDED
"""
import os
import sys
import hashlib
import json


class X12:
    """
    Placeholder class mirroring the module-level helpers of the same names.
    Every method is currently a no-op stub that returns None.
    """

    def split(self, row, sep='*', prefix='HI'):
        pass

    def get_config(self, config, row):
        pass

    def hash(self, value):
        pass

    def suppress(self, value):
        pass

    def format_date(self, value):
        pass


def split(row, sep='*', prefix='HI'):
    """
    Split one x12 row (segment) into its elements.

    :row        raw segment text; any '~' is removed before splitting
    :sep        element separator
    :prefix     rows starting with this prefix are returned as a list of lists
    :return     for a row starting with `prefix`: one list per element after the
                first, each being [prefix] + the element split on '>'.
                Otherwise a flat list of elements where:
                  - elements without '>' are kept as is
                  - elements starting with 'HC' or 'AD' contribute only their
                    first two '>' separated parts
                  - other elements containing '>' are expanded on '>', except
                    on rows starting with 'CLM' where they are kept whole
                Carriage returns are removed from every returned element.
    """
    cleaned = row.replace('~', '')
    if row.startswith(prefix):
        return [[prefix] + split(item, '>') for item in cleaned.split(sep)[1:]]

    is_claim_row = row.startswith('CLM')
    value = []
    for row_value in cleaned.split(sep):
        if '>' not in row_value:
            value.append(row_value)
        elif row_value.startswith(('HC', 'AD')):
            value += row_value.split('>')[:2]
        elif is_claim_row:
            value.append(row_value)
        else:
            value += row_value.split('>')
    return [xchar.replace('\r', '') for xchar in value]


def get_config(config, row):
    """
    This function will return the meaningful parts of the configuration for a given item

    :config     dictionary keyed by segment identifier (first element of a row)
    :row        output of split(): a flat list of strings or a list of lists
    :return     the matching configuration entry, or {} when there is none
                (an empty row also yields {})
    """
    if not row:
        return {}
    _row = list(row) if isinstance(row[0], str) else list(row[0])
    if not _row:
        return {}

    segment_id = _row[0]
    _info = config[segment_id] if segment_id in config else {}
    if '@ref' in _info:
        #
        # The entry is qualified by another element of the row; the first
        # element (in row order) that is a key of '@ref' selects the entry
        references = _info['@ref']
        for item in _row:
            if item in references:
                return references[item]
        return {}

    if not _info and 'SIMILAR' in config:
        #
        # 'SIMILAR' maps a segment identifier onto another configured key
        if segment_id in config['SIMILAR']:
            _info = config[config['SIMILAR'][segment_id]]

    return _info


def hash(value):
    """
    Return the MD5 hex digest of str(value) followed by the content of the
    HEALTHCAREIO_SALT environment variable (empty when the variable is not set).

    NOTE(review): the name shadows the builtin `hash` and MD5 is not
    collision resistant; both are kept because changing either would alter
    the module interface or the values produced.
    """
    salt = os.environ['HEALTHCAREIO_SALT'] if 'HEALTHCAREIO_SALT' in os.environ else ''
    _value = str(value) + salt
    if sys.version_info[0] > 2:
        return hashlib.md5(_value.encode('utf-8')).hexdigest()
    return hashlib.md5(_value).hexdigest()


def suppress(value):
    """Return the constant 'N/A' regardless of the value provided."""
    return 'N/A'


def format_date(value):
    """
    Format a compact date as 'YYYY-MM-DD'.

    :value      8 characters (YYYYMMDD) or 6 characters (YYMMDD, prefixed with '20')
    :return     the formatted date, or None for any other length
    """
    if len(value) == 8:
        return "-".join([value[:4], value[4:6], value[6:]])[:10]
    if len(value) == 6:
        return "-".join(['20' + value[:2], value[2:4], value[4:]])
    return None


def format_time(value):
    """Insert ':' after the first two characters and keep the first five (e.g. '1230' -> '12:30')."""
    return ":".join([value[:2], value[2:]])[:5]


def sv3_parse(value):
    """
    Parse a '>' separated value into {'type': ..., 'code': ...}.

    :return     the dictionary, or None when the value has no '>'
    """
    if '>' in value:
        terms = value.split('>')
        return {'type': terms[0], 'code': terms[1]}
    return None


def sv2_parse(value):
    """
    Parse a composite value separated by '>' (preferred) or ':'.

    :return     {'type', 'code'} with an optional 'modifier' ({'code'} and
                optionally 'type') built from the third and fourth parts;
                the value itself when it holds neither separator
    """
    #
    # @TODO: Sometimes there's a suffix (need to inventory all the variations)
    #
    if '>' not in value and ':' not in value:
        return value

    xchar = '>' if '>' in value else ':'
    _values = value.split(xchar)
    _value = {"code": _values[1], "type": _values[0]}
    if len(_values) > 2:
        modifier = {"code": _values[2]}
        if len(_values) > 3:
            modifier['type'] = _values[3]
        _value['modifier'] = modifier
    return _value


def format_proc(value):
    """
    Parse a procedure value separated by ':' (tried first) or '<'.

    :return     {'type', 'code'} with both parts stripped, or str(value) when
                neither separator is present
    """
    for xchar in (':', '<'):
        parts = value.split(xchar)
        if len(parts) > 1:
            return {"type": parts[0].strip(), "code": parts[1].strip()}
    return str(value)


def format_diag(value):
    """
    Turn a list of items into a list of {'code': item[2], 'type': item[1]}.

    Items with fewer than three elements are skipped because they carry no
    code at index 2.
    """
    return [{"code": item[2], "type": item[1]} for item in value if len(item) > 2]


def format_pos(value):
    """
    Parse a value separated by '>' (preferred) or ':'.

    :return     {'code', 'indicator', 'frequency'}; indicator and frequency are
                None unless the value has exactly three parts
    """
    xchar = '>' if '>' in value else ':'
    x = value.split(xchar)
    if len(x) == 3:
        return {"code": x[0], "indicator": x[1], "frequency": x[2]}
    return {"code": x[0], "indicator": None, "frequency": None}


def _translate(value, synonyms):
    """Return the synonym registered for a value, or the value itself when none applies."""
    try:
        return synonyms[value] if value in synonyms else value
    except TypeError:
        #
        # unhashable values (e.g. a list returned by a cast) can not be synonym keys
        return value


def get_map(row, config, version=None):
    """
    Build an object out of a row given the configuration of its segment.

    :row        flat list of strings, or a list of such lists
    :config     segment configuration, the following keys are read:
                  'map'       {field name: index in the row}; config[version]
                              is used instead when version is a key of config
                  'anchors'   optional values; the position of the first one
                              found in the row is added to every index
                  'cast'      optional {field name: expression}; the expression
                              is evaluated and called with the raw value
                  'syn'       optional {value: replacement}
    :version    optional key of config holding an alternative map
    :return     a dictionary for a flat row, a list of dictionaries otherwise
                (an empty row yields an empty list)
    """
    if not row or not isinstance(row[0], str):
        #
        # we are dealing with a complex object
        return [get_map(row_item, config, version) for row_item in row]

    omap = config['map'] if not version or version not in config else config[version]
    anchors = config['anchors'] if 'anchors' in config else []
    casts = config['cast'] if 'cast' in config else {}
    synonyms = config['syn'] if 'syn' in config else {}

    #
    # The anchor does not depend on the field, its offset is computed once
    offset = 0
    for anchor in anchors:
        if anchor in row:
            offset = row.index(anchor)
            break

    object_value = {}
    for key in omap:
        index = offset + omap[key]
        if index >= len(row):
            continue
        value = row[index]
        if key in casts and value.strip() != '':
            #
            # SECURITY: the cast expression comes from the configuration and
            # is evaluated as python code, the configuration must be trusted
            value = eval(casts[key])(value)

        if isinstance(value, dict):
            for objkey in value:
                if isinstance(value[objkey], dict):
                    continue
                value[objkey] = _translate(value[objkey], synonyms)
            if key not in value:
                value = {key: value}
            object_value.update(value)
        else:
            object_value[key] = _translate(value, synonyms)
    return object_value


def get_locations(x12_file, section='HL'):
    """
    Return the index of every line that, once stripped, starts with the section prefix.

    :x12_file   list of lines (segments)
    :section    loop prefix (HL, CLP)
    """
    return [index for index, line in enumerate(x12_file) if line.strip().startswith(section)]


def _error_message(error):
    """Return the first argument of an exception, or its text when it has no argument."""
    return error.args[0] if error.args else str(error)


def _read_segments(filename):
    """Read a file and return its lines, or its '~' separated segments when it holds a single line."""
    with open(filename.strip(), errors='ignore') as stream:
        lines = stream.read().split('\n')
    populated = [line for line in lines if line.strip()]
    if len(populated) == 1:
        return populated[0].split('~')
    return lines


def _segment_rows(lines):
    """Return the segments held by the lines of a partition."""
    text = ''.join(lines)
    if '~' in text:
        #
        # '~' terminated segments, possibly wrapped over several lines
        return text.split('~')
    #
    # no terminator left (already split on '~'), every line is a segment
    return list(lines)


def _attach(claim, label, tmp):
    """Store a mapped object (or list of objects) under a label of the claim and renumber the entries."""
    if isinstance(tmp, list):
        claim[label] = tmp if label not in claim else claim[label] + tmp
    elif label not in claim:
        claim[label] = [tmp]
    elif len(tmp) == 1 and claim[label]:
        #
        # a single attribute completes the last object stored under the label
        claim[label][-1] = dict(claim[label][-1], **tmp)
    else:
        claim[label].append(tmp)

    if len(claim[label]) > 0:
        #
        # NOTE(review): '_index' is assigned before the membership test, so
        # two entries with the same content are both kept -- confirm whether
        # redundancies were meant to be removed
        labels = []
        for item in claim[label]:
            item['_index'] = len(labels)
            if item not in labels:
                labels.append(item)
        claim[label] = labels


def get_content(filename, config, section=None):
    """
    This function returns the content of the EDI file parsed given the configuration specified
    :section    loop prefix (HL, CLP), defaults to config['SECTION']
    :config     configuration with formatting rules, labels ...
    :filename   location of the file
    :return     (claims, logs): the claims found (only partitions that yield
                a 'claim_id') and the errors encountered; a file that can not
                be read, has no section or has an incomplete header yields no
                claim and one log entry

    The second, third and fourth lines (segments) of the file are read as the
    header: sender, submission date and version come from the second one, the
    category from the third one and the fourth one provides values added to
    every claim (presumably the GS, ST and BPR/BHT segments -- TODO confirm).
    """
    section = section if section else config['SECTION']
    logs = []
    try:
        x12_file = _read_segments(filename)
    except Exception as e:
        #
        # We have an error here that should be logged
        logs.append({"version": "unknown", "filename": filename, "msg": _error_message(e)})
        return [], logs

    locations = get_locations(x12_file, section)
    if not locations:
        logs.append({"version": "unknown", "filename": filename,
                     "msg": "no segment starting with '%s' was found" % section})
        return [], logs

    try:
        TOP_ROW = x12_file[1].split('*')
        CATEGORY = x12_file[2].split('*')[1].strip()
        VERSION = TOP_ROW[-1].replace('~', '')
        SUBMITTED_DATE = format_date(TOP_ROW[4])
        SENDER_ID = TOP_ROW[2]
        row = split(x12_file[3])
    except IndexError:
        logs.append({"version": "unknown", "filename": filename,
                     "msg": "incomplete or malformed header"})
        return [], logs

    _info = get_config(config, row)
    _default_value = get_map(row, _info, VERSION) if _info else {}

    version_parts = VERSION.split('X')
    category_version = 'X' + version_parts[1] if len(version_parts) > 1 else None
    category_id = version_parts[0].strip()

    claims = []
    #
    # given locations it is possible to build up the partitions (made of segments)
    # the last partition runs from the last location to the end of the file
    bounds = locations + [len(x12_file)]
    for beg, end in zip(bounds[:-1], bounds[1:]):
        claim = {}
        for segment in _segment_rows(x12_file[beg:end]):
            row = split(segment)
            _info = get_config(config, row)
            if not _info:
                continue
            try:
                tmp = get_map(row, _info, VERSION)
            except Exception as e:
                logs.append({"version": VERSION, "filename": filename, "msg": _error_message(e),
                             "X12": row, "completed": False, "rows": len(row)})
                #
                # the partition is discarded as soon as one row can not be mapped
                claim = {}
                break

            if 'label' not in _info:
                tmp['version'] = VERSION
                tmp['submitted'] = SUBMITTED_DATE
                if TOP_ROW[1] == 'HP':
                    tmp['payer_id'] = SENDER_ID
                elif TOP_ROW[1] == 'HC':
                    tmp['provider_id'] = SENDER_ID
                tmp['category'] = {"setid": CATEGORY, "version": category_version, "id": category_id}
                claim.update(tmp)
            else:
                _attach(claim, _info['label'], tmp)

        #
        # A partition without a claim identifier is ignored
        if claim and 'claim_id' in claim:
            claim.update(_default_value)
            claim['name'] = filename.split(os.sep)[-1]
            claim['index'] = len(claims)
            claims.append(claim)

    return claims, logs