common.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501
  1. from typing import Any
  2. import numpy as np
  3. import json
  4. from multiprocessing import Process, RLock
  5. import os
  6. import io
  7. import queue
  8. import transport
  9. from transport import providers
  class Store(Process):
      """
      This is the data-store service that will handle read/writes.
      Both hooks below are still skeletons: nothing here assigns ``dataStore``.
      """

      # Handle shared at class level; it stays None in the code visible here
      dataStore = None

      @staticmethod
      def init(self, **_args):
          """
          Look up the ``store`` entry of the keyword arguments while no data-store is set.
          :param self     unused placeholder (the method is static, callers pass it explicitly)
          :param store    store settings; a KeyError is raised when it is missing and no data-store is set
          """
          if Store.dataStore is not None:
              return None
          #
          # The entry is only looked up for now, it is not kept anywhere
          _args = _args['store']
          return None

      @staticmethod
      def reset():
          """
          Placeholder, currently does nothing
          """
          return None
  24. class X12DOCUMENT (Process):
  25. """
  26. X12DOCUMENT class encapsulates functions that will be used to format an x12 (835,837) claim into an object
  27. """
  28. _queue = queue.Queue()
  class MODE:
      """
      Kinds of input that can be handled: a filename (NAMES) or raw content (STREAM).
      Raw content is meant to be wrapped into io.StringIO so that it is handled as if it were a file.
      """
      NAMES = 'NAMES'
      STREAM = 'STREAM'
  class ConfigHandler:
      """
      Helpers that turn the parsing rules of an element into columns, indexes, field and label
      """

      def format(self, **_args):
          """
          This function formats variations of an element's parsing rules
          :info   list of rules {index,field|label}; ``field`` takes precedence over ``label``
                  and a rule that has neither is left out
          :return {'@ref': {str(index): {'field': ...} or {'label': ...}}}
          """
          _ref = {}
          for _item in _args['info']:
              _index = str(_item['index'])
              _field = _item.get('field')
              _label = _item.get('label')
              if _field:
                  _ref[_index] = {'field': _field}
              elif _label:
                  _ref[_index] = {'label': _label}
          return {'@ref': _ref}

      def _getColumnsIndexes(self, _columns, _indexes, _map):
          """
          This function returns columns and related indexes once a parsing map is taken into account
          :param _columns     list of column names
          :param _indexes     list of indexes, paired by position with _columns
          :param _map         parsing map (field:index)
          :return             (columns, indexes); map entries come first and, for a name found
                              on both sides, the index given in _indexes is the one kept
          """
          # @TODO: insure the lengths are the same for adequate usage downstream ...
          _names = list(_map.keys()) + _columns
          _positions = list(_map.values()) + _indexes
          _merged = {}
          for _name, _position in zip(_names, _positions):
              _merged[_name] = _position
          return list(_merged.keys()), list(_merged.values())

      def _getObjectAtributes(self, _config):
          """
          :return     (field, label) found in the configuration, each defaulting to {}
          """
          return _config.get('field', {}), _config.get('label', {})

      def consolidate(self, **_args):
          """
          This function overrides the old configuration with the new configuration specifications
          :columns    default column names
          :index      default indexes
          :config     configuration of the element, may hold field, label, map and @ref
          :row        parsed element (list), required when the configuration holds @ref
          :return     {'columns','index','field','label'}
          """
          _columns, _indexes = _args['columns'], _args['index']
          _config = _args.get('config', {})
          _field, _label = self._getObjectAtributes(_config)
          if 'map' in _config:
              _columns, _indexes = self._getColumnsIndexes(_columns, _indexes, _config['map'])
          if '@ref' in _config:
              _row = _args['row']
              _ref = _config['@ref']
              #
              # The variation is selected by the 2nd item of the row; a row that has no
              # such item can not match any variation, so the defaults above are kept
              _anchor = _row[1].strip() if len(_row) > 1 else None
              if _anchor is not None and _anchor in _ref:
                  _variation = _ref[_anchor]
                  _field, _label = self._getObjectAtributes(_variation)
                  _map = _variation.get('map', {})
                  if _map:
                      # The map of the variation replaces the columns/indexes found so far
                      _columns, _indexes = self._getColumnsIndexes([], [], _map)
          return {'columns': _columns, 'index': _indexes, 'field': _field, 'label': _label}

      def legacy(self, **_args):
          """
          This function returns the legacy configuration (default parsing)
          :config     configuration of the element
          :return     {'columns','index','field','label'}, columns/index being taken from the map if any
          """
          _config = _args.get('config', {})
          _field, _label = self._getObjectAtributes(_config)
          _map = _config.get('map', {})
          return {'columns': list(_map.keys()), 'index': list(_map.values()), 'field': _field, 'label': _label}

      def override(self, **_args):
          """
          :return     the ``columns`` and ``indexes`` arguments as a tuple, unchanged
          """
          return _args['columns'], _args['indexes']
  def __init__(self, **_args):
      """
      :mode       how entries of ``files`` are read (see X12DOCUMENT.MODE), defaults to 'NAMES'
      :files      entries to process, defaults to an empty list
      :config     parsing configuration; its ``store`` entry, if any, has precedence over ``store``
      :store      arguments handed to the writer, defaults to the console provider
      """
      super().__init__()
      self._mode = _args['mode'] if 'mode' in _args else 'NAMES'
      self.lastelement = {}  # This to store in the loop
      #
      # The attribute is always defined, so that run() has something to iterate over
      # even when no files were given
      self.files = _args['files'] if 'files' in _args else []
      self._config = _args['config'] if 'config' in _args else {}
      #
      # NM1 is a fluid type and thus will be cached in order to recreate the hierarchy
      # @TODO:
      #   -add this to the configuration
      #
      self._hierarchy = {'NM1': ['N1', 'N2', 'N3', 'N4']}
      self._document = []
      self._x12FileType = None
      self._configHandler = X12DOCUMENT.ConfigHandler()
      #
      #-- The files need to be classified, the files need to be either claims or remits
      #
      if 'store' in self._config:
          self._store_args = self._config['store']
      else:
          self._store_args = _args['store'] if 'store' in _args else {'provider': providers.CONSOLE}
  def init(self, _header):
      """
      Placeholder, does nothing and returns None.
      Expected elements must include ST (this is not enforced here).
      """
      return None
  def merge(self, _x, _y):
      """
      This function will merge two objects _x, _y into a new one, both inputs are left untouched.
      Keys found on one side only are copied over. For a key found on both sides the outcome
      depends on the type of the value held by _x :
          - list  the items of the value held by _y are added at the end
          - dict  shallow merge, the entries of _y have precedence
          - other the value held by _y replaces it
      :param _x   base object
      :param _y   object merged on top of _x
      :return     merged object (dict)
      """
      _out = dict(_x)
      for _key, _value in _y.items():
          if _key not in _x:
              _out[_key] = _value
              continue
          _current = _x[_key]
          if isinstance(_current, list):
              #
              # A new list is built on purpose: extending in place would also alter the list
              # owned by _x (e.g. a header that is merged with several blocks)
              _out[_key] = [*_current, *_value]
          elif isinstance(_current, dict):
              _out[_key] = {**_current, **_value}
          else:
              _out[_key] = _value
      return _out
  def split(self, content):
      """
      This function will split the content of an X12 document into blocks and headers
      A block starts with an HL segment, everything found before the first HL segment is the header.
      Segments are delimited by '~' and only a segment whose name is HL starts a block, thus the
      letters HL found inside a value (or a segment like HLH) no longer cut the document.
      :content    x12 document in raw format (text)
      :return     {'header':[segments],'blocks':[[segments], ...]}
      """
      _header, _blocks = [], []
      _current = _header
      for _segment in content.split('~'):
          _body = _segment.lstrip()
          if _body.split('*', 1)[0].strip() == 'HL':
              #
              # Whatever precedes HL (white spaces, line feed, or nothing) closes the previous
              # group, this keeps the layout identical to a split on the HL marker
              _current.append(_segment[:len(_segment) - len(_body)])
              _current = [_body]
              _blocks.append(_current)
          else:
              _current.append(_segment)
      return {'header': _header, 'blocks': _blocks}
  def parse(self, columns, index, **_args):
      """
      This function encapsulates how an x12 document element will be processed
      :columns    list of attributes that make up the object
      :index      indexes of the said items in the element
      :_args
          - row               x12 element, either a list of items or a raw string ('*' separated), required
          - config            parsing rules, the entry named after the element (1st item of the row) is used
          - plugin-context    when provided, it is used instead of the entry found in config
          - document          document used to find out if the field/label was already met, defaults to {}
      :return     the values picked from the row, keyed by column and arranged as follows
          - {field:{...}} for a field that is not in the document, otherwise the values merged
            with what the document holds for that field
          - {label:[{...}]} for a label that is not in the document, otherwise the list held by
            the document with the values added to it
          - {...} when there is neither a field nor a label
          For an element listed under an entry of the hierarchy (e.g N3 under NM1), the outcome is
          merged into the last parent that was parsed with a field, if there is one.
      """
      #
      # The row is normalized first so that the name of the element and the rules that depend
      # on it are the same whether the row was given as a list or as a raw string
      _row = _args['row']
      if not isinstance(_row, list):
          _row = _row.split('*')
      _ELEMENT = _row[0]
      #
      # get the right configuration from the _config object
      _rules = _args.get('config', {})
      _config = _rules[_ELEMENT] if _ELEMENT in _rules else {}
      #
      # @TODO:
      #   There should be a priority in parsing i.e plugin - config
      #
      if 'plugin-context' in _args:
          _config = _args['plugin-context']
      _outInfo = self._configHandler.consolidate(row=_row, columns=columns, index=index, config=_config)
      _field, _label = _outInfo['field'], _outInfo['label']
      _columns, _index = _outInfo['columns'], np.array(_outInfo['index'])
      if _index.size:
          #
          # Sometimes the row doesn't have all expected indexes, we will compensate with empty items
          # This minimizes parsing errors due to disconnects between configuration and x12 element variations
          #
          _missing = 1 + int(np.max(_index)) - len(_row)
          if _missing > 0:
              _row = _row + [''] * _missing
          _values = np.array(_row)[_index].tolist()
      else:
          # Nothing to pick (no index), np.max would fail on an empty array
          _values = []
      _info = dict(zip(_columns, _values))
      _document = _args.get('document', {})
      #
      # @TODO:
      #   Apply parsing/casting function to the object retrieved
      #   The objects parsed must be augmented against the appropriate ones e.g: N1,N2,N3,N4 into NM1
      #       - Find a way to drive this from a configuration ...
      #
      if _field:
          _item = self.merge(_document[_field], _info) if _field in _document else {_field: _info}
      elif _label:
          _item = _document[_label] + [_info] if _label in _document else {_label: [_info]}
      else:
          _item = _info
      if _ELEMENT in self._hierarchy and _field:
          # The parent is kept so that the elements that follow can be attached to it
          self.lastelement = _item
      elif self.lastelement and any(_ELEMENT in _children for _children in self._hierarchy.values()):
          #
          # Without a parent there is nothing to attach to, the object is then returned as is
          _ikey = list(self.lastelement.keys())[0]
          _item = {_ikey: self.merge(self.lastelement[_ikey], _item)}
      return _item
  def elements(self):
      """
      This function returns the names of the supported elements, i.e the attributes of this
      object whose name neither starts with an underscore nor is entirely in lower case
      """
      _supported = []
      for _name in dir(self):
          if _name.startswith('_') or _name.islower():
              continue
          _supported.append(_name)
      return _supported
  def pointers(self):
      """
      This function returns the attribute of this object associated with each supported element
      :return Object of Element:Function
      """
      return {_name: getattr(self, _name) for _name in self.elements()}
  def set(self, _info, _document, _config):
      """
      This function places parsed values in a document
      :param _info        parsed values (dict)
      :param _document    document to update
      :param _config      may hold a ``label`` (values are added to a list) or a ``field``
                          (values are merged into an object); the label has precedence
      :return     the document itself, updated in place, when a label/field name applies,
                  otherwise a new object made of the document and the values
      """
      if 'label' in _config:
          _kind, _name = 'label', _config['label']
      elif 'field' in _config:
          _kind, _name = 'field', _config['field']
      else:
          _kind, _name = None, None
      if not _name:
          return dict(_document, **_info)
      if _name not in _document:
          _document[_name] = {} if _kind != 'label' else []
      #
      # @TODO: make sure we don't have a case of an attribute being overridden
      _slot = _document[_name]
      if type(_slot) is list:
          _slot.append(_info)
      else:
          _document[_name] = dict(_slot, **_info)
      return _document
  def log(self, **_args):
      """
      Placeholder, any keyword argument is accepted and nothing is done with it
      """
      return None
  def run(self):
      """
      This function will trigger the workflow associated with a particular file :
      the content is read, split into a header and blocks, each of them is parsed and the
      documents that carry a claim_id are posted to the writer.
      An error met while processing is printed, whatever was built by then is still posted.
      """
      _getContent = {
          #
          # For the sake of testing, the following insures
          # that raw string content is handled as if it were a file
          #
          X12DOCUMENT.MODE.STREAM: io.StringIO,
          X12DOCUMENT.MODE.NAMES: open
      }
      _writer = transport.factory.instance(**self._store_args)
      for _filename in self.files:
          _documents = []
          try:
              #
              # The handle is released as soon as the content has been read
              with _getContent[self._mode](_filename) as _stream:
                  _content = _stream.read()
              _info = self.split(_content)
              self.init(_content)
              _header = self.apply(_info['header'])
              _tmp = {}
              for _block in _info['blocks']:
                  _body = self.apply(_block, header=_header)
                  _doc = self.merge(_header, _body)
                  if _doc and 'claim_id' in _doc:
                      _documents += [self.merge(_tmp, _doc)]
                      _tmp = {}
                  else:
                      #
                      # The document is being built and not yet ready
                      _tmp = self.merge(_tmp, _doc)
          except Exception as e:
              #
              # @TODO: Log this issue for later analysis ...
              print(e)
          #
          # Let us post this to the documents we have, we should find a place to post it
          #
          if _documents:
              self.post(document=_documents, writer=_writer)
          #
          # NOTE(review): this break is kept from the original, it stops the loop after the
          # first entry of self.files - confirm this is intended (and where it was nested)
          break
  def post(self, **_args):
      """
      This function is intended to post content to a given location
      :param document     content to post, required
      :param writer       object with a ``write`` function; without it the content
                          is put on the queue of the class
      """
      _writer = _args.get('writer')
      _document = _args['document']
      if _writer:
          _writer.write(_document)
          return None
      X12DOCUMENT._queue.put(_document)
      return None
  363. def _getConfig(self,_chunk):
  364. #
  365. # Let us determine what kind of file we are dealing with, so we can extract the configuration
  366. # For this we need to look for the ST loop ...
  367. #
  368. line = [line for line in _chunk if line and line[:2] == 'ST' ]
  369. if line :
  370. #
  371. # We found the header of the block, so we can set the default configuration
  372. #
  373. self._x12FileType = line[0].split('*')[1].strip()
  374. _config = {}
  375. if self._x12FileType :
  376. _config = self._config[self._x12FileType]
  377. return _config
  def apply(self, _chunk, header=None):
      """
      _chunks are groups of elements split by HL, within each chunk are x12 loops HL,CLM,ISA
      Each segment whose name matches a supported element is handed to the matching function,
      the outcomes are grouped by loop and the groups are finally merged into one document.
      :param _chunk   list of raw segments
      :param header   object handed to every function as ``header``, defaults to a new empty object
      :return         document (dict) built from the chunk
      """
      #
      # A new object per call: a default shared across calls would leak whatever
      # a function adds to it into the next call
      header = {} if header is None else header
      _document, _cached = {}, {}
      _pointers = self.pointers()
      _config = self._getConfig(_chunk)
      _pid = None
      for line in _chunk:
          segments = line.split('*')
          _ELEMENT = segments[0]
          if not _ELEMENT or _ELEMENT not in _pointers:
              continue
          #
          # HL, CLM and ISA open a new group; the very first supported element opens one too
          if _ELEMENT in ('HL', 'CLM', 'ISA') or not _pid:
              _pid = _ELEMENT
          if _pid not in _cached:
              _cached[_pid] = {}
          _parsedLine = _pointers[_ELEMENT](row=segments, document=_document, header=header, config=_config)
          _cached[_pid] = self.merge(_cached[_pid], _parsedLine)
      #
      # Let's create the documents as we understand them to be
      # @TODO: Create a log so there can be visibility into the parser
      #
      _merged = {}
      for _group in _cached.values():
          _merged = self.merge(_merged, _group)
      return _merged