- """
- This class refactors the default parsing class (better & streamlined implementation)
- The class will rely on the new plug/play architectural style perform parsing
- """
from multiprocessing import Process, RLock
import os
import json
# from healthcareio.x12.util
from healthcareio import x12
import numpy as np
import transport
import copy
# from healthcareio.x12.util import file as File, document as Document
from datetime import datetime
from healthcareio.logger import X12Logger
import time
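#
# Reader's note (added, not authoritative): the rough flow of this module is
# that the plugin registry describes how individual X12 segments map onto
# document attributes, x12.util.template(...) pre-builds a document skeleton
# per file type (837 claims / 835 remits), and the parser processes below
# bind raw rows into documents and hand them to a transport writer.
#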
class BasicParser(Process):
    def __init__(self,**_args):
        super().__init__()
        self._plugins = _args['plugins']
        self._parents = _args['parents']
        self._files = _args['files']
        self._store = _args['store']
        self._template = x12.util.template(plugins=self._plugins)
        # self._logger = _args['logger'] if 'logger' in _args else None
        self._logger = X12Logger(store=self._store)
        if self._logger:
            #
            # Log a summary of what the parser was initialized with:
            # plugin counts per section, number of files and the pre-built template
            _info = {key:len(self._plugins[key].keys()) for key in self._plugins}
            _data = {'plugins':_info,'files':len(self._files),'model':self._template}
            self._logger.log(module='BasicParser',action='init',data=_data)

    def log(self,**_args):
        """
        This function logs data to a configured location in JSON format:
        datetime, module, action, data
        """
        pass
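    #
    # Illustrative only (added, hypothetical values): a log record as described
    # in log() above would be a small JSON document along the lines of
    #
    #   {"datetime":"2024-01-01T00:00:00", "module":"parse",
    #    "action":"parse", "data":{"filename":"claims_001.x12", "parsed":10}}
    #
    # The exact fields written depend on X12Logger and on the caller.
    #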
    def apply(self,**_args):
        """
        :content    raw claim, i.e. the CLP/CLM loop and its related segments
        :x12        file type, 837|835
        :document   document template with attributes pre-populated
        """
        _content = _args['content']
        _filetype = _args['x12']
        _doc = _args['document'] #{}
        _documentHandler = x12.util.document.Builder(plugins=self._plugins,parents=self._parents)
        try:
            for _row in _content:
                _data,_meta = _documentHandler.bind(row=_row,x12=_filetype)
                if _data and _meta:
                    _doc = _documentHandler.build(data=_data,document=_doc,meta=_meta,row=_row)
        except Exception as e:
            #
            # @TODO: log the failing row and the error via self._logger
            print(_row)
            print(e)
        return _doc
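    #
    # Illustrative sketch (added, not part of the original implementation):
    # apply() consumes one claim block at a time and folds each row into a copy
    # of the pre-built template. Assuming each row is a raw X12 segment string
    # as produced by x12.util.file.Content, a call would look roughly like
    #
    #   _doc = parser.apply(content=['CLP*CLAIM-01*1*150*100**MC*ICN001',
    #                                'NM1*QC*1*DOE*JOHN'],
    #                       x12='835',
    #                       document=copy.deepcopy(parser._template['835']))
    #
    # The segment values above are made up.
    #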
    def run(self):
        _handleContent = x12.util.file.Content()
        _handleDocument = x12.util.document.Builder(plugins=self._plugins,parents=self._parents)
        _template = self._template #x12.util.template(plugins=self._plugins)
        #
        # @TODO: log the start of the parsing job (number of files, plugin metadata)
        #
        for _absolute_path in self._files:
            try:
                _content = _handleContent.read(filename=_absolute_path)
                _content,_filetype = _handleContent.split(_content)
                #
                # LOG: filename and the number of claim blocks found in it
                #
                # The first block is the header; it is common to all the claims in the file
                _header = copy.deepcopy(_template[_filetype])
                _header = self.apply(content=_content[0],x12=_filetype, document=_header)
                _docs = []
                for _rawclaim in _content[1:]:
                    #
                    # Every claim starts from a copy of the populated header
                    _document = copy.deepcopy(_header)
                    if type(_absolute_path) == str:
                        _document['filename'] = _absolute_path
                    _doc = self.apply(content=_rawclaim,x12=_filetype, document=_document)
                    if _doc:
                        _docs.append(_doc)
                #
                # LOG: information about the file that has just been processed
                _location = _absolute_path if type(_absolute_path) == str else 'In-Memory'
                _data = {'filename':_location, 'available':len(_content[1:]),'x12':_filetype,'parsed':len(_docs)}
                self._logger.log(module='parse',action='parse',data=_data)
                #
                # Let us submit the batch we have thus far
                self.post(documents=_docs,x12=_filetype,filename=_location)
            except Exception as e:
                #
                # @TODO: log the filename and the segment of the claim that failed
                print(e)
    def post(self,**_args):
        """
        Hook for subclasses: persist the documents parsed out of a single file
        """
        pass
class X12Parser(BasicParser):
    def __init__(self,**_args):
        super().__init__(**_args)
        self._store = _args['store']
    def post(self,**_args):
        """
        This function writes the parsed documents to persistent storage (JSON-friendly records)
        using the configured transport store
        """
        _documents = _args['documents']
        if _documents:
            _store = copy.copy(self._store)
            TABLE = 'claims' if _args['x12'] in ['837','claims'] else 'remits'
            _store['table'] = TABLE
            _writer = transport.factory.instance(**_store)
            _writer.write(_documents)
            if hasattr(_writer,'close'):
                _writer.close()
            #
            # LOG: report what was written
            _data = {'x12':_args['x12'], 'documents':len(_documents),'filename':_args['filename']}
            self._logger.log(module='write',action='write',data=_data)
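#
# Minimal usage sketch (added, not part of the original module): it mirrors the
# commented-out instance() below but uses the current X12Parser constructor, and
# shows how one might fan the files out over several parser processes. The store
# configuration, plugins and parents are placeholders; the real values come from
# the application's bootstrap (e.g. ~/.healthcareio/config.json).
#
#   def launch(files, plugins, parents, store, jobs=1):
#       _jobs = []
#       for _batch in np.array_split(files, jobs):
#           pthread = X12Parser(files=list(_batch), plugins=plugins,
#                               parents=parents, store=store)
#           pthread.start()
#           _jobs.append(pthread)
#           time.sleep(1)
#       return _jobs
#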
# def instance (**_args):
#     """
#     :path
#     """
#     # _files = x12.util.Files.get(_args['file'])
#
#     # #
#     # # We can split these files (multi-processing)
#     # #
#     # _jobCount = 1 if 'jobs' not in _args else int (_args['jobs'])
#     # _files = np.array_split(_files,_jobCount)
#     # PATH = os.sep.join([os.environ['HOME'],'.healthcareio','config.json'])
#     # if 'config' in _args :
#     #     PATH = _args['config']
#     # f = open(PATH)
#     # _config = json.loads(f.read())
#     # f.close()
#     # jobs = []
#     # for _batch in _files :
#     #     pthread = Parser(files=_batch,config=_config)
#     #     pthread.start()
#     #     jobs.append(pthread)
#     #     time.sleep(1)
#     pass
# class parser (Process) :
#     _CONFIGURATION = {}
#     def __init__(self,path=None) :
#         if not parser._CONFIGURATION :
#             _path = path if path else os.sep.join([os.environ['HOME'],'.healthcareio/config.json'])
#             #
#             # @TODO: Load custom configuration just in case we need to do further processing
#             config = json.loads(open(path).read())
#             parser._CONFIGURATION = config['parser']
#             #
#             # do we have a custom configuration in this location
#             #
#             _custompath = _path.replace('config.json','')
#             _custompath = _custompath if not _custompath.endswith(os.sep) else _custompath[:-1]
#             _custompath = os.sep.join([_custompath,'custom'])
#             if os.exists(_custompath) :
#                 files = os.listdir(_custompath)
#                 if files :
#                     _filename = os.sep.join([_custompath,files[0]])
#                     _customconf = json.loads(open(_filename).read())
#                     #
#                     # merge with existing configuration
#
#             else:
#                 pass
#     #
#     #
#     class getter :
#         def value(self,) :
#             pass
#     class setter :
#         def files(self,files):
#             pass