123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579 |
- """
- This function contains code to load plugins, the idea of a plugin is that it is designed to load, functions written outside of the scope of the parser.
- This enables better parsing, and the creation of a pipeline that can perform pre/post conditions given that {x12} is organized in loops
- Contract:
- Functions will take in an object (the row of the claim they are intended to parse), and prior rows (parsed)
- """
- import os
- import importlib as IL
- # import imp
- import sys
- from healthcareio import x12
- class parser:
- """
- This is a decorator used against the plugins to determine which function get applied against a given x12 element
- The following are mandatory :
- {element,map} or {element,columns},
- - Providing the map attribute will invoke the built-in parser
- - not providing the built-in parser will suggest that the code will handle parsing itself and thus must inform of the attributes expected
- """
- def __init__(self,**_args):
- self.element = _args['element']
- self.x12 = _args['x12'] if 'x12' in _args else '*'
- self.parent = _args['parent'] if 'parent' in _args else None
- self.map = None if 'map' not in _args else _args['map']
- self.cols = None if 'columns' not in _args else _args['columns']
- self.anchor = None if 'anchor' not in _args else _args['anchor']
-
- if 'field' in _args :
- self.field = _args['field']
- elif 'container' in _args :
-
- self.container = _args['container']
-
-
-
-
- def __call__(self, pointer):
- def wrapper(**args): #(*args, **kwargs):
- #
- # @TODO: Log this in case we have an error
- return pointer(**args) #(*args, **kwargs)
- #
- #-- These attributes will be read by the factory class to make sure the functions are assigned the data they are designed for
- setattr(wrapper,'element',self.element)
- setattr(wrapper,'x12',self.x12)
- setattr(wrapper,'parent',self.parent)
- _meta = {'element':self.element,'x12':self.x12,'parent':self.parent}
- if self.cols :
- setattr(wrapper,'columns',self.cols)
- _meta['columns'] = self.cols
- elif self.map :
- setattr(wrapper,'map',self.map)
- _meta['map'] = self.map
- if hasattr(self,'container') :
- setattr(wrapper,'container',self.container)
- _meta['container'] = self.container
- if hasattr(self,'field') :
- setattr(wrapper,'field',self.field)
- _meta['field'] = self.field
- if hasattr(self,'anchor') and self.anchor:
- _meta['anchor'] = self.anchor
- setattr(wrapper,'anchor',self.anchor)
- setattr(wrapper,'meta',_meta)
- #
- # clean up here ....
- # 1. if anchor no field|containers
- # 2. if parent no field or containers
- # 3. field and containers can't be
-
- return wrapper
- DEFAULT_PLUGINS='healthcareio.x12.plugins.default'
- def isplugin(_module,_name) :
- """
- This function for a given module returns a True if a function is a plugin if
- :_module module
- :_name a given name of a resource in a module
- """
-
- p = type(getattr(_module,_name)).__name__ =='function' # is a function
- q = hasattr(getattr(_module,_name),'element') # has element {x12}
- r = hasattr(getattr(_module,_name),'x12') # has an {x12} type i.e *,835,837
- return p and q and r
- def build_map (**_args):
- """
- This function builds the map of {x12,element,pointer} to enable rapid access to parsing fucntions
- :module module object as returned framework (importlib or imp)
- """
- _plugins = [getattr(_args['module'],_name) for _name in dir(_args['module']) if isplugin(_args['module'],_name)]
- _map = {}
- _parents = {}
- for _item in _plugins :
- _id = _item.x12
- if _id not in _map :
- _map[_id] = {}
- _element = _item.element
- if _item.parent :
- if _item.parent not in _parents :
- _parents[_item.parent] = []
- _parents[_item.parent].append (_item.element)
- if type(_element) == list :
- for _e in _element :
- _map[_id][_e.strip()] = _item
- else:
- _map[_id][_element]= _item
- return _map,_parents
- def instance(**_args):
- """
- This function returns an dictionary/map of functions available for execution
- The functions are organized as follows: {835:{BHT:_pointer,NM1:_pointer}}
- :_args
- - path provides the location (folder|file) to be extracted
- """
- #
- # Loading one of the default functions built-in ...
- #
- _module = IL.import_module(DEFAULT_PLUGINS)
- _map,_parents = build_map(module=_module)
- #
- # loaded the common functions, now load domain specific ones one
- #
- _packages = ['remits','claims']
- _handler = x12.util.document.Builder(parents={},plugins={})
-
- for _x12 in _packages :
- if hasattr(_module,_x12) :
-
- _ixmap,_ixparents = build_map(module=getattr(_module,_x12))
- _map = _handler.merge(_map,_ixmap)
- _parents = _handler.merge(_parents,_ixparents)
- #
- # consolidate, the common elements across the ...
- # We override any common processing element with the version specific element
- _map['835'] = _handler.merge(_map['*'],_map['835'])
- _map['837'] = _handler.merge(_map['*'],_map['837'])
-
- if 'path' in _args:
- #
- # We can/will override the default modules given the user has provided a location
- # _module = imp.load_source('udf',_args['path'])
- _module = IL.machinery.SourcefileLoader('udf',_args['path']).load_module()
- _udf_map,_udfp = build_map(module=_module)
- _map = dict(_map,**_udf_map)
- for key in _udfp:
- if key not in _parents :
- _parents[key] = _udfp[key]
- else:
- _parents[key] = _parents[key] + _udfp[key]
- if 'filter' in _args :
- return filter(elements = _args['filter'],plugins=_map)
- # return _smap
-
- return _map,_parents
- def merge (_x,_y):
- """
- This function will merge two objects _x, _y
- """
- _zcols = list(set(_x.keys()) & set(_y.keys())) #--common columns
-
- if _zcols :
- _out = dict(_x,**{})
- for _key in list(_y.keys()) :
-
-
- if _key not in _zcols and _key:
- _out[_key] = _y[_key]
- else:
- if type(_out[_key]) == list :
- for value in _y[_key] :
- if value not in _out[_key] :
- _out[_key].append(value)
- # _out[_key] += _y[_key]
- elif type(_out[_key]) == dict:
- _out[_key] = dict(_out[_key],**_y[_key])
- else:
- _out[_key] = _y[_key]
-
- return _out
- else:
-
- return dict(_x,**_y)
-
- # def template(**_args) :
- # """
- # This function generates an object template to be used in object assignment and export functionalities
- # We chose to proceed in this manner so as to enforce consistency of the parser
- # :plugins {*,837,835} with element and pointers associated
- # """
- # _plugins = _args['plugins']
- # _object = {'837':{},'835':{}}
- # for _x12 in _plugins :
- # _pointers = _plugins[_x12]
- # for _element in _pointers :
- # _meta = _pointers[_element].meta
- # _values = _meta['map'].values() if 'map' in _meta else _meta['columns']
- # #
- # # where do the attributes go ..
- # #
- # _attr = []
- # for _item in list(_values) :
- # if type(_item) == list :
- # _attr = _attr + _item
- # else:
- # _attr.append(_item)
- # _field = []
- # if 'field' in _meta or 'container' in _meta :
- # _field = _meta['field'] if 'field' in _meta else _meta['container']
-
- # if 'anchor' in _meta : #-- No parents are expected
- # _field = _meta['anchor'].values()
-
- # elif _meta['parent'] :
- # #
- # # It means the attributes will be
- # _parentPlug = filter(elements=[_meta['parent']],plugins=_plugins)
- # _pid = list(_parentPlug.keys())[0]
- # _parentMeta = _parentPlug[_pid][_meta['parent']].meta
-
- # _attr = _attr + list(_parentMeta['map'].values()) if 'map' in _parentMeta else _parentMeta['columns']
- # if 'anchor' in _parentMeta :
- # _field = list(_parentMeta['anchor'].values())
- # _field = [_field] if type(_field) == str else _field
- # _attr = dict.fromkeys(_attr,'')
- # if not _field :
- # _info = (_attr)
- # else:
- # _info = (dict.fromkeys(_field,_attr))
- # if _x12 == '*' :
-
- # _object['837']= merge(_object['837'], _info)
- # _object['835']= merge (_object['835'], _info)
- # else:
- # _object[_x12] = merge(_object[_x12],_info)
- # return _object
-
- def filter (**_args) :
- _elements = _args['elements']
- _plugins = _args['plugins']
- _map = {}
- for x12 in _plugins :
- _item = _plugins[x12]
- _found = list(set(_elements) & set(_item.keys()))
- if _found :
-
- _map[x12] = {key:_item[key] for key in _found }
- return _map
- def getTableName(**_args) :
- _plugins = _args['plugins']
- _meta = _args['meta']
- _x12 = _meta['x12']
- _foreignkeys = _args['tableKeys']
- _attributes = list(_meta['map'].values()) if 'map' in _meta else _meta['columns']
- if 'field' in _meta or 'container' in _meta:
- _tableName = _meta['field'] if 'field' in _meta else _meta['container']
-
- # _table = {_id:_attributes}
-
- elif 'anchor' in _meta :
- _tableName = _meta['anchor'].values()
- # for _name in _meta['anchor'].values() :
- # _table[_name] = _attributes
- elif 'parent' in _meta and _meta['parent']:
- #
- # We can have a parent with no field/container/anchor
- # We expect either a map or columns ...
- #
- _parentElement = _meta['parent']
- _parentMeta = _plugins[_x12][_parentElement].meta
- _parentTable = getTableName(plugins=_plugins,meta = _parentMeta,tableKeys=_foreignkeys)
- _tableName = list(_parentTable.keys())[0]
- # _table[_id] = _parentTable[_id] + _attributes
- _attributes = _parentTable[_tableName] + _attributes
- # print (_meta)
- else:
- #
- # baseline tables have no parent, we need to determine the name
- #
- _tableName = 'claims' if _x12 == '837' else 'remits'
- # print (_id,_attributes)
-
- pass
- #
- # Are there any anchors
- if _x12 == '837':
- _keys = [_foreignkeys['claims']]
- elif _x12 == '835' :
- _keys = [_foreignkeys['remits']]
- else:
- _keys = list(set(_foreignkeys.values()))
- _attr = []
- for _item in _attributes :
- if type(_item) == list :
- _attr += _item
- else:
- _attr.append(_item)
- _keys = list(set(_keys) - set(_attr))
- _attr = _keys + _attr
- # if 'container' in _meta and _meta['container'] == 'procedures' :
- # print (_attributes)
- _tableName = [_tableName] if type(_tableName) == str else _tableName
- return dict.fromkeys(_tableName,_attr)
- def _getTableName (**_args):
- """
- This function provides a list of attributes associated with an entity
- The function infers a relational structure from the JSON representation of a claim and plugin specifications
- """
- _meta = _args['meta']
- _xattr = list(_meta['map'].values()) if 'map' in _meta else _meta['columns']
- _plugins = _args['plugins']
- _foreignkeys = _args['tableKeys']
- #
- # Fix attributes, in case we have an index associated with multiple fields
- #
- _attr = []
- if 'anchor' not in _meta and not _meta['parent']:
- for _item in _xattr :
- _attr += _item if type(_item) == list else [_item]
- _name = None
- _info = {}
- _infoparent = {}
- if 'field' in _meta :
- _name = _meta['field']
- elif 'container' in _meta :
- _name = _meta['container']
- elif 'anchor' in _meta :
-
- _name = list(_meta['anchor'].values())
- # if _name :
- # _name = _name if type(_name) == list else [_name]
- # _info = dict.fromkeys(_name,_attr)
- if _meta['parent'] :
- _parentElement = filter(elements=[_meta['parent']],plugins=_plugins)
- _x12 = list(_parentElement.keys())[0]
- _id = list(_parentElement[_x12].keys())[0]
- _infoparent = getTableName(meta = _parentElement[_x12][_id].meta,plugins=_plugins,tableKeys=_foreignkeys)
-
- if _meta['x12'] == '*' :
- _name = ['claims','remits'] if not _name else _name
- _attr = list(set(_foreignkeys.values())) + _attr
- else:
- _name = 'claims' if _meta['x12'] == '837' and not _name else ('remits' if not _name and _meta['x12'] == '835' else _name)
- _id = 'claims' if _meta['x12'] == '837' else 'remits'
- if _id in _foreignkeys:
- _attr = [_foreignkeys[_id]] + _attr
-
-
- # if not _name :
- # if _meta['x12'] == '*' :
- # _name = ['claims','remits']
- # else:
- # _name = 'claims' if _meta['x12'] == '837' else 'remits'
- #
- # Let us make sure we can get the keys associated here ...
- #
- # filter (elements = [])
- _name = _name if type(_name) == list else [_name]
- _info = dict.fromkeys(_name,_attr)
- if _infoparent:
- _info = dict(_info,**_infoparent)
-
- return _info
-
-
- def getTableKeys(**_args):
- _plugins=_args['plugins']
- _pointer = filter(elements=['CLM'],plugins=_plugins)
- _keys = {}
- for _element in ['CLM','CLP'] :
- _pointer = filter(elements=[_element],plugins=_plugins)
- if not _pointer :
- continue
- _pointer = list(_pointer.values())[0]
- _meta = _pointer[_element].meta
- _name = _meta['map'][1] if 'map' in _meta else _meta['columns'][0]
- _id = 'claims' if _element == 'CLM' else 'remits'
- _keys[_id] = _name
- return _keys
- # print (list(_pointer.values())[0]['CLM'].meta)
- # print (_pointer.values()[0].meta)
- def sql (**_args):
- _plugins = _args['plugins']
- # _info = {'foreign':{},'keys':{'claims':None,'remits':None}}
- _documentHandler = x12.util.document.Builder(plugins=_plugins,parents=_args['parents'])
- _tableKeys = getTableKeys(plugins=_plugins)
- _schemas = {}
- for key in _plugins :
- _mpointers = _plugins[key]
- for _element in _mpointers :
- _pointer = _mpointers[_element]
- _meta = _pointer.meta
- _info = getTableName(meta=_meta,plugins=_plugins,tableKeys=_tableKeys)
- # _schemas = dict(_schemas,**_info)
- if _info :
- _schemas = _documentHandler.merge(_schemas,_info)
- # print (_info)
- return _schemas
- # if not _info :
- # print (_meta)
- # continue
- # if _meta['x12'] in ['837','837'] :
- # _schema_id = 'claims' if _meta['x12'] == '837' else 'remits'
- # _schema_id = [_schema_id]
- # else:
- # _schema_id = ['claims','remits']
- # if _info :
- # #
- # # foreign tables need to be placed here
-
- # for _id in _schema_id :
- # if type(_info) == list :
- # _schemas[_id]['attributes'] += _info
- # else:
- # _schemas[_id]['foreign'] = dict(_schemas[_id]['foreign'],**_info)
- # else:
- # #
- # # This one goes to the main tables
- # for _id in _schema_id :
- # print (_info)
- # _schemas[_id]['attributes'] += list(_info.values())
-
-
-
- # DEFAULT_PLUGINS='healthcareio.x12.plugins.default'
- # class MODE :
- # TRUST,CHECK,TEST,TEST_AND_CHECK= [0,1,2,3]
- # def instance(**_args) :
- # pass
- # def has(**_args) :
- # """
- # This function will inspect if a function is valid as a plugin or not
- # name : function name for a given file
- # path : python file to examine
- # """
- # _pyfile = _args['path'] if 'path' in _args else ''
- # _name = _args['name']
- # # p = os.path.exists(_pyfile)
- # _module = {}
- # if os.path.exists(_pyfile):
- # _info = IL.utils.spec_from_file_location(_name,_pyfile)
- # if _info :
- # _module = IL.utils.module_from_spec(_info)
- # _info.load.exec(_module)
-
-
- # else:
- # _module = sys.modules[DEFAULT_PLUGINS]
- # return hasattr(_module,_name)
- # def get(**_args) :
- # """
- # This function will inspect if a function is valid as a plugin or not
- # name : function name for a given file
- # path : python file to examine
- # """
- # _pyfile = _args['path'] if 'path' in _args else ''
- # _name = _args['name']
- # # p = os.path.exists(_pyfile)
- # _module = {}
- # if os.path.exists(_pyfile):
- # _info = IL.utils.spec_from_file_location(_name,_pyfile)
- # if _info :
- # _module = IL.utils.module_from_spec(_info)
- # _info.load.exec(_module)
-
-
- # else:
- # _module = sys.modules[DEFAULT_PLUGINS]
- # return getattr(_module,_name) if hasattr(_module,_name) else None
- # def test (**_args):
- # """
- # This function will test a plugin to insure the plugin conforms to the norm we are setting here
- # :pointer function to call
- # """
- # _params = {}
- # try:
- # if 'pointer' in _args :
- # _caller = _args['pointer']
- # else:
- # _name = _args['name']
- # _path = _args['path'] if 'path' in _args else None
- # _caller = get(name=_name,path=_path)
- # _params = _caller()
- # #
- # # the expected result is a list of field names [field_o,field_i]
- # #
- # return [_item for _item in _params if _item not in ['',None] and type(_item) == str]
- # except Exception as e :
- # return []
- # pass
- # def inspect(**_args):
- # _mode = _args['mode']
- # _name= _args['name']
- # _path= _args['path']
- # if _mode == MODE.CHECK :
- # _doapply = [has]
- # elif _mode == MODE.TEST :
- # _doapply = [test]
- # elif _mode == MODE.TEST_AND_CHECK :
- # _doapply = [has,test]
- # _status = True
- # _plugin = {"name":_name}
- # for _pointer in _doapply :
- # _plugin[_pointer.__name__] = _pointer(name=_name,path=_path)
- # if not _plugin[_pointer.__name__] :
- # _status = False
- # break
- # _plugin['loaded'] = _status
- # return _plugin
- # def load(**_args):
- # """
- # This function will load all the plugins given an set of arguments :
- # path file
- # name list of functions to export
- # mode 1- CHECK ONLY, 2 - TEST ONLY, 3- TEST_AND_CHECK
- # """
- # _path = _args ['path']
- # _names= _args['names']
- # _mode= _args ['mode'] if 'mode' in _args else MODE.TEST_AND_CHECK
- # _doapply = []
- # if _mode == MODE.CHECK :
- # _doapply = [has]
- # elif _mode == MODE.TEST :
- # _doapply = [test]
- # elif _mode == MODE.TEST_AND_CHECK :
- # _doapply = [has,test]
- # # _plugins = []
- # _plugins = {}
- # for _name in _names :
- # _plugin = {"name":_name}
- # if 'inspect' in _args and _args['inspect'] :
- # _plugin = inspect(name=_name,mode=_mode,path=_path)
- # else:
- # _plugin["method"] = ""
- # _status = True
- # _plugin['loaded'] = _status
- # if _plugin['loaded'] :
- # _plugin['pointer'] = get(name=_name,path=_path)
- # else:
- # _plugin['pointer'] = None
- # # _plugins.append(_plugin)
- # _plugins[_name] = _plugin
- # return _plugins
- # def parse(**_args):
- # """
- # This function will apply a function against a given function, and data
- # :row claim/remits pre-processed
- # :plugins list of plugins
- # :conifg configuration associated with
- # """
- # _row = _args['row']
- # _document = _args['document']
- # _config = _args['config']
- # """
- # "apply":"@path:name"
- # """
-
- # _info = _args['config']['apply']
- # _plug_conf = _args['config']['plugin'] if 'plugin' in _args['config'] else {}
- # if _info.startswith('@') :
- # _path = '' #-- get this from general configuration
- # elif _info.startswith('!'):
- # _path = _info.split('!')[0][1:]
- # _name = _info.split(':')[-1]
- # _name = _args['config']['apply'].split(_path)
|