"""
This file encapsulates the functions needed to build a document.
"""
import copy

import numpy as np
class Builder:
    """
    This class is intended to create and manipulate objects
        :merge      merges two objects and accounts for attributes that are lists
        :parent     returns the parent for a given object
        :bind       turns a raw row into data using the plugin registered for it
        :build      places a piece of data within a document

    A plugin is a callable carrying a ``meta`` dictionary. The meta keys read
    by this class are 'map', 'field', 'container', 'anchor' and 'parent'.
    """

    def __init__(self, **_args):
        """
        :plugins    {file type: {element id: plugin}}
        :parents    collection of element ids whose data must be remembered
        """
        self._last = {}     # element id -> {'data': ..., 'field': ...} of the last parent seen
        self._loop = {}     # element id -> number of times the element was bound
        #
        # Deep copies, so later changes made by the caller do not leak in here
        self._plugins = copy.deepcopy(_args['plugins'])
        self._parents = copy.deepcopy(_args['parents'])

    def reset(self):
        """
        This function forgets the remembered parents and the element counters
        """
        self._last = {}
        self._loop = {}

    def parent(self, **_args):
        """
        This function returns the parent item of an object
        :meta   meta data of a decorated/annotated function
        :return the {'data','field'} entry stored for meta['parent'], or None
                when no parent is declared or it has not been seen yet
        """
        _id = _args['meta'].get('parent')
        if not _id:
            return None
        return self._last.get(_id)

    def count(self, _element):
        """
        This function increments the number of times an element was seen
        """
        self._loop[_element] = self._loop.get(_element, 0) + 1

    def pointer(self, **_args):
        """
        This function returns a pointer (plugin) associated with a row element
        :row        raw row, the first item is the element id
        :element    element id, used when no row is provided
        :x12        file type that is searched first (optional)
        :return     the plugin, or None if no file type knows the element
        @TODO: Make sure we know what kind of file we are processing (it would help suppress the loop)
        """
        _id = _args['row'][0] if 'row' in _args else _args['element']
        #
        # An unknown (or missing) file type must not prevent the fallback search
        _preferred = self._plugins.get(_args.get('x12'), {})
        if _id in _preferred:
            return _preferred[_id]
        for _items in self._plugins.values():
            if _id in _items:
                return _items[_id]
        return None

    def field(self, **_args):
        """
        This function determines the document field an element belongs to.
        Later rules override earlier ones :
            1. the field of the parent (if the parent has already been seen)
            2. meta['field'], otherwise meta['container']
            3. meta['anchor'] entry matching the stripped second item of the row
        :row    raw row
        :meta   meta data of the plugin function
        :return name of the field or None
        """
        _row = _args['row']
        _meta = _args['meta']
        _field = None
        if _meta.get('parent'):
            #
            # The parent may not have been seen yet, then there is nothing to inherit
            _parent = self.parent(meta=_meta)
            if _parent is not None:
                _field = _parent['field']
        if 'field' in _meta:
            _field = _meta['field']
        elif 'container' in _meta:
            _field = _meta['container']

        _anchor = _meta.get('anchor')
        if _anchor and len(_row) > 1:
            _key = _row[1].strip()
            if _key in _anchor:
                _field = _anchor[_key]
        return _field

    def merge(self, _x, _y):
        """
        This function will merge two objects _x, _y into a new dictionary.
        Neither _x nor the lists it holds are modified. For keys found in both :
            - lists are extended with the values that are not yet present
            - dictionaries are combined (one level), values of _y win
            - anything else is replaced by the value of _y
        """
        _out = dict(_x)
        for _key, _value in _y.items():
            if _key not in _out:
                _out[_key] = _value
                continue
            _current = _out[_key]
            if isinstance(_current, list):
                #
                # A scalar (or string) is a single value, it must not be iterated over
                _incoming = _value if isinstance(_value, (list, tuple, set)) else [_value]
                _merged = list(_current)    # copy, the list of _x is left untouched
                for _item in _incoming:
                    if _item not in _merged:
                        _merged.append(_item)
                _out[_key] = _merged
            elif isinstance(_current, dict) and isinstance(_value, dict):
                _merged = dict(_current)
                _merged.update(_value)      # works with keys that are not strings
                _out[_key] = _merged
            else:
                _out[_key] = _value
        return _out

    def parse(self, **_args):
        """
        This function will perform parsing on behalf of the plugin by relying on map function
        :row    raw x12 row
        :meta   meta data of the plugin function, meta['map'] is {position: name or list of names}
        :return {name: value found at the position}, positions beyond the end
                of the row yield an empty string
        """
        _row = list(_args['row'])
        _map = _args['meta']['map']

        _positions, _columns = [], []
        for _id, _names in _map.items():
            if not isinstance(_names, list):
                _names = [_names]
            #
            # A position mapped to several names is repeated once per name
            _columns.extend(_names)
            _positions.extend([int(_id)] * len(_names))
        if not _positions:
            return {}

        _missing = 1 + max(_positions) - len(_row)
        if _missing > 0:
            _row = _row + [''] * _missing
        try:
            return {_name: _row[_i] for _name, _i in zip(_columns, _positions)}
        except IndexError:
            #
            # Only reachable with a negative position beyond the start of the row
            return {}

    def meta(self, **_args):
        """
        This function returns the meta data of the plugin associated with a row
        :row    raw row, the first item is the element id
        :return meta of the first plugin found (any file type) or None
        """
        _id = _args['row'][0]
        for _items in self._plugins.values():
            if _id in _items:
                return _items[_id].meta
        return None

    def update(self, **_args):
        """
        This function remembers the data of an element that is declared as a parent
        :row    raw row
        :data   data that was extracted from the row
        """
        _row = _args['row']
        _element = _row[0]
        if _element not in self._parents:
            return
        #
        # A parent without a plugin has no meta data, its field is left undetermined
        _meta = self.meta(row=_row) or {}
        if 'field' in _meta:
            _field = _meta['field']
        else:
            _field = self.field(row=_row, meta=_meta)
        self._last[_element] = {'data': _args['data'], 'field': _field}

    def bind(self, **_args):
        """
        This function is intended to make an object out of an element
        :row    raw row of x12
        :x12    file type used to look up the plugin (optional)
        :return (data, meta) or (None, None) if no plugin handles the element
        """
        _row = _args['row']
        self.count(_row[0])
        _pointer = self.pointer(row=_row, x12=_args.get('x12'))
        if _pointer is None:
            return None, None

        _meta = _pointer.meta
        #
        # The built-in parser is only used when the plugin provides a map
        _data = self.parse(row=_row, meta=_meta) if 'map' in _meta else {}
        #
        # The plugin is used as formatter (at least), what it returns
        # replaces the parsed data unless it returns None
        _out = _pointer(row=_row, data=_data, meta=_meta)
        if _out is not None:
            _data = _out
        self.update(row=_row, data=_data)   # -- If this element is considered a parent, we store it
        return _data, _meta

    def build(self, **_args):
        """
        This function attempts to place a piece of data within a document
        :meta       meta data of the plugin function
        :data       data extracted from the row
        :row        raw row
        :document   dictionary that is the document
        :return     the document, it may be a new object so the caller
                    must use what is returned
        """
        _meta = _args['meta']
        _data = _args['data']
        _row = _args['row']
        _document = _args['document']

        _field = self.field(meta=_meta, row=_row)
        _container = 'container' in _meta
        if _field and _container and not isinstance(_document.get(_field), list):
            #
            # First item of a container (or the field holds something else)
            _document[_field] = []

        if _field and _document:
            if _field not in _document:
                _document[_field] = _data
            elif _container:
                _document[_field].append(_data)
            else:
                _document[_field] = self.merge(_document[_field], _data)
            return _document
        #
        # No field (or an empty document) : the data is merged at the top level
        # NOTE(review): an empty document ignores the field, confirm it is intended
        return self.merge(_document, _data)
-
|