utils.py.old 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. import numpy as np
  2. import os
  3. """
  4. This file contains utilities that will be used across the x12 framework/platform
  5. @TODO:
  6. - Provisions with multiprocessing (locks/releases)
  7. """
  8. class ContentHandler :
  9. """
  10. This class implements {x12} content handling
  11. """
  12. def split (self,_stream) :
  13. if type(_stream) == str :
  14. _xchar = '~\n' if '~\n' in _stream else ('~' if '~' in _stream else ('\n' if '\n' in _stream else None))
  15. if _xchar :
  16. _xchar = ''.join(_xchar)
  17. _rows = _stream.split(_xchar)
  18. return [row.strip().split('*') for row in _rows if row.strip()]
  19. else:
  20. return _stream.split('*')
  21. def classify(self,_content):
  22. """
  23. This function is designed to split claim information from the rest of the information (envelope header)
  24. :_content The file content (already split by row and seperator)
  25. """
  26. _indexes = [1 if 'HL' in line else 0 for line in _content]
  27. _indexes = [_index for _index,_value in enumerate(_indexes) if _value == 1]
  28. #
  29. # At this point we know how many claims are in the file (log this somewhere)
  30. #
  31. _beg = 0
  32. _end = _indexes[0]
  33. _header = _content[_beg:_end]
  34. _block = []
  35. for _index,_beg in enumerate(_indexes) :
  36. if _index + 1 == len(_indexes) :
  37. _end = len(_content)
  38. else:
  39. _end = _indexes[_index + 1]
  40. _block.append(_content[_beg:_end])
  41. return {'header':_header,'block':_block}
  42. def merge (self,_x,_y):
  43. """
  44. This function will merge two objects _x, _y
  45. """
  46. _zcols = list(set(_x.keys()) & set(_y.keys())) #--common columns
  47. if _zcols :
  48. _out = dict(_x,**{})
  49. for _key in _y.keys() :
  50. if not _key in _zcols :
  51. _out[_key] = _y[_key]
  52. else:
  53. if type(_out[_key]) == list :
  54. _out[_key] += _y[_key]
  55. elif type(_out[_key]) == dict:
  56. _out[_key] = dict(_out[_key],**_y[_key])
  57. else:
  58. _out[_key] = _y[_key]
  59. return _out
  60. else:
  61. return dict(_x,**_y)
  62. def _inspect_row(self,**_args):
  63. """
  64. This function makes sure the indexes actually exist in the row
  65. :row row to be parsed (already split)
  66. :indexes list of indexes
  67. :columns columns to be used in the creation of the object
  68. """
  69. _max = np.max(_args['indexes'])
  70. _len = np.size(_args['row']) -1
  71. return _max > _len and np.size(_args['indexes']) == np.size(_args['columns'])
  72. def _parse (self,**_args):
  73. """
  74. This function will parse an x12 element given
  75. :row row of the x12 element
  76. :_columns attributes of the object to be returned
  77. :_indexes indexes of interest
  78. """
  79. pass
  80. _row = _args['row']
  81. _meta = _args['meta']
  82. _columns = _args['columns']
  83. _indexes = np.array(_args['indexes'])
  84. if not self._inspect_row (_args) :
  85. #
  86. # Minimizing parsing errors by padding the line
  87. _delta = 1+ np.max(_indexes) - np.size(_row)
  88. _row = _row + np.repeat('',_delta).tolist()
  89. #
  90. # @TODO: Log that the rows were padded
  91. #
  92. _row = np.array(_row)
  93. return dict(zip(_columns,_row[_indexes].tolist()))
  94. def _buildObject (self,**_args):
  95. """
  96. :meta data that is pulled from the decorator function
  97. :object row parsed and stored as an object
  98. :document existing document being parsed
  99. """
  100. _meta = _args['meta']
  101. _document = _args['document']
  102. _object = _args['object']
  103. if 'field' not in _meta and 'container' not in _meta :
  104. _document = self.merge(_document,_object)
  105. elif 'field' :
  106. field = _meta['field']
  107. if field in _document :
  108. _document[field] = self.merge(_document[field],_object)
  109. else:
  110. _document[field] = _object
  111. elif 'container' in _meta :
  112. _label = _meta['container']
  113. if _label not in _document :
  114. _document[_label] = []
  115. _document[_label].append(_object)
  116. return _document
  117. def get_files(self,**_args):
  118. folder = _args['folder']
  119. files = []
  120. if not os.path.exists(folder) :
  121. return []
  122. elif os.path.isdir(folder):
  123. for root,_dir,f in os.walk(folder) :
  124. if f :
  125. files += [os.sep.join([root,name]) for name in f]
  126. files = [path for path in files if os.path.isfile(path)]
  127. else:
  128. files = [folder]
  129. return files