- from typing import Any
- import numpy as np
- import json
- from multiprocessing import Process, RLock
- import os
- import io
- import queue
- import transport
- from transport import providers
- class Store(Process):
- """
- This is the data-store service that will handle reads and writes
- """
- dataStore = None
- @staticmethod
- def init(**_args):
- if Store.dataStore is None :
- _args = _args['store']
-
- else:
- pass
- @staticmethod
- def reset():
- pass
-
- class X12DOCUMENT (Process):
- """
- X12DOCUMENT class encapsulates functions that will be used to format an x12 (835,837) claim into an object
- """
- _queue = queue.Queue()
-
- class MODE :
- #
- # The following allow us to handle raw content (stream) or a filename
- # The raw content will be wrapped into io.StringIO so that it is handled as if it were a file
- #
- NAMES,STREAM = 'NAMES','STREAM'
- class ConfigHandler :
- def format(self,**_args):
- """
- This function formats variations of an element's parsing rules
- :info {index,field|label,map}
- """
-
- _info = _args['info']
- _ref = {}
-
- for _item in _info :
- _index = str(_item['index'])
- _field = _item['field'] if 'field' in _item else None
- _label = _item['label'] if 'label' in _item else None
- if _field :
- _ref[_index] = {'field':_field}
- elif _label :
- _ref[_index] = {'label':_label}
-
- return {'@ref':_ref}
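- # Illustrative example (hypothetical values):
- #   format(info=[{'index':1,'label':'patient'},{'index':'41','field':'claim_amount'}])
- #   returns {'@ref': {'1': {'label': 'patient'}, '41': {'field': 'claim_amount'}}}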
- def _getColumnsIndexes(self,_columns,_indexes,_map):
- """
- This function return columns and indexes related if a parsing map is passed
- :param _columns
- :param _indexes
- :param _map parsing map (field:index)
- """
- # @TODO: insure the lengths are the same for adequate usage downstream ...
- _xcolumns,_xindexes = list(_map.keys()), list(_map.values())
- keys,values = _xcolumns + _columns,_xindexes + _indexes
- _config = dict(zip(keys,values))
-
- _outColumns,_outIndexes = list(_config.keys()),list(_config.values())
- return _outColumns,_outIndexes
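- # Illustrative example (hypothetical values):
- #   _getColumnsIndexes(['date'], [3], {'claim_id': 1})
- #   prepends the map entries and returns (['claim_id', 'date'], [1, 3])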
- def _getObjectAtributes(self,_config):
- _field = _config['field'] if 'field' in _config else {}
- _label = _config['label'] if 'label' in _config else {}
- return _field,_label
- def consolidate(self,**_args):
- #
- # This function overrides the old configuration with the new configuration specifications
- #
-
- # _columns,_indexes = [],[]
- _columns,_indexes = _args['columns'],_args['index']
- _map = {}
- _config = _args['config'] if 'config' in _args else {}
- _field,_label = self._getObjectAtributes(_config)
-
- if 'map' in _config :
- _map = _args['config']['map']
- _columns,_indexes = self._getColumnsIndexes(_columns,_indexes,_map)
-
- if '@ref' in _config :
- # _columns,_indexes = [],[]
- _row = _args['row']
-
- _ref = _config['@ref']
-
- for _anchor in _ref:
-
- if _anchor == _row[1].strip() :
- _field,_label = self._getObjectAtributes(_ref[_anchor])
-
- _map = _ref[_anchor]['map'] if 'map' in _ref[_anchor] else {}
-
- if _map :
- _columns,_indexes = self._getColumnsIndexes([],[],_map)
- else:
- # print ([_anchor,_indexes,_columns])
- _map = dict(zip(_columns,_indexes))
- pass
-
- break
- # _columns,_indexes = _columns + _map.keys()
-
- _out = {'columns':_columns,'index':_indexes,'field':_field,'label':_label}
- return _out
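- # Illustrative example (hypothetical values, no '@ref' entry in the configuration):
- #   consolidate(columns=['date'], index=[3], config={'map': {'claim_id': 1}}, row=['DTM','405','20210101'])
- #   returns {'columns': ['claim_id', 'date'], 'index': [1, 3], 'field': {}, 'label': {}}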
- def legacy(self,**_args):
- #
- # This function returns the legacy configuration (default parsing)
- #
-
- _config = _args['config'] if 'config' in _args else {}
- _field,_label = self._getObjectAtributes(_config)
- _columns,_indexes = [],[]
- if 'map' in _config :
- _columns = list(_config['map'].keys())
- _indexes = list(_config['map'].values())
-
- return {'columns':_columns,'index':_indexes,'field':_field,'label':_label}
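- # Illustrative example (hypothetical values):
- #   legacy(config={'map': {'claim_id': 1, 'date': 3}})
- #   returns {'columns': ['claim_id', 'date'], 'index': [1, 3], 'field': {}, 'label': {}}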
- def override(self,**_args):
- return _args['columns'],_args['indexes']
- def __init__(self,**_args):
- super().__init__()
- self._mode = _args['mode'] if 'mode' in _args else 'NAMES'
- self.lastelement = {} # This stores the last NM1-type object parsed, so trailing segments can be merged into it
- if 'files' in _args :
- self.files = _args['files']
- self._config = _args['config'] if 'config' in _args else {}
-
- #
- # NM1 is a fluid type and thus will be cached so that the trailing N1,N2,N3,N4 segments can be merged back into it (recreating the hierarchy)
- # @TODO:
- # -add this to the configuration
- #
- self._hierarchy = {'NM1':['N1','N2','N3','N4']}
- self._document = []
-
- self._x12FileType = None
- self._configHandler = X12DOCUMENT.ConfigHandler()
- #
- #-- The files need to be classified as either claims or remits
- #
- if 'store' not in self._config :
- self._store_args = _args['store'] if 'store' in _args else {'provider':providers.CONSOLE}
- else:
- self._store_args = self._config['store']
-
- def init(self,_header):
- """
- Expected Elements must include ST
- """
- pass
- def merge (self,_x,_y):
- """
- This function will merge two objects _x, _y
- """
- _zcols = list(set(_x.keys()) & set(_y.keys())) #--common columns
-
- if _zcols :
- _out = dict(_x,**{})
- for _key in _y.keys() :
- if not _key in _zcols :
-
- _out[_key] = _y[_key]
- else:
- if type(_out[_key]) == list :
- _out[_key] += _y[_key]
- elif type(_out[_key]) == dict:
- _out[_key] = dict(_out[_key],**_y[_key])
- else:
- _out[_key] = _y[_key]
-
- return _out
- else:
-
- return dict(_x,**_y)
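- # Illustrative example: on shared keys, lists are concatenated, dicts are merged and scalars are overridden
- #   merge({'claim_id': '123', 'procedures': [{'code': 'A'}]}, {'procedures': [{'code': 'B'}], 'amount': 100})
- #   returns {'claim_id': '123', 'procedures': [{'code': 'A'}, {'code': 'B'}], 'amount': 100}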
-
- def split(self,content):
- """
- This function will split the content of an X12 document into a header and HL blocks
- :content x12 document in raw format (text)
- """
- #_content = content.split('~')
- _content = content.split('HL')
- _header = _content[:1][0].split('~')
-
- _blocks = ['HL'+_item for _item in _content[1:]]
- _blocks = [_item.split('~') for _item in _blocks ]
-
- # for row in _content :
- # if not _blocks and not row.startswith('HL') :
- # _header.append(row)
- # else:
- # _blocks.append(row)
- return {'header':_header,'blocks':_blocks}
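- # Illustrative example: "ISA*...~GS*...~HL*1**20*1~NM1*IL*1~HL*2*1*22*0~CLM*123*100~"
- #   header -> ['ISA*...', 'GS*...', ''] (segments before the first HL)
- #   blocks -> [['HL*1**20*1', 'NM1*IL*1', ''], ['HL*2*1*22*0', 'CLM*123*100', '']]
- #   (the trailing '' entries come from the terminating '~' of each block)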
- def parse (self,columns,index,**_args):
- """
- This function encapulates how an x12 document element will be processed
- :columns list of attributes that make up the object
- :index indexes of the said items in the element
- :_args
- - row raw x12 element (string)
- - config configuration of the element. his should indicate functions to apply against function
- """
- _ELEMENT = _args['row'][0]
- #
- # get the right configuration from the _config object
- _config = _args['config'][_ELEMENT] if _ELEMENT in _args['config'] else {}
-
- # _field = _config['field'] if 'field' in _config else None
- # _label = _config['label'] if 'label' in _config else None
- _map = _config['map'] if 'map' in _config else {}
- #
- # Let's see whether overriding the fields/labels is necessary
-
-
- # columns, index,_refField,_refLabel = self._configHandler.merge(row=_args['row'],columns=columns,index=index,config=_config)
- # _field = _field if not _refField else _refField
- # _label = _label if not _refLabel else _refLabel
-
- #
- # @TODO:
- # There should be a priority in parsing i.e plugin - config
- #
- if 'plugin-context' in _args :
- _config = _args['plugin-context'] #dict(_config,**_args['plugin-context'])
-
- _outInfo = self._configHandler.consolidate(row=_args['row'],columns=columns,index=index,config=_config)
-
-
- _field,_label = _outInfo['field'],_outInfo['label']
- _columns,_index = _outInfo['columns'],_outInfo['index']
-
-
- if 'row' in _args:
- _row = _args['row'] if type(_args['row']) == list else _args['row'].split('*')
-
- _index = np.array(_index)
-
- #
- # Sometimes the _row doesn't have all the expected indexes, so we compensate by padding it
- # This minimizes parsing errors caused by disconnects between the configuration and x12 element variations (inconsistent formats)
- #
- if np.max(_index) > len(_row) -1 :
- _delta = 1 + np.max(_index) - len(_row)
- _row = _row + np.repeat('',_delta).tolist()
- _row = np.array(_row)
-
- # _element = _row[0]
- # _configKeys = [] #list(self._config.keys())
- # _configTree = [] #list(self._config.values())
- # if 'config' in _args :
- # _config = _args['config']
- # _configKeys = list(_config.keys())
- # _configTree = list(_config.values())
- # else:
- # _config = {}
-
- _info = dict(zip(_columns,_row[_index].tolist()))
- _document = _args['document'] if 'document' in _args else {}
-
- #
- # @TODO:
- # Apply parsing/casting function to the object retrieved
- # _apply(_info) #-- the object will be processed accordingly
- #
-
- #
- # @TODO:
- # The objects parsed must be augmented against the appropriate ones e.g: NM1 <- N1,N2,N3,N4
- # - Find a way to drive this from a configuration ...
- #
- if _field :
- if not _field in _document :
- _item = {_field:_info}
- else:
- _item = self.merge(_document[_field],_info)
- elif _label :
- if not _label in _document :
- _item = {_label:[_info]}
- else:
- _item = _document[_label] + [_info]
- else:
- _item = _info
-
- if _ELEMENT in self._hierarchy and _field:
- # print ([_field,_item])
- self.lastelement = _item
- pass
- else:
- for key in self._hierarchy :
- if _ELEMENT in self._hierarchy[key] :
-
- _ikey = list(self.lastelement.keys())[0]
- _oldinfo = self.lastelement[_ikey]
- _item = {_ikey: self.merge(_oldinfo,_item)}
-
- break
- return _item
- else:
- #
- #
- print (_config)
- return columns
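- # Illustrative example (hypothetical columns/indexes, no field/label configured):
- #   parse(['first_name','last_name'], [4,3], row='NM1*IL*1*DOE*JOHN'.split('*'), config={}, document={})
- #   zips the requested indexes against the segment and returns {'first_name': 'JOHN', 'last_name': 'DOE'}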
- def elements(self):
- """
- This function returns the elements that are supported, as specified by the X12 standard
- """
- return [_name for _name in dir(self) if not _name.startswith('_') and not _name.islower() ]
- def pointers(self):
- """
- This function returns pointers associated with each element ...
- :return Object of Element:Function
- """
- _attr = self.elements()
- _pointers = [getattr(self,_name) for _name in _attr]
- return dict(zip(_attr,_pointers))
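- # Illustrative note: any attribute without a leading underscore and whose name is not lowercase
- # (e.g. a hypothetical NM1 or CLM handler defined on a subclass) is reported by elements(),
- # and pointers() maps each such name to its bound attribute, e.g. {'CLM': self.CLM, 'NM1': self.NM1, ...}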
- def set(self,_info,_document,_config):
- _attrName,_attrType = None,None
-
- if 'label' in _config :
- _attrType = 'label'
- _attrName = _config['label']
- elif 'field' in _config :
- _attrType = 'field'
- _attrName = _config['field']
-
- if _attrName :
- if _attrName not in _document :
- _document[_attrName] = [] if _attrType == 'label' else {}
-
- #
- # @TODO: make sure we don't have a case of an attribute being overridden
- if type(_document[_attrName]) == list :
- _document[_attrName] += [_info]
- else:
- _document[_attrName] = dict(_document[_attrName],**_info)
- # _document[_attrName] += [_info] if _attrType == 'label' else dict(_document[_attrName],**_info)
-
- return _document
-
- return dict(_document,**_info)
- pass
- def log (self,**_args):
- pass
- def run(self):
- """
- This function will trigger the workflow associated with a particular file
- """
- _getContent = {
- #
- # For the sake of testing, the following ensures
- # that raw string content is handled as if it were a file
- #
- X12DOCUMENT.MODE.STREAM: (lambda stream : io.StringIO(stream)) ,
- X12DOCUMENT.MODE.NAMES: (lambda name: open(name))
-
- }
- _writer = transport.factory.instance(**self._store_args)
- for _filename in self.files :
- try:
- _documents = []
- _parts = []
- # _content = (open(_filename)).read()
- _reader = _getContent[self._mode]
- _content = _reader(_filename).read()
- _info = self.split(_content)
- _fileType=self.init(_content)
- _header = self.apply(_info['header'])
-
- # print (json.dumps(_header))
- _tmp = {}
- for _content in _info['blocks'] :
-
- _body = self.apply(_content,header=_header)
-
- _doc = self.merge(_header,_body)
-
- if _doc and 'claim_id' in _doc:
- # X12DOCUMENT._queue.put(_document)
-
- _documents += [self.merge(_tmp,_doc)]
- _tmp = {}
- else:
- #
- # The document is being built and not yet ready
- _tmp = self.merge(_tmp,_doc)
-
- except Exception as e:
- #
- # @TODO: Log this issue for later analysis ...
- print (e)
- pass
- #
- # Let us post the documents we have accumulated to wherever they belong (writer or queue)
- #
- if _documents :
- # print (_header['header'])
-
- self.post(document=_documents,writer=_writer)
- break
-
- def post(self,**_args):
- """
- This function is intended to post content to a given location
- :param document
- :param writer
- """
- _writer = _args['writer'] if 'writer' in _args else None
- _document = _args['document']
- if not _writer:
- X12DOCUMENT._queue.put(_document)
- else:
-
- _writer.write(_document)
- def _getConfig(self,_chunk):
- #
- # Let us determine what kind of file we are dealing with, so we can extract the right configuration
- # For this we need to look for the ST segment ...
- #
-
- line = [line for line in _chunk if line and line[:2] == 'ST' ]
- _config = {} #-- default value, so a configuration is always returned even when no ST segment is found
- if line :
- #
- # We found the header of the block, so we can set the default configuration
- #
- self._x12FileType = line[0].split('*')[1].strip()
- if self._x12FileType :
- _config = self._config[self._x12FileType]
-
- return _config
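- # Illustrative example: a chunk containing the segment 'ST*837*0021' sets self._x12FileType to '837'
- # and returns the '837' entry of self._config (assuming the configuration defines one)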
-
- def apply(self,_chunk, header = {}):
- """
- _chunk is a group of segments split on HL; within each chunk are x12 loops (HL, CLM, ISA, ...)
- """
-
-
- _document,_cached = {},{}
- _pointers = self.pointers()
- _config = self._getConfig(_chunk)
- #
- # The configuration comes from the file, let's run this in merge mode
- # _config = self._configHandler.merge
- _pid = None
- for line in _chunk :
-
- segments = line.split('*')
-
- _ELEMENT = segments[0]
-
- if _ELEMENT not in _pointers or not _ELEMENT:
- continue
- if _ELEMENT in ['HL','CLM','ISA'] or not _pid:
- _pid = _ELEMENT
- if _pid not in _cached :
- _cached [_pid] = {}
-
- _pointer = _pointers[_ELEMENT]
-
- _args = {'row':segments,'document':_document,'header':header,'config':(_config)}
-
-
- _parsedLine = _pointer(**_args)
- # print ([_pid,_ELEMENT,_parsedLine])
-
- _cached[_pid] = self.merge(_cached[_pid],_parsedLine)
-
-
- #
- # Let's create the documents as we understand them to be
- # @TODO: Create a log so there can be visibility into the parser
- #
- _document = {}
- for _id in _cached :
- # print ('patient' in _cached[_id] )
-
- _document = self.merge(_document,_cached[_id])
-
- return _document
-
-
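- # Illustrative usage sketch: the class appears designed to be subclassed with one handler per
- # x12 element; the names Claims, claims_837.txt and _config below are hypothetical, and _config
- # is assumed to map transaction types (e.g. '837') to element parsing rules.
- #
- #   class Claims(X12DOCUMENT):
- #       def CLM(self, **_args):
- #           #-- handlers are invoked by apply() with row/document/header/config keyword arguments
- #           return self.parse(['claim_id', 'claim_amount'], [1, 2], **_args)
- #
- #   _parser = Claims(files=['claims_837.txt'], config=_config, store={'provider': providers.CONSOLE})
- #   _parser.start() #-- Process.start() invokes run(), which reads, parses and posts each document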