parser.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. """
  2. This class refactors the default parsing class (better & streamlined implementation)
  3. The class will rely on the new plug/play architectural style to perform parsing
  4. """
  5. from multiprocessing import Process, RLock
  6. import os
  7. import json
  8. # from healthcareio.x12.util
  9. from healthcareio import x12
  10. from healthcareio.x12.util import file, document
  11. import numpy as np
  12. import transport
  13. import copy
  14. # from healthcareio.x12.util import file as File, document as Document
  15. from datetime import datetime
  16. from healthcareio.logger import X12Logger
  17. import time
  18. import pandas as pd
  19. from transport import providers
  20. class BasicParser (Process) :
  21. def __init__(self,**_args):
  22. super().__init__()
  23. self._plugins = _args['plugins']
  24. self._parents = _args['parents']
  25. self._files = _args['files']
  26. self._store = dict(_args['store'],**{'lock':True})
  27. self._template = x12.util.template(plugins=self._plugins)
  28. self._logger = X12Logger(store = self._store)
  29. if self._logger :
  30. _info = { key:len(self._plugins[key].keys())for key in self._plugins}
  31. _data = {'plugins':_info,'files': len(self._files),'model': self._template}
  32. self._logger.log(module='BasicParser',action='init',data=_data)
  33. def log (self,**_args):
  34. """
  35. This function logs data into a specified location in JSON format
  36. datetime,module,action,data
  37. """
  38. if self._logger :
  39. self._logger.log(**_args)
  40. pass
  41. def apply(self,**_args):
  42. """
  43. :content raw claim i.e CLP/CLM Loops and related content
  44. :x12 file type 837|835
  45. :document document template with attributes pre-populated
  46. """
  47. _content = _args['content']
  48. _filetype = _args['x12']
  49. _doc = _args['document'] #{}
  50. _documentHandler = x12.util.document.Builder(plugins = self._plugins,parents=self._parents, logger=self._logger)
  51. try:
  52. _tmp = {}
  53. for _row in _content :
  54. # _data = None
  55. _data,_meta = _documentHandler.bind(row=_row,x12=_filetype)
  56. if _data and _meta :
  57. _doc = _documentHandler.build(data=_data,document=_doc,meta=_meta,row=_row)
  58. # print (['*** ',_doc])
  59. pass
  60. except Exception as e:
  61. #
  62. # Log something here ....
  63. # print (_row)
  64. print (e)
  65. # print (_row,_doc.keys())
  66. pass
  67. return _doc
  68. def run(self):
  69. _handleContent = file.Content() #x12.util.file.Content()
  70. _handleDocument = document.Builder(plugins = self._plugins,parents=self._parents,logger=self._logger)
  71. _template = self._template #x12.util.template(plugins=self._plugins)
  72. #
  73. # @TODO: starting initializing parsing jobs :
  74. # - number of files, plugins meta data
  75. _log = {}
  76. for _absolute_path in self._files :
  77. try:
  78. _content = _handleContent.read(filename=_absolute_path)
  79. _content,_filetype = _handleContent.split(_content)
  80. #
  81. # LOG: filename with claims found in it
  82. #
  83. # The first row is the header (it will be common to all claims)
  84. _header = copy.deepcopy(_template[_filetype])
  85. _header = self.apply(content=_content[0],x12=_filetype, document=_header)
  86. _docs = []
  87. _ids = []
  88. for _rawclaim in _content[1:] :
  89. _document = copy.deepcopy(_header) #copy.deepcopy(_template[_filetype])
  90. if 'claim_id' in _document :
  91. #
  92. # @TODO: Have a way to get the attribute for CLP or CLM
  93. _ids.append(_document['claim_id'])
  94. # _document = dict(_document,**_header)
  95. if type(_absolute_path) == str:
  96. _document['filename'] = _absolute_path
  97. _doc = self.apply(content=_rawclaim,x12=_filetype, document=_document)
  98. if _doc :
  99. _docs.append(_doc)
  100. else:
  101. # print (['wtf ...',_rawclaim])
  102. pass
  103. #
  104. # LOG: information abou the file that has just been processed.
  105. _location = _absolute_path if type(_absolute_path) == str else 'In-Memory'
  106. _data = {'filename':_location, 'available':len(_content[1:]),'x12':_filetype}
  107. _args = {'module':'parse','action':'parse','data':_data}
  108. _data['parsed'] = len(_docs)
  109. self.log(**_args)
  110. self.log(module='parse',action='file-count', data={'file_name':_absolute_path,'file_type':_filetype,'claims':_ids, 'claim_count':len(_ids)})
  111. #
  112. # Let us submit the batch we have thus far
  113. #
  114. self.post(documents=_docs,x12=_filetype,filename=_location)
  115. except Exception as e:
  116. #
  117. # LOG: We have filename and segment of the claim within filename
  118. #
  119. print (e)
  120. def post(self,**_args):
  121. pass
  122. class X12Parser(BasicParser):
  123. def __init__(self,**_args):
  124. super().__init__(**_args)
  125. self._store = _args['store']
  126. def post(self,**_args):
  127. """
  128. Writing the files to a persistent storage in JSON format (hopefully)
  129. """
  130. _documents = _args['documents']
  131. if _documents :
  132. _store = copy.deepcopy(self._store)
  133. TABLE = 'claims' if _args['x12'] in ['837','claims'] else 'remits'
  134. _store['table'] = TABLE
  135. _store['cotnext'] = 'write'
  136. _writer = transport.factory.instance(**_store)
  137. # if _store['provider'] not in [providers.MONGODB, providers.COUCHDB] :
  138. for _document in _documents :
  139. for field in _document :
  140. if type(_document[field]) in [dict,list] :
  141. _document[field] = json.dumps(_document[field],default=str)
  142. _writer.write(_documents,table=TABLE)
  143. if getattr(_writer,'close') :
  144. _writer.close()
  145. #
  146. # LOG: report what was written
  147. _data = {'x12':_args['x12'], 'documents':len(_documents),'filename':_args['filename']}
  148. # self._logger.log(module='write',action='write',data=_data)
  149. self.log(module='parse',action='write',data=_data)