  1. """
  2. This class refactors the default parsing class (better & streamlined implementation)
  3. The class will rely on the new plug/play architectural style perform parsing
  4. """
from multiprocessing import Process, RLock
import os
import json
# from healthcareio.x12.util
from healthcareio import x12
import numpy as np
import transport
import copy
# from healthcareio.x12.util import file as File, document as Document
from datetime import datetime
from healthcareio.logger import X12Logger
import time

class BasicParser (Process) :
    def __init__(self,**_args):
        super().__init__()
        self._plugins = _args['plugins']
        self._parents = _args['parents']
        self._files = _args['files']
        self._store = _args['store']
        self._template = x12.util.template(plugins=self._plugins)
        # self._logger = _args['logger'] if 'logger' in _args else None
        self._logger = X12Logger(store = self._store)
        if self._logger :
            _info = {key:len(self._plugins[key].keys()) for key in self._plugins}
            _data = {'plugins':_info,'files': len(self._files),'model':self._template}
            self._logger.log(module='BasicParser',action='init',data=_data)

    def log (self,**_args):
        """
        This function logs data into a specified location in JSON format
        datetime,module,action,data
        """
        pass
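        #
        # Illustrative sketch (not part of the original implementation): a record matching
        # the docstring above would look roughly like
        #   {"datetime":"...","module":"parse","action":"parse","data":{...}}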

    def apply(self,**_args):
        """
        :content    raw claim, i.e. CLP/CLM loops and related content
        :x12        file type 837|835
        :document   document template with attributes pre-populated
        """
        _content = _args['content']
        _filetype = _args['x12']
        _doc = _args['document'] #{}
        _documentHandler = x12.util.document.Builder(plugins = self._plugins,parents=self._parents)
        try:
            for _row in _content :
                # _data = None
                _data,_meta = _documentHandler.bind(row=_row,x12=_filetype)
                if _data and _meta :
                    _doc = _documentHandler.build(data=_data,document=_doc,meta=_meta,row=_row)
                    # print (['*** ',_doc])
                    pass
        except Exception as e:
            #
            # Log something here ....
            print (_row)
            print (e)
            # print (_row,_doc.keys())
            pass
        return _doc

    def run(self):
        _handleContent = x12.util.file.Content()
        _handleDocument = x12.util.document.Builder(plugins = self._plugins,parents=self._parents)
        _template = self._template #x12.util.template(plugins=self._plugins)
        #
        # @TODO: start initializing parsing jobs :
        #   - number of files, plugins meta data
        _log = {}
        for _absolute_path in self._files :
            try:
                _content = _handleContent.read(filename=_absolute_path)
                _content,_filetype = _handleContent.split(_content)
                #
                # LOG: filename with claims found in it
                #
                # The first row is the header (it will be common to all claims)
                _header = copy.deepcopy(_template[_filetype])
                _header = self.apply(content=_content[0],x12=_filetype, document=_header)
                _docs = []
                for _rawclaim in _content[1:] :
                    _document = copy.deepcopy(_header) #copy.deepcopy(_template[_filetype])
                    # _document = dict(_document,**_header)
                    if type(_absolute_path) == str:
                        _document['filename'] = _absolute_path
                    _doc = self.apply(content=_rawclaim,x12=_filetype, document=_document)
                    if _doc :
                        _docs.append(_doc)
                    else:
                        # print (['wtf ...',_rawclaim])
                        pass
                #
                # LOG: information about the file that has just been processed.
                _location = _absolute_path if type(_absolute_path) == str else 'In-Memory'
                _data = {'filename':_location, 'available':len(_content[1:]),'x12':_filetype}
                _args = {'module':'parse','action':'parse','data':_data}
                _data['parsed'] = len(_docs)
                self._logger.log(**_args)
                #
                # Let us submit the batch we have thus far
                #
                self.post(documents=_docs,x12=_filetype,filename=_location)
            except Exception as e:
                #
                # LOG: We have filename and segment of the claim within filename
                #
                print (e)

    def post(self,**_args):
        pass

class X12Parser(BasicParser):
    def __init__(self,**_args):
        super().__init__(**_args)
        self._store = _args['store']

    def post(self,**_args):
        """
        Write the parsed documents to persistent storage (table claims/remits) in JSON format
        """
        _documents = _args['documents']
        if _documents :
            _store = copy.copy(self._store)
            TABLE = 'claims' if _args['x12'] in ['837','claims'] else 'remits'
            _store['table'] = TABLE
            _writer = transport.factory.instance(**_store)
            _writer.write(_documents)
            if hasattr(_writer,'close') :
                _writer.close()
            #
            # LOG: report what was written
            _data = {'x12':_args['x12'], 'documents':len(_documents),'filename':_args['filename']}
            self._logger.log(module='write',action='write',data=_data)
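
#
# Usage sketch (illustrative; the names below are placeholders, not actual configuration):
# X12Parser is a multiprocessing.Process, so a caller is expected to build the plugin
# registry, parent map, a batch of files and the store settings, then start/join the worker.
#
#   pthread = X12Parser(plugins=_plugins,parents=_parents,files=_files,store=_store)
#   pthread.start()
#   pthread.join()
#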
# def instance (**_args):
#     """
#     :path
#     """
#     # _files = x12.util.Files.get(_args['file'])
#     # #
#     # # We can split these files (multi-processing)
#     # #
#     # _jobCount = 1 if 'jobs' not in _args else int (_args['jobs'])
#     # _files = np.array_split(_files,_jobCount)
#     # PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
#     # if 'config' in _args :
#     #     PATH = _args['config']
#     # f = open(PATH)
#     # _config = json.loads(f.read())
#     # f.close()
#     # jobs = []
#     # for _batch in _files :
#     #     pthread = Parser(files=_batch,config=_config)
#     #     pthread.start()
#     #     jobs.append(pthread)
#     #     time.sleep(1)
#     pass
# class parser (Process) :
#     _CONFIGURATION = {}
#     def __init__(self,path=None) :
#         if not parser._CONFIGURATION :
#             _path = path if path else os.sep.join([os.environ['HOME'],'.healthcareio/config.json'])
#             #
#             # @TODO: Load custom configuration just in case we need to do further processing
#             config = json.loads(open(path).read())
#             parser._CONFIGURATION = config['parser']
#             #
#             # do we have a custom configuration in this location
#             #
#             _custompath = _path.replace('config.json','')
#             _custompath = _custompath if not _custompath.endswith(os.sep) else _custompath[:-1]
#             _custompath = os.sep.join([_custompath,'custom'])
#             if os.exists(_custompath) :
#                 files = os.listdir(_custompath)
#                 if files :
#                     _filename = os.sep.join([_custompath,files[0]])
#                     _customconf = json.loads(open(_filename).read())
#                     #
#                     # merge with existing configuration
#                 else:
#                     pass
# #
# #
# class getter :
#     def value(self,) :
#         pass
# class setter :
#     def files(self,files):
#         pass