# _common.py
import io
import json
import os
import queue
import re
from multiprocessing import Process, RLock
from typing import Any

import numpy as np

import transport
from transport import providers
  10. class Store(Process):
  11. """
  12. This is the data-store service that will handle read/writes
  13. """
  14. dataStore = None
  15. @staticmethod
  16. def init(self,**_args):
  17. if Store.dataStore is None :
  18. _args = _args['store']
  19. else:
  20. pass
  21. @staticmethod
  22. def reset():
  23. pass
  24. class X12DOCUMENT (Process):
  25. """
  26. X12DOCUMENT class encapsulates functions that will be used to format an x12 (835,837) claim into an object
  27. """
  28. _queue = queue.Queue()
  29. class MODE :
  30. #
  31. # The following allow us to handle raw content (stream) or a filename
  32. # The raw content will be wrapped into io.StringIO so that it is handled as if it were a file
  33. #
  34. NAMES,STREAM = 'NAMES','STREAM'
  35. class ConfigHandler :
  36. def format(self,**_args):
  37. """
  38. This function formats variations of an element's parsing rules
  39. :info {index,field|label,map}
  40. """
  41. _info = _args['info']
  42. _ref = {}
  43. for _item in _info :
  44. _index = str(_item['index'])
  45. _field = _item['field'] if 'field' in _item else None
  46. _label = _item['label'] if 'label' in _item else None
  47. if _field :
  48. _ref[_index] = {'field':_field}
  49. elif _label :
  50. _ref[_index] = {'label':_label}
  51. return {'@ref':_ref}
  52. def _getColumnsIndexes(self,_columns,_indexes,_map):
  53. """
  54. This function return columns and indexes related if a parsing map is passed
  55. :param _columns
  56. :param _indexes
  57. :param _map parsing map (field:index)
  58. """
  59. # @TODO: insure the lengths are the same for adequate usage downstream ...
  60. _xcolumns,_xindexes = list(_map.keys()), list(_map.values())
  61. keys,values = _xcolumns + _columns,_xindexes + _indexes
  62. _config = dict(zip(keys,values))
  63. _outColumns,_outIndexes = list(_config.keys()),list(_config.values())
  64. return _outColumns,_outIndexes
  65. def _getObjectAtributes(self,_config):
  66. _field = _config['field'] if 'field' in _config else {}
  67. _label = _config['label'] if 'label' in _config else {}
  68. return _field,_label
  69. def merge(self,**_args):
  70. #
  71. # This function overrides the old configuration with the new configuration specifications
  72. #
  73. # _columns,_indexes = [],[]
  74. _columns,_indexes = _args['columns'],_args['index']
  75. _map = {}
  76. _config = _args['config'] if 'config' in _args else {}
  77. _field,_label = self._getObjectAtributes(_config)
  78. if 'map' in _config :
  79. _map = _args['config']['map']
  80. _columns,_indexes = self._getColumnsIndexes(_columns,_indexes,_map)
  81. if '@ref' in _config :
  82. # _columns,_indexes = [],[]
  83. _row = _args['row']
  84. _ref = _config['@ref']
  85. for _anchor in _ref:
  86. # print ([_anchor,_anchor == _row[1].strip()])
  87. if _anchor == _row[1].strip() :
  88. _field,_label = self._getObjectAtributes(_ref[_anchor])
  89. _map = _ref[_anchor]['map'] if 'map' in _ref[_anchor] else {}
  90. if _map :
  91. _columns,_indexes = self._getColumnsIndexes([],[],_map)
  92. break
  93. # _columns,_indexes = _columns + _map.keys()
  94. return {'columns':_columns,'index':_indexes,'field':_field,'label':_label}
  95. def legacy(self,**_args):
  96. #
  97. # This function returns the legacy configuration (default parsing)
  98. #
  99. _config = _args['config'] if 'config' in _args else {}
  100. _field,_label = self._getObjectAtributes(_config)
  101. _columns,_indexes = [],[]
  102. if 'map' in _config :
  103. _columns = list(_config['map'].keys())
  104. _indexes = list(_config['map'].values())
  105. return {'columns':_columns,'index':_indexes,'field':_field,'label':_label}
  106. def override(self,**_args):
  107. return _args['columns'],_args['indexes']
  108. def __init__(self,**_args):
  109. super().__init__()
  110. self._mode = _args['mode'] if 'mode' in _args else 'NAMES'
  111. if 'files' in _args :
  112. self.files = _args['files']
  113. self._config = _args['config'] if 'config' in _args else {}
  114. self._document = []
  115. self._x12FileType = None
  116. self._configHandler = X12DOCUMENT.ConfigHandler()
  117. #
  118. #-- The files need to be classified, the files need to be either claims or remits
  119. #
  120. if 'store' not in self._config :
  121. self._store_args = _args['store'] if 'store' in _args else {'provider':providers.CONSOLE}
  122. else:
  123. self._store_args = self._config['store']
  124. def init(self,_header):
  125. """
  126. Expected Elements must include ST
  127. """
  128. pass
  129. def merge (self,_x,_y):
  130. """
  131. This function will merge two objects _x, _y
  132. """
  133. _zcols = list(set(_x.keys()) & set(_y.keys())) #--common columns
  134. if _zcols :
  135. _out = dict(_x,**{})
  136. for _key in _y.keys() :
  137. if not _key in _zcols :
  138. _out[_key] = _y[_key]
  139. else:
  140. if type(_out[_key]) == list :
  141. _out[_key] += _y[_key]
  142. elif type(_out[_key]) == dict:
  143. _out[_key] = dict(_out[_key],**_y[_key])
  144. else:
  145. _out[_key] = _y[_key]
  146. return _out
  147. else:
  148. return dict(_x,**_y)
  149. def split(self,content):
  150. """
  151. This function will split the content of an X12 document into blocks and headers
  152. :content x12 document in raw format (text)
  153. """
  154. #_content = content.split('~')
  155. _content = content.split('HL')
  156. _header = _content[:1][0].split('~')
  157. _blocks = ['HL'+_item for _item in _content[1:]]
  158. _blocks = [_item.split('~') for _item in _blocks ]
  159. # for row in _content :
  160. # if not _blocks and not row.startswith('HL') :
  161. # _header.append(row)
  162. # else:
  163. # _blocks.append(row)
  164. return {'header':_header,'blocks':_blocks}
  165. def parse (self,columns,index,**_args):
  166. """
  167. This function encapulates how an x12 document element will be processed
  168. :columns list of attributes that make up the object
  169. :index indexes of the said items in the element
  170. :_args
  171. - row raw x12 element (string)
  172. - config configuration of the element. his should indicate functions to apply against function
  173. """
  174. _ELEMENT = _args['row'][0]
  175. #
  176. # get the right configuration from the _config object
  177. _config = _args['config'][_ELEMENT] if _ELEMENT in _args['config'] else {}
  178. # _field = _config['field'] if 'field' in _config else None
  179. # _label = _config['label'] if 'label' in _config else None
  180. _map = _config['map'] if 'map' in _config else {}
  181. #
  182. # Let's see if overriding the fields/labels isn't necessary
  183. # columns, index,_refField,_refLabel = self._configHandler.merge(row=_args['row'],columns=columns,index=index,config=_config)
  184. # _field = _field if not _refField else _refField
  185. # _label = _label if not _refLabel else _refLabel
  186. _outInfo = self._configHandler.merge(row=_args['row'],columns=columns,index=index,config=_config)
  187. _field,_label = _outInfo['field'],_outInfo['label']
  188. _columns,_index = _outInfo['columns'],_outInfo['index']
  189. if 'row' in _args:
  190. _row = _args['row'] if type(_args['row']) == list else _args['row'].split('*')
  191. _index = np.array(_index)
  192. #
  193. # Sometimes the _row doesn't have all expected indexes, we will compensate
  194. # This allows to minimize parsing errors as it may relate to disconnects between configuration and x12 element variations (shitty format)
  195. #
  196. if np.max(_index) > len(_row) -1 :
  197. _delta = 1 + np.max(_index) - len(_row)
  198. _row = _row + np.repeat('',_delta).tolist()
  199. _row = np.array(_row)
  200. # _element = _row[0]
  201. _configKeys = [] #list(self._config.keys())
  202. _configTree = [] #list(self._config.values())
  203. if 'config' in _args :
  204. _config = _args['config']
  205. _configKeys = list(_config.keys())
  206. _configTree = list(_config.values())
  207. else:
  208. _config = {}
  209. _info = dict(zip(_columns,_row[_index].tolist()))
  210. _document = _args['document'] if 'document' in _args else {}
  211. #
  212. # Extracting configuration (minimal information)
  213. # _config = _args['config'] if 'config' in _args else {}
  214. # _config = self._config
  215. # if '@ref' in _config :
  216. # print (_config['@ref'])
  217. # _values = _config['@ref']
  218. # print (_values)
  219. if _field :
  220. if not _field in _document :
  221. return {_field:_info}
  222. else:
  223. return self.merge(_document[_field],_info)
  224. elif _label :
  225. if not _label in _document :
  226. return {_label:[_info]}
  227. else:
  228. return _document[_label] + [_info]
  229. else:
  230. return _info
  231. else:
  232. return columns
  233. def elements(self):
  234. """
  235. This function returns elements that are supported as specified by X12 standard
  236. """
  237. return [_name for _name in dir(self) if not _name.startswith('_') and not _name.islower() ]
  238. def pointers(self):
  239. """
  240. This function returns pointers associated with each element ...
  241. :return Object of Element:Function
  242. """
  243. _attr = self.elements()
  244. _pointers = [getattr(self,_name) for _name in _attr]
  245. return dict(zip(_attr,_pointers))
  246. def set(self,_info,_document,_config):
  247. _attrName,_attrType = None,None
  248. if 'label' in _config :
  249. _attrType = 'label'
  250. _attrName = _config['label']
  251. elif 'field' in _config :
  252. _attrType = 'field'
  253. _attrName = _config['field']
  254. if _attrName :
  255. if _attrName not in _document :
  256. _document[_attrName] = [] if _attrType == 'label' else {}
  257. #
  258. # @TODO: make sure we don't have a case of an attribute being overridden
  259. if type(_document[_attrName]) == list :
  260. _document[_attrName] += [_info]
  261. else:
  262. _document[_attrName] = dict(_document[_attrName],**_info)
  263. # _document[_attrName] += [_info] if _attrType == 'label' else dict(_document[_attrName],**_info)
  264. return _document
  265. return dict(_document,**_info)
  266. pass
  267. def log (self,**_args):
  268. pass
  269. def run(self):
  270. """
  271. This function will trigger the workflow associated with a particular file
  272. """
  273. _getContent = {
  274. #
  275. # For the sake of testing, the following insures
  276. # that raw string content is handled as if it were a file
  277. #
  278. X12DOCUMENT.MODE.STREAM: (lambda stream : io.StringIO(stream)) ,
  279. X12DOCUMENT.MODE.NAMES: (lambda name: open(name))
  280. }
  281. _writer = transport.factory.instance(**self._store_args)
  282. for _filename in self.files :
  283. try:
  284. _documents = []
  285. _parts = []
  286. # _content = (open(_filename)).read()
  287. _reader = _getContent[self._mode]
  288. _content = _reader(_filename).read()
  289. _info = self.split(_content)
  290. _fileType=self.init(_content)
  291. _header = self.apply(_info['header'])
  292. # print (json.dumps(_header))
  293. for _content in _info['blocks'] :
  294. _body = self.apply(_content,header=_header)
  295. _doc = self.merge(_header,_body)
  296. if _doc and 'claim_id' in _doc:
  297. # X12DOCUMENT._queue.put(_document)
  298. _documents += [_doc]
  299. except Exception as e:
  300. #
  301. # @TODO: Log this issue for later analysis ...
  302. print (e)
  303. pass
  304. #
  305. # Let us post this to the documents we have, we should find a place to post it
  306. #
  307. if _documents :
  308. # print (_header['header'])
  309. self.post(document=_documents,writer=_writer)
  310. break
  311. def post(self,**_args):
  312. """
  313. This function is intended to post content to a given location
  314. :param document
  315. :param writer
  316. """
  317. _writer = _args['writer'] if 'writer' in _args else None
  318. _document = _args['document']
  319. if not _writer:
  320. X12DOCUMENT._queue.put(_document)
  321. else:
  322. _writer.write(_document)
  323. def _getConfig(self,_chunk):
  324. #
  325. # Let us determine what kind of file we are dealing with, so we can extract the configuration
  326. # For this we need to look for the ST loop ...
  327. #
  328. line = [line for line in _chunk if line and line[:2] == 'ST' ]
  329. if line :
  330. #
  331. # We found the header of the block, so we can set the default configuration
  332. #
  333. self._x12FileType = line[0].split('*')[1].strip()
  334. _config = {}
  335. if self._x12FileType :
  336. _config = self._config[self._x12FileType]
  337. return _config
  338. def apply(self,_chunk, header = {}):
  339. """
  340. _chunks are groups of elements split by HL, within each chunk are x12 loops HL,CLM,ISA
  341. """
  342. _document,_cached = {},{}
  343. _pointers = self.pointers()
  344. _config = self._getConfig(_chunk)
  345. #
  346. # The configuration comes from the file, let's run this in merge mode
  347. # _config = self._configHandler.merge
  348. _pid = None
  349. for line in _chunk :
  350. segments = line.split('*')
  351. _ELEMENT = segments[0]
  352. if _ELEMENT not in _pointers or not _ELEMENT:
  353. continue
  354. if _ELEMENT in ['HL','CLM','ISA'] or not _pid:
  355. _pid = _ELEMENT
  356. if _pid not in _cached :
  357. _cached [_pid] = {}
  358. _pointer = _pointers[_ELEMENT]
  359. _args = {'row':segments,'document':_document,'header':header,'config':(_config)}
  360. _parsedLine = _pointer(**_args)
  361. # print ([_pid,_ELEMENT,_parsedLine])
  362. _cached[_pid] = self.merge(_cached[_pid],_parsedLine)
  363. #
  364. # Let's create the documents as we understand them to be
  365. # @TODO: Create a log so there can be visibility into the parser
  366. #
  367. _document = {}
  368. for _id in _cached :
  369. # print ('patient' in _cached[_id] )
  370. _document = self.merge(_document,_cached[_id])
  371. return _document