parser.py

  1. """
  2. This class refactors the default parsing class (better & streamlined implementation)
  3. The class will rely on the new plug/play architectural style perform parsing
  4. """
from multiprocessing import Process, RLock
import os
import json
# from healthcareio.x12.util
from healthcareio import x12
from healthcareio.x12.util import file, document
import numpy as np
import transport
import copy
# from healthcareio.x12.util import file as File, document as Document
from datetime import datetime
from healthcareio.logger import X12Logger
import time
import pandas as pd

class BasicParser (Process) :
    def __init__(self,**_args):
        super().__init__()
        self._plugins = _args['plugins']
        self._parents = _args['parents']
        self._files = _args['files']
        self._store = dict(_args['store'],**{'lock':True})
        self._template = x12.util.template(plugins=self._plugins)
        self._logger = X12Logger(store = self._store)
        if self._logger :
            _info = {key:len(self._plugins[key].keys()) for key in self._plugins}
            _data = {'plugins':_info,'files': len(self._files),'model': self._template}
            self._logger.log(module='BasicParser',action='init',data=_data)
    def log (self,**_args):
        """
        This function logs data to the specified location in JSON format:
        datetime, module, action, data
        """
        if self._logger :
            self._logger.log(**_args)
        pass
    def apply(self,**_args):
        """
        :content    raw claim, i.e. the CLP/CLM loops and related content
        :x12        file type, 837 | 835
        :document   document template with attributes pre-populated
        """
        _content = _args['content']
        _filetype = _args['x12']
        _doc = _args['document'] #{}
        _documentHandler = x12.util.document.Builder(plugins = self._plugins,parents=self._parents, logger=self._logger)
        try:
            _tmp = {}
            for _row in _content :
                # _data = None
                _data,_meta = _documentHandler.bind(row=_row,x12=_filetype)
                if _data and _meta :
                    _doc = _documentHandler.build(data=_data,document=_doc,meta=_meta,row=_row)
                    # print (['*** ',_doc])
                pass
        except Exception as e:
            #
            # Log something here ....
            # print (_row)
            print (e)
            # print (_row,_doc.keys())
            pass
        return _doc
    def run(self):
        _handleContent = file.Content() #x12.util.file.Content()
        _handleDocument = document.Builder(plugins = self._plugins,parents=self._parents,logger=self._logger)
        _template = self._template #x12.util.template(plugins=self._plugins)
        #
        # @TODO: start initializing parsing jobs:
        #   - number of files, plugin metadata
        _log = {}
        for _absolute_path in self._files :
            try:
                _content = _handleContent.read(filename=_absolute_path)
                _content,_filetype = _handleContent.split(_content)
                #
                # LOG: filename with claims found in it
                #
                # The first row is the header (it will be common to all claims)
                _header = copy.deepcopy(_template[_filetype])
                _header = self.apply(content=_content[0],x12=_filetype, document=_header)
                _docs = []
                _ids = []
                for _rawclaim in _content[1:] :
                    _document = copy.deepcopy(_header) #copy.deepcopy(_template[_filetype])
                    if 'claim_id' in _document :
                        #
                        # @TODO: Have a way to get the attribute for CLP or CLM
                        _ids.append(_document['claim_id'])
                    # _document = dict(_document,**_header)
                    if type(_absolute_path) == str:
                        _document['filename'] = _absolute_path
                    _doc = self.apply(content=_rawclaim,x12=_filetype, document=_document)
                    if _doc :
                        _docs.append(_doc)
                    else:
                        # print (['wtf ...',_rawclaim])
                        pass
                #
                # LOG: information about the file that has just been processed
                _location = _absolute_path if type(_absolute_path) == str else 'In-Memory'
                _data = {'filename':_location, 'available':len(_content[1:]),'x12':_filetype}
                _args = {'module':'parse','action':'parse','data':_data}
                _data['parsed'] = len(_docs)
                self.log(**_args)
                self.log(module='parse',action='file-count', data={'file_name':_absolute_path,'file_type':_filetype,'claims':_ids, 'claim_count':len(_ids)})
                #
                # Let us submit the batch we have thus far
                #
                self.post(documents=_docs,x12=_filetype,filename=_location)
            except Exception as e:
                #
                # LOG: We have filename and segment of the claim within filename
                #
                print (e)
    def post(self,**_args):
        pass
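
#
# NOTE: BasicParser.post is a no-op hook; X12Parser below overrides it to
# persist the parsed documents through the data-transport layer.
#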
class X12Parser(BasicParser):
    def __init__(self,**_args):
        super().__init__(**_args)
        self._store = _args['store']
    def post(self,**_args):
        """
        Write the parsed documents to persistent storage in JSON format
        """
        _documents = _args['documents']
        if _documents :
            _store = copy.deepcopy(self._store)
            TABLE = 'claims' if _args['x12'] in ['837','claims'] else 'remits'
            _store['table'] = TABLE
            _store['context'] = 'write'
            _writer = transport.factory.instance(**_store)
            _writer.write(_documents,table=TABLE)
            if hasattr(_writer,'close') :
                _writer.close()
            #
            # LOG: report what was written
            _data = {'x12':_args['x12'], 'documents':len(_documents),'filename':_args['filename']}
            # self._logger.log(module='write',action='write',data=_data)
            self.log(module='parse',action='write',data=_data)
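
#
# Usage sketch (illustrative only): how a caller might drive X12Parser.
# The plugin/parent metadata, file list and store settings below are
# placeholder assumptions, not values defined by this module.
#
# if __name__ == '__main__':
#     _parser = X12Parser(
#         plugins = {},                          # plugin metadata keyed by segment (e.g. CLM, CLP)
#         parents = {},                          # parent/child loop relationships used by document.Builder
#         files   = ['/data/x12/sample.835'],    # hypothetical list of 835/837 files to parse
#         store   = {}                           # data-transport settings; fields depend on the provider used
#     )
#     _parser.start()                            # Process.start() invokes run() in a child process
#     _parser.join()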