123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144 |
- import numpy as np
- import os
- """
- This file contains utilities that will be used accross the x12 framework/platform
- @TODO:
- - Provisions with multiprocessing (locks/releases)
- """
- class ContentHandler :
- """
- This class implements {x12} content handling
- """
- def split (self,_stream) :
- if type(_stream) == str :
- _xchar = '~\n' if '~\n' in _stream else ('~' if '~' in _stream else ('\n' if '\n' in _stream else None))
-
- if _xchar :
- _xchar = ''.join(_xchar)
- _rows = _stream.split(_xchar)
-
- return [row.strip().split('*') for row in _rows if row.strip()]
- else:
- return _stream.split('*')
- def classify(self,_content):
- """
- This function is designed to split claim information from the rest of the information (envelope header)
- :_content The file content (already split by row and seperator)
- """
- _indexes = [1 if 'HL' in line else 0 for line in _content]
- _indexes = [_index for _index,_value in enumerate(_indexes) if _value == 1]
-
- #
- # At this point we know how many claims are in the file (log this somewhere)
- #
- _beg = 0
- _end = _indexes[0]
- _header = _content[_beg:_end]
- _block = []
- for _index,_beg in enumerate(_indexes) :
- if _index + 1 == len(_indexes) :
- _end = len(_content)
- else:
- _end = _indexes[_index + 1]
- _block.append(_content[_beg:_end])
-
- return {'header':_header,'block':_block}
- def merge (self,_x,_y):
- """
- This function will merge two objects _x, _y
- """
- _zcols = list(set(_x.keys()) & set(_y.keys())) #--common columns
-
- if _zcols :
- _out = dict(_x,**{})
- for _key in _y.keys() :
- if not _key in _zcols :
-
- _out[_key] = _y[_key]
- else:
- if type(_out[_key]) == list :
- _out[_key] += _y[_key]
- elif type(_out[_key]) == dict:
- _out[_key] = dict(_out[_key],**_y[_key])
- else:
- _out[_key] = _y[_key]
-
- return _out
- else:
-
- return dict(_x,**_y)
- def _inspect_row(self,**_args):
- """
- This function makes sure the indexes actually exist in the row
- :row row to be parsed (already split)
- :indexes list of indexes
- :columns columns to be used in the creation of the object
- """
- _max = np.max(_args['indexes'])
- _len = np.size(_args['row']) -1
- return _max > _len and np.size(_args['indexes']) == np.size(_args['columns'])
- def _parse (self,**_args):
- """
- This function will parse an x12 element given
- :row row of the x12 element
- :_columns attributes of the object to be returned
- :_indexes indexes of interest
- """
- pass
- _row = _args['row']
- _meta = _args['meta']
- _columns = _args['columns']
- _indexes = np.array(_args['indexes'])
- if not self._inspect_row (_args) :
- #
- # Minimizing parsing errors by padding the line
- _delta = 1+ np.max(_indexes) - np.size(_row)
- _row = _row + np.repeat('',_delta).tolist()
- #
- # @TODO: Log that the rows were padded
- #
- _row = np.array(_row)
- return dict(zip(_columns,_row[_indexes].tolist()))
- def _buildObject (self,**_args):
- """
- :meta data that is pulled from the decorator function
- :object row parsed and stored as an object
- :document existing document being parsed
- """
- _meta = _args['meta']
- _document = _args['document']
- _object = _args['object']
- if 'field' not in _meta and 'container' not in _meta :
- _document = self.merge(_document,_object)
- elif 'field' :
- field = _meta['field']
- if field in _document :
- _document[field] = self.merge(_document[field],_object)
- else:
- _document[field] = _object
- elif 'container' in _meta :
- _label = _meta['container']
- if _label not in _document :
- _document[_label] = []
-
- _document[_label].append(_object)
- return _document
- def get_files(self,**_args):
- folder = _args['folder']
- files = []
- if not os.path.exists(folder) :
- return []
- elif os.path.isdir(folder):
-
- for root,_dir,f in os.walk(folder) :
- if f :
- files += [os.sep.join([root,name]) for name in f]
- files = [path for path in files if os.path.isfile(path)]
- else:
- files = [folder]
- return files
|