|
@@ -1,456 +0,0 @@
|
|
|
-from typing import Any
|
|
|
-import numpy as np
|
|
|
-import json
|
|
|
-from multiprocessing import Process, RLock
|
|
|
-import os
|
|
|
-import io
|
|
|
-import queue
|
|
|
-import transport
|
|
|
-from transport import providers
|
|
|
-
|
|
|
-class Store(Process):
|
|
|
- """
|
|
|
- This is the data-store service that will handle read/writes
|
|
|
- """
|
|
|
- dataStore = None
|
|
|
- @staticmethod
|
|
|
- def init(self,**_args):
|
|
|
- if Store.dataStore is None :
|
|
|
- _args = _args['store']
|
|
|
-
|
|
|
- else:
|
|
|
- pass
|
|
|
- @staticmethod
|
|
|
- def reset():
|
|
|
- pass
|
|
|
-
|
|
|
-class X12DOCUMENT (Process):
|
|
|
- """
|
|
|
- X12DOCUMENT class encapsulates functions that will be used to format an x12 (835,837) claim into an object
|
|
|
- """
|
|
|
- _queue = queue.Queue()
|
|
|
-
|
|
|
- class MODE :
|
|
|
- #
|
|
|
- # The following allow us to handle raw content (stream) or a filename
|
|
|
- # The raw content will be wrapped into io.StringIO so that it is handled as if it were a file
|
|
|
- #
|
|
|
- NAMES,STREAM = 'NAMES','STREAM'
|
|
|
- class ConfigHandler :
|
|
|
- def format(self,**_args):
|
|
|
- """
|
|
|
- This function formats variations of an element's parsing rules
|
|
|
- :info {index,field|label,map}
|
|
|
- """
|
|
|
-
|
|
|
- _info = _args['info']
|
|
|
- _ref = {}
|
|
|
-
|
|
|
- for _item in _info :
|
|
|
- _index = str(_item['index'])
|
|
|
- _field = _item['field'] if 'field' in _item else None
|
|
|
- _label = _item['label'] if 'label' in _item else None
|
|
|
- if _field :
|
|
|
- _ref[_index] = {'field':_field}
|
|
|
- elif _label :
|
|
|
- _ref[_index] = {'label':_label}
|
|
|
-
|
|
|
- return {'@ref':_ref}
|
|
|
- def _getColumnsIndexes(self,_columns,_indexes,_map):
|
|
|
- """
|
|
|
- This function return columns and indexes related if a parsing map is passed
|
|
|
- :param _columns
|
|
|
- :param _indexes
|
|
|
- :param _map parsing map (field:index)
|
|
|
- """
|
|
|
- # @TODO: insure the lengths are the same for adequate usage downstream ...
|
|
|
- _xcolumns,_xindexes = list(_map.keys()), list(_map.values())
|
|
|
- keys,values = _xcolumns + _columns,_xindexes + _indexes
|
|
|
- _config = dict(zip(keys,values))
|
|
|
-
|
|
|
- _outColumns,_outIndexes = list(_config.keys()),list(_config.values())
|
|
|
- return _outColumns,_outIndexes
|
|
|
- def _getObjectAtributes(self,_config):
|
|
|
- _field = _config['field'] if 'field' in _config else {}
|
|
|
- _label = _config['label'] if 'label' in _config else {}
|
|
|
- return _field,_label
|
|
|
- def merge(self,**_args):
|
|
|
- #
|
|
|
- # This function overrides the old configuration with the new configuration specifications
|
|
|
- #
|
|
|
-
|
|
|
- # _columns,_indexes = [],[]
|
|
|
- _columns,_indexes = _args['columns'],_args['index']
|
|
|
- _map = {}
|
|
|
- _config = _args['config'] if 'config' in _args else {}
|
|
|
- _field,_label = self._getObjectAtributes(_config)
|
|
|
-
|
|
|
- if 'map' in _config :
|
|
|
- _map = _args['config']['map']
|
|
|
- _columns,_indexes = self._getColumnsIndexes(_columns,_indexes,_map)
|
|
|
-
|
|
|
- if '@ref' in _config :
|
|
|
- # _columns,_indexes = [],[]
|
|
|
- _row = _args['row']
|
|
|
-
|
|
|
- _ref = _config['@ref']
|
|
|
-
|
|
|
- for _anchor in _ref:
|
|
|
- # print ([_anchor,_anchor == _row[1].strip()])
|
|
|
- if _anchor == _row[1].strip() :
|
|
|
- _field,_label = self._getObjectAtributes(_ref[_anchor])
|
|
|
-
|
|
|
- _map = _ref[_anchor]['map'] if 'map' in _ref[_anchor] else {}
|
|
|
-
|
|
|
- if _map :
|
|
|
- _columns,_indexes = self._getColumnsIndexes([],[],_map)
|
|
|
-
|
|
|
-
|
|
|
- break
|
|
|
- # _columns,_indexes = _columns + _map.keys()
|
|
|
-
|
|
|
- return {'columns':_columns,'index':_indexes,'field':_field,'label':_label}
|
|
|
- def legacy(self,**_args):
|
|
|
- #
|
|
|
- # This function returns the legacy configuration (default parsing)
|
|
|
- #
|
|
|
-
|
|
|
- _config = _args['config'] if 'config' in _args else {}
|
|
|
- _field,_label = self._getObjectAtributes(_config)
|
|
|
- _columns,_indexes = [],[]
|
|
|
- if 'map' in _config :
|
|
|
- _columns = list(_config['map'].keys())
|
|
|
- _indexes = list(_config['map'].values())
|
|
|
-
|
|
|
- return {'columns':_columns,'index':_indexes,'field':_field,'label':_label}
|
|
|
- def override(self,**_args):
|
|
|
- return _args['columns'],_args['indexes']
|
|
|
- def __init__(self,**_args):
|
|
|
- super().__init__()
|
|
|
- self._mode = _args['mode'] if 'mode' in _args else 'NAMES'
|
|
|
- if 'files' in _args :
|
|
|
- self.files = _args['files']
|
|
|
- self._config = _args['config'] if 'config' in _args else {}
|
|
|
- self._document = []
|
|
|
-
|
|
|
- self._x12FileType = None
|
|
|
- self._configHandler = X12DOCUMENT.ConfigHandler()
|
|
|
-
|
|
|
- #
|
|
|
- #-- The files need to be classified, the files need to be either claims or remits
|
|
|
- #
|
|
|
- if 'store' not in self._config :
|
|
|
- self._store_args = _args['store'] if 'store' in _args else {'provider':providers.CONSOLE}
|
|
|
- else:
|
|
|
- self._store_args = self._config['store']
|
|
|
-
|
|
|
- def init(self,_header):
|
|
|
- """
|
|
|
- Expected Elements must include ST
|
|
|
- """
|
|
|
- pass
|
|
|
-
|
|
|
- def merge (self,_x,_y):
|
|
|
- """
|
|
|
- This function will merge two objects _x, _y
|
|
|
- """
|
|
|
- _zcols = list(set(_x.keys()) & set(_y.keys())) #--common columns
|
|
|
-
|
|
|
- if _zcols :
|
|
|
- _out = dict(_x,**{})
|
|
|
- for _key in _y.keys() :
|
|
|
- if not _key in _zcols :
|
|
|
-
|
|
|
- _out[_key] = _y[_key]
|
|
|
- else:
|
|
|
- if type(_out[_key]) == list :
|
|
|
- _out[_key] += _y[_key]
|
|
|
- elif type(_out[_key]) == dict:
|
|
|
- _out[_key] = dict(_out[_key],**_y[_key])
|
|
|
- else:
|
|
|
- _out[_key] = _y[_key]
|
|
|
-
|
|
|
- return _out
|
|
|
- else:
|
|
|
-
|
|
|
- return dict(_x,**_y)
|
|
|
-
|
|
|
- def split(self,content):
|
|
|
- """
|
|
|
- This function will split the content of an X12 document into blocks and headers
|
|
|
- :content x12 document in raw format (text)
|
|
|
- """
|
|
|
- #_content = content.split('~')
|
|
|
- _content = content.split('HL')
|
|
|
- _header = _content[:1][0].split('~')
|
|
|
-
|
|
|
- _blocks = ['HL'+_item for _item in _content[1:]]
|
|
|
- _blocks = [_item.split('~') for _item in _blocks ]
|
|
|
-
|
|
|
- # for row in _content :
|
|
|
- # if not _blocks and not row.startswith('HL') :
|
|
|
- # _header.append(row)
|
|
|
- # else:
|
|
|
- # _blocks.append(row)
|
|
|
- return {'header':_header,'blocks':_blocks}
|
|
|
- def parse (self,columns,index,**_args):
|
|
|
- """
|
|
|
- This function encapulates how an x12 document element will be processed
|
|
|
- :columns list of attributes that make up the object
|
|
|
- :index indexes of the said items in the element
|
|
|
- :_args
|
|
|
- - row raw x12 element (string)
|
|
|
- - config configuration of the element. his should indicate functions to apply against function
|
|
|
- """
|
|
|
- _ELEMENT = _args['row'][0]
|
|
|
- #
|
|
|
- # get the right configuration from the _config object
|
|
|
- _config = _args['config'][_ELEMENT] if _ELEMENT in _args['config'] else {}
|
|
|
-
|
|
|
- # _field = _config['field'] if 'field' in _config else None
|
|
|
- # _label = _config['label'] if 'label' in _config else None
|
|
|
- _map = _config['map'] if 'map' in _config else {}
|
|
|
- #
|
|
|
- # Let's see if overriding the fields/labels isn't necessary
|
|
|
-
|
|
|
-
|
|
|
- # columns, index,_refField,_refLabel = self._configHandler.merge(row=_args['row'],columns=columns,index=index,config=_config)
|
|
|
- # _field = _field if not _refField else _refField
|
|
|
- # _label = _label if not _refLabel else _refLabel
|
|
|
-
|
|
|
- _outInfo = self._configHandler.merge(row=_args['row'],columns=columns,index=index,config=_config)
|
|
|
-
|
|
|
- _field,_label = _outInfo['field'],_outInfo['label']
|
|
|
- _columns,_index = _outInfo['columns'],_outInfo['index']
|
|
|
-
|
|
|
-
|
|
|
- if 'row' in _args:
|
|
|
- _row = _args['row'] if type(_args['row']) == list else _args['row'].split('*')
|
|
|
-
|
|
|
- _index = np.array(_index)
|
|
|
-
|
|
|
- #
|
|
|
- # Sometimes the _row doesn't have all expected indexes, we will compensate
|
|
|
- # This allows to minimize parsing errors as it may relate to disconnects between configuration and x12 element variations (shitty format)
|
|
|
- #
|
|
|
- if np.max(_index) > len(_row) -1 :
|
|
|
- _delta = 1 + np.max(_index) - len(_row)
|
|
|
- _row = _row + np.repeat('',_delta).tolist()
|
|
|
- _row = np.array(_row)
|
|
|
-
|
|
|
- # _element = _row[0]
|
|
|
-
|
|
|
- _configKeys = [] #list(self._config.keys())
|
|
|
- _configTree = [] #list(self._config.values())
|
|
|
- if 'config' in _args :
|
|
|
- _config = _args['config']
|
|
|
- _configKeys = list(_config.keys())
|
|
|
- _configTree = list(_config.values())
|
|
|
- else:
|
|
|
- _config = {}
|
|
|
-
|
|
|
- _info = dict(zip(_columns,_row[_index].tolist()))
|
|
|
- _document = _args['document'] if 'document' in _args else {}
|
|
|
- #
|
|
|
- # Extracting configuration (minimal information)
|
|
|
- # _config = _args['config'] if 'config' in _args else {}
|
|
|
- # _config = self._config
|
|
|
-
|
|
|
-
|
|
|
- # if '@ref' in _config :
|
|
|
- # print (_config['@ref'])
|
|
|
- # _values = _config['@ref']
|
|
|
- # print (_values)
|
|
|
-
|
|
|
- if _field :
|
|
|
- if not _field in _document :
|
|
|
- return {_field:_info}
|
|
|
- else:
|
|
|
- return self.merge(_document[_field],_info)
|
|
|
- elif _label :
|
|
|
- if not _label in _document :
|
|
|
- return {_label:[_info]}
|
|
|
- else:
|
|
|
- return _document[_label] + [_info]
|
|
|
- else:
|
|
|
- return _info
|
|
|
-
|
|
|
- else:
|
|
|
- return columns
|
|
|
- def elements(self):
|
|
|
- """
|
|
|
- This function returns elements that are supported as specified by X12 standard
|
|
|
- """
|
|
|
- return [_name for _name in dir(self) if not _name.startswith('_') and not _name.islower() ]
|
|
|
- def pointers(self):
|
|
|
- """
|
|
|
- This function returns pointers associated with each element ...
|
|
|
- :return Object of Element:Function
|
|
|
- """
|
|
|
- _attr = self.elements()
|
|
|
- _pointers = [getattr(self,_name) for _name in _attr]
|
|
|
- return dict(zip(_attr,_pointers))
|
|
|
-
|
|
|
- def set(self,_info,_document,_config):
|
|
|
- _attrName,_attrType = None,None
|
|
|
-
|
|
|
- if 'label' in _config :
|
|
|
- _attrType = 'label'
|
|
|
- _attrName = _config['label']
|
|
|
- elif 'field' in _config :
|
|
|
- _attrType = 'field'
|
|
|
- _attrName = _config['field']
|
|
|
-
|
|
|
- if _attrName :
|
|
|
- if _attrName not in _document :
|
|
|
- _document[_attrName] = [] if _attrType == 'label' else {}
|
|
|
-
|
|
|
- #
|
|
|
- # @TODO: make sure we don't have a case of an attribute being overridden
|
|
|
- if type(_document[_attrName]) == list :
|
|
|
- _document[_attrName] += [_info]
|
|
|
- else:
|
|
|
- _document[_attrName] = dict(_document[_attrName],**_info)
|
|
|
- # _document[_attrName] += [_info] if _attrType == 'label' else dict(_document[_attrName],**_info)
|
|
|
-
|
|
|
- return _document
|
|
|
-
|
|
|
- return dict(_document,**_info)
|
|
|
-
|
|
|
- pass
|
|
|
- def log (self,**_args):
|
|
|
- pass
|
|
|
- def run(self):
|
|
|
- """
|
|
|
- This function will trigger the workflow associated with a particular file
|
|
|
- """
|
|
|
- _getContent = {
|
|
|
- #
|
|
|
- # For the sake of testing, the following insures
|
|
|
- # that raw string content is handled as if it were a file
|
|
|
- #
|
|
|
- X12DOCUMENT.MODE.STREAM: (lambda stream : io.StringIO(stream)) ,
|
|
|
- X12DOCUMENT.MODE.NAMES: (lambda name: open(name))
|
|
|
-
|
|
|
-
|
|
|
- }
|
|
|
- _writer = transport.factory.instance(**self._store_args)
|
|
|
- for _filename in self.files :
|
|
|
- try:
|
|
|
- _documents = []
|
|
|
- _parts = []
|
|
|
- # _content = (open(_filename)).read()
|
|
|
- _reader = _getContent[self._mode]
|
|
|
- _content = _reader(_filename).read()
|
|
|
- _info = self.split(_content)
|
|
|
- _fileType=self.init(_content)
|
|
|
- _header = self.apply(_info['header'])
|
|
|
-
|
|
|
- # print (json.dumps(_header))
|
|
|
- for _content in _info['blocks'] :
|
|
|
-
|
|
|
- _body = self.apply(_content,header=_header)
|
|
|
- _doc = self.merge(_header,_body)
|
|
|
-
|
|
|
- if _doc and 'claim_id' in _doc:
|
|
|
- # X12DOCUMENT._queue.put(_document)
|
|
|
-
|
|
|
- _documents += [_doc]
|
|
|
-
|
|
|
-
|
|
|
- except Exception as e:
|
|
|
- #
|
|
|
- # @TODO: Log this issue for later analysis ...
|
|
|
- print (e)
|
|
|
- pass
|
|
|
- #
|
|
|
- # Let us post this to the documents we have, we should find a place to post it
|
|
|
- #
|
|
|
- if _documents :
|
|
|
- # print (_header['header'])
|
|
|
-
|
|
|
- self.post(document=_documents,writer=_writer)
|
|
|
- break
|
|
|
-
|
|
|
- def post(self,**_args):
|
|
|
- """
|
|
|
- This function is intended to post content to a given location
|
|
|
- :param document
|
|
|
- :param writer
|
|
|
- """
|
|
|
- _writer = _args['writer'] if 'writer' in _args else None
|
|
|
- _document = _args['document']
|
|
|
- if not _writer:
|
|
|
- X12DOCUMENT._queue.put(_document)
|
|
|
- else:
|
|
|
-
|
|
|
- _writer.write(_document)
|
|
|
- def _getConfig(self,_chunk):
|
|
|
- #
|
|
|
- # Let us determine what kind of file we are dealing with, so we can extract the configuration
|
|
|
- # For this we need to look for the ST loop ...
|
|
|
- #
|
|
|
-
|
|
|
- line = [line for line in _chunk if line and line[:2] == 'ST' ]
|
|
|
-
|
|
|
- if line :
|
|
|
- #
|
|
|
- # We found the header of the block, so we can set the default configuration
|
|
|
- #
|
|
|
- self._x12FileType = line[0].split('*')[1].strip()
|
|
|
- _config = {}
|
|
|
- if self._x12FileType :
|
|
|
- _config = self._config[self._x12FileType]
|
|
|
-
|
|
|
- return _config
|
|
|
-
|
|
|
- def apply(self,_chunk, header = {}):
|
|
|
- """
|
|
|
- _chunks are groups of elements split by HL, within each chunk are x12 loops HL,CLM,ISA
|
|
|
- """
|
|
|
-
|
|
|
-
|
|
|
- _document,_cached = {},{}
|
|
|
- _pointers = self.pointers()
|
|
|
- _config = self._getConfig(_chunk)
|
|
|
- #
|
|
|
- # The configuration comes from the file, let's run this in merge mode
|
|
|
- # _config = self._configHandler.merge
|
|
|
- _pid = None
|
|
|
- for line in _chunk :
|
|
|
-
|
|
|
- segments = line.split('*')
|
|
|
-
|
|
|
- _ELEMENT = segments[0]
|
|
|
-
|
|
|
- if _ELEMENT not in _pointers or not _ELEMENT:
|
|
|
- continue
|
|
|
- if _ELEMENT in ['HL','CLM','ISA'] or not _pid:
|
|
|
- _pid = _ELEMENT
|
|
|
- if _pid not in _cached :
|
|
|
- _cached [_pid] = {}
|
|
|
-
|
|
|
- _pointer = _pointers[_ELEMENT]
|
|
|
-
|
|
|
- _args = {'row':segments,'document':_document,'header':header,'config':(_config)}
|
|
|
-
|
|
|
-
|
|
|
- _parsedLine = _pointer(**_args)
|
|
|
- # print ([_pid,_ELEMENT,_parsedLine])
|
|
|
-
|
|
|
- _cached[_pid] = self.merge(_cached[_pid],_parsedLine)
|
|
|
-
|
|
|
-
|
|
|
- #
|
|
|
- # Let's create the documents as we understand them to be
|
|
|
- # @TODO: Create a log so there can be visibility into the parser
|
|
|
- #
|
|
|
- _document = {}
|
|
|
- for _id in _cached :
|
|
|
- # print ('patient' in _cached[_id] )
|
|
|
-
|
|
|
- _document = self.merge(_document,_cached[_id])
|
|
|
-
|
|
|
- return _document
|
|
|
-
|
|
|
-
|