123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172 |
- import os
- import numpy as np
- from io import StringIO
- # from .common import Common
class Content:
    """
    This class implements functions that manipulate the content of an x12 file.

    :split      splits raw content into claim-level chunks
    :read       reads the content of a file given a filename (or StringIO)
    :_ix_parse  applies a decorated parsing function to a single x12 element
    """

    def __init__(self, **_args):
        # Parsing state shared across elements of a document.
        self._parents = {}
        self._lastelement = {}

    def split(self, _content):
        """
        Split raw x12 content into claim chunks.

        :_content   raw file content (str); anything else yields ([], None)
        :return     (chunks, x12_type) where x12_type is '837', '835' or None.
                    Each chunk is a list of segments, each segment a list of
                    '*'-separated fields.
        """
        if not isinstance(_content, str):
            return [], None
        #
        # Determine the segment terminator; x12 files vary ("shitty format").
        #
        if '~\n' in _content:
            _xchar = '~\n'
        elif '~' in _content:
            _xchar = '~'
        elif '\n' in _content:
            _xchar = '\n'
        else:
            _xchar = None
        # An 837 (claim) is marked by CLM*, an 835 (remittance) by CLP*
        _x12 = '837' if 'CLM*' in _content else ('835' if 'CLP*' in _content else None)
        if _x12 is None or _xchar is None:
            # Not a recognizable x12 claim/remittance document.
            # (The original code raised KeyError/TypeError here.)
            return [], None
        _claim_mark = {'835': 'CLP', '837': 'CLM'}[_x12]
        _chunks = []
        for _block in _content.split(_claim_mark):
            if _chunks:
                # str.split stripped the claim marker; re-attach it so every
                # chunk after the header starts with its CLM/CLP segment.
                _block = _claim_mark + _block
            _splitblocks = [row.strip().split('*') for row in _block.split(_xchar) if row.strip()]
            _chunks.append(_splitblocks)
        return _chunks, _x12

    def read(self, **_args):
        """
        Read and return the content of a file.

        :filename   path to a file, or an io.StringIO object
        :return     file content as a string
        """
        _filename = _args['filename']
        if isinstance(_filename, StringIO):
            return _filename.read()
        with open(_filename) as f:
            return f.read()

    def _ix_parse(self, columns, index, **_args):
        """
        Encapsulates how an x12 document element is processed.

        :columns    list of attributes that make up the object
        :index      indexes of the said items in the element
        :_args
            - row       raw x12 element (string or pre-split list)
            - pointer   decorated parsing function (carries .meta)
            - document  document being built
        :return     dict of parsed attributes for this element
        """
        _pointer = _args['pointer']
        _document = _args['document']
        if 'map' in _pointer.meta:
            # Positional mapping {index: field_name} drives the extraction.
            _map = _pointer.meta['map']
            _index = list(_map.keys())
            _columns = [_map[_id] for _id in _index]
            _row = _args['row'] if isinstance(_args['row'], list) else _args['row'].split('*')
            _index = np.array(_index)
            #
            # Sometimes the _row doesn't have all expected indexes; pad with
            # empty strings to minimize parsing errors caused by disconnects
            # between configuration and x12 element variations.
            #
            if np.max(_index) > len(_row) - 1:
                _delta = 1 + np.max(_index) - len(_row)
                _row = _row + [''] * _delta
            _row = np.array(_row)
            _info = dict(zip(_columns, _row[_index].tolist()))
        else:
            #
            # No map: delegate to the function intended to perform the parsing.
            #
            _info = _pointer(row=_args['row'], document=_document, meta=_pointer.meta)
        #
        # @TODO: We should look into the object created and enforce the specifications are met
        #
        return _info
class Location:
    """Resolves a path argument into the list of files it designates."""

    @staticmethod
    def get(**_args):
        """
        :path    a file path or a directory to walk recursively
        :chunks  optional number of partitions; when set, the file list is
                 split with np.array_split
        :return  list of file paths, or a list of np.array partitions
        """
        _path = _args['path']
        if not os.path.isdir(_path):
            # A plain (possibly non-existent) file path is returned as-is.
            _files = [_path]
        else:
            _files = []
            for _root, _dirs, _names in os.walk(_path):
                _files.extend(os.sep.join([_root, _name]) for _name in _names)
            # Keep only entries that are actual files.
            _files = [_item for _item in _files if os.path.isfile(_item)]
        _count = int(_args['chunks']) if 'chunks' in _args else 0
        return np.array_split(_files, _count) if _count else _files
|