# common.py (19 KB)

  1. from typing import Any
  2. import numpy as np
  3. import json
  4. from multiprocessing import Process, RLock
  5. import os
  6. import io
  7. import queue
  8. import transport
  9. from transport import providers
  10. class Store(Process):
  11. """
  12. This is the data-store service that will handle read/writes
  13. """
  14. dataStore = None
  15. @staticmethod
  16. def init(self,**_args):
  17. if Store.dataStore is None :
  18. _args = _args['store']
  19. else:
  20. pass
  21. @staticmethod
  22. def reset():
  23. pass
  24. class X12DOCUMENT (Process):
  25. """
  26. X12DOCUMENT class encapsulates functions that will be used to format an x12 (835,837) claim into an object
  27. """
  28. _queue = queue.Queue()
  29. class MODE :
  30. #
  31. # The following allow us to handle raw content (stream) or a filename
  32. # The raw content will be wrapped into io.StringIO so that it is handled as if it were a file
  33. #
  34. NAMES,STREAM = 'NAMES','STREAM'
  35. class ConfigHandler :
  36. def format(self,**_args):
  37. """
  38. This function formats variations of an element's parsing rules
  39. :info {index,field|label,map}
  40. """
  41. _info = _args['info']
  42. _ref = {}
  43. for _item in _info :
  44. _index = str(_item['index'])
  45. _field = _item['field'] if 'field' in _item else None
  46. _label = _item['label'] if 'label' in _item else None
  47. if _field :
  48. _ref[_index] = {'field':_field}
  49. elif _label :
  50. _ref[_index] = {'label':_label}
  51. return {'@ref':_ref}
  52. def _getColumnsIndexes(self,_columns,_indexes,_map):
  53. """
  54. This function return columns and indexes related if a parsing map is passed
  55. :param _columns
  56. :param _indexes
  57. :param _map parsing map (field:index)
  58. """
  59. # @TODO: insure the lengths are the same for adequate usage downstream ...
  60. _xcolumns,_xindexes = list(_map.keys()), list(_map.values())
  61. keys,values = _xcolumns + _columns,_xindexes + _indexes
  62. _config = dict(zip(keys,values))
  63. _outColumns,_outIndexes = list(_config.keys()),list(_config.values())
  64. return _outColumns,_outIndexes
  65. def _getObjectAtributes(self,_config):
  66. _field = _config['field'] if 'field' in _config else {}
  67. _label = _config['label'] if 'label' in _config else {}
  68. return _field,_label
  69. def consolidate(self,**_args):
  70. #
  71. # This function overrides the old configuration with the new configuration specifications
  72. #
  73. # _columns,_indexes = [],[]
  74. _columns,_indexes = _args['columns'],_args['index']
  75. _map = {}
  76. _config = _args['config'] if 'config' in _args else {}
  77. _field,_label = self._getObjectAtributes(_config)
  78. if 'map' in _config :
  79. _map = _args['config']['map']
  80. _columns,_indexes = self._getColumnsIndexes(_columns,_indexes,_map)
  81. if '@ref' in _config :
  82. # _columns,_indexes = [],[]
  83. _row = _args['row']
  84. _ref = _config['@ref']
  85. for _anchor in _ref:
  86. if _anchor == _row[1].strip() :
  87. _field,_label = self._getObjectAtributes(_ref[_anchor])
  88. _map = _ref[_anchor]['map'] if 'map' in _ref[_anchor] else {}
  89. if _map :
  90. _columns,_indexes = self._getColumnsIndexes([],[],_map)
  91. else:
  92. # print ([_anchor,_indexes,_columns])
  93. _map = dict(zip(_columns,_indexes))
  94. pass
  95. break
  96. # _columns,_indexes = _columns + _map.keys()
  97. _out = {'columns':_columns,'index':_indexes,'field':_field,'label':_label}
  98. return _out
  99. def legacy(self,**_args):
  100. #
  101. # This function returns the legacy configuration (default parsing)
  102. #
  103. _config = _args['config'] if 'config' in _args else {}
  104. _field,_label = self._getObjectAtributes(_config)
  105. _columns,_indexes = [],[]
  106. if 'map' in _config :
  107. _columns = list(_config['map'].keys())
  108. _indexes = list(_config['map'].values())
  109. return {'columns':_columns,'index':_indexes,'field':_field,'label':_label}
  110. def override(self,**_args):
  111. return _args['columns'],_args['indexes']
  112. def __init__(self,**_args):
  113. super().__init__()
  114. self._mode = _args['mode'] if 'mode' in _args else 'NAMES'
  115. self.lastelement = {} # This to store in the loop
  116. if 'files' in _args :
  117. self.files = _args['files']
  118. self._config = _args['config'] if 'config' in _args else {}
  119. #
  120. # NM1 is a fluid type and thus will be cached in order to recreate the hierarchy
  121. # @TODO:
  122. # -add this to the configuration
  123. #
  124. self._hierarchy = {'NM1':['N1','N2','N3','N4']}
  125. self._document = []
  126. self._x12FileType = None
  127. self._configHandler = X12DOCUMENT.ConfigHandler()
  128. #
  129. #-- The files need to be classified, the files need to be either claims or remits
  130. #
  131. if 'store' not in self._config :
  132. self._store_args = _args['store'] if 'store' in _args else {'provider':providers.CONSOLE}
  133. else:
  134. self._store_args = self._config['store']
  135. def init(self,_header):
  136. """
  137. Expected Elements must include ST
  138. """
  139. pass
  140. def merge (self,_x,_y):
  141. """
  142. This function will merge two objects _x, _y
  143. """
  144. _zcols = list(set(_x.keys()) & set(_y.keys())) #--common columns
  145. if _zcols :
  146. _out = dict(_x,**{})
  147. for _key in _y.keys() :
  148. if not _key in _zcols :
  149. _out[_key] = _y[_key]
  150. else:
  151. if type(_out[_key]) == list :
  152. _out[_key] += _y[_key]
  153. elif type(_out[_key]) == dict:
  154. _out[_key] = dict(_out[_key],**_y[_key])
  155. else:
  156. _out[_key] = _y[_key]
  157. return _out
  158. else:
  159. return dict(_x,**_y)
  160. def split(self,content):
  161. """
  162. This function will split the content of an X12 document into blocks and headers
  163. :content x12 document in raw format (text)
  164. """
  165. #_content = content.split('~')
  166. _content = content.split('HL')
  167. xchar = '~\n' if '~\n' in _content[0] else '~'
  168. _header = _content[:1][0].split(xchar) #.split('~')
  169. _blocks = ['HL*'+_item for _item in _content[1:]]
  170. # xchar = '~\n' if '~\n' in _blocks[0] else '~'
  171. _blocks = [_item.split(xchar) for _item in _blocks ]
  172. # for row in _content :
  173. # if not _blocks and not row.startswith('HL') :
  174. # _header.append(row)
  175. # else:
  176. # _blocks.append(row)
  177. return {'header':_header,'blocks':_blocks}
  178. def parse (self,columns,index,**_args):
  179. """
  180. This function encapulates how an x12 document element will be processed
  181. :columns list of attributes that make up the object
  182. :index indexes of the said items in the element
  183. :_args
  184. - row raw x12 element (string)
  185. - config configuration of the element. his should indicate functions to apply against function
  186. """
  187. _ELEMENT = _args['row'][0]
  188. #
  189. # get the right configuration from the _config object
  190. _config = _args['config'][_ELEMENT] if _ELEMENT in _args['config'] else {}
  191. # _field = _config['field'] if 'field' in _config else None
  192. # _label = _config['label'] if 'label' in _config else None
  193. _map = _config['map'] if 'map' in _config else {}
  194. #
  195. # Let's see if overriding the fields/labels isn't necessary
  196. # columns, index,_refField,_refLabel = self._configHandler.merge(row=_args['row'],columns=columns,index=index,config=_config)
  197. # _field = _field if not _refField else _refField
  198. # _label = _label if not _refLabel else _refLabel
  199. #
  200. # @TODO:
  201. # There should be a priority in parsing i.e plugin - config
  202. #
  203. if 'plugin-context' in _args :
  204. _config = _args['plugin-context'] #dict(_config,**_args['plugin-context'])
  205. _outInfo = self._configHandler.consolidate(row=_args['row'],columns=columns,index=index,config=_config)
  206. _field,_label = _outInfo['field'],_outInfo['label']
  207. _columns,_index = _outInfo['columns'],_outInfo['index']
  208. if 'row' in _args:
  209. _row = _args['row'] if type(_args['row']) == list else _args['row'].split('*')
  210. _index = np.array(_index)
  211. #
  212. # Sometimes the _row doesn't have all expected indexes, we will compensate
  213. # This allows to minimize parsing errors as it may relate to disconnects between configuration and x12 element variations (shitty format)
  214. #
  215. if np.max(_index) > len(_row) -1 :
  216. _delta = 1 + np.max(_index) - len(_row)
  217. _row = _row + np.repeat('',_delta).tolist()
  218. _row = np.array(_row)
  219. _info = dict(zip(_columns,_row[_index].tolist()))
  220. _document = _args['document'] if 'document' in _args else {}
  221. #
  222. # @TODO:
  223. # Apply parsing/casting function to the object retrieved
  224. # _apply(_info) #-- the object will be processed accordingly
  225. #
  226. #
  227. # @TODO:
  228. # The objects parsed must be augmented against the appropriate ones e.g: NM1 <- N1,N2,N3,N4
  229. # - Find a way to drive this from a configuration ...
  230. #
  231. if _field :
  232. if not _field in _document :
  233. _item = {_field:_info}
  234. else:
  235. _item = self.merge(_document[_field],_info)
  236. elif _label :
  237. if not _label in _document :
  238. _item = {_label:[_info]}
  239. else:
  240. _item = _document[_label] + [_info]
  241. else:
  242. _item = _info
  243. if _ELEMENT in self._hierarchy and _field:
  244. # print ([_field,_item])
  245. self.lastelement = _item
  246. pass
  247. else:
  248. for key in self._hierarchy :
  249. if _ELEMENT in self._hierarchy[key] :
  250. _ikey = list(self.lastelement.keys())[0]
  251. _oldinfo = self.lastelement[_ikey]
  252. if type(_oldinfo) != dict :
  253. #
  254. # This is we should log somewhere to suggest an issue happened
  255. #
  256. # self.log(action='error',input=_row)
  257. pass
  258. else:
  259. _item = {_ikey: self.merge(_oldinfo,_item)}
  260. break
  261. pass
  262. return _item
  263. else:
  264. #
  265. #
  266. print (_config)
  267. return columns
  268. def elements(self):
  269. """
  270. This function returns elements that are supported as specified by X12 standard
  271. """
  272. return [_name for _name in dir(self) if not _name.startswith('_') and not _name.islower() ]
  273. def pointers(self):
  274. """
  275. This function returns pointers associated with each element ...
  276. :return Object of Element:Function
  277. """
  278. _attr = self.elements()
  279. _pointers = [getattr(self,_name) for _name in _attr]
  280. return dict(zip(_attr,_pointers))
  281. def set(self,_info,_document,_config):
  282. _attrName,_attrType = None,None
  283. if 'label' in _config :
  284. _attrType = 'label'
  285. _attrName = _config['label']
  286. elif 'field' in _config :
  287. _attrType = 'field'
  288. _attrName = _config['field']
  289. if _attrName :
  290. if _attrName not in _document :
  291. _document[_attrName] = [] if _attrType == 'label' else {}
  292. #
  293. # @TODO: make sure we don't have a case of an attribute being overridden
  294. if type(_document[_attrName]) == list :
  295. _document[_attrName] += [_info]
  296. else:
  297. _document[_attrName] = dict(_document[_attrName],**_info)
  298. # _document[_attrName] += [_info] if _attrType == 'label' else dict(_document[_attrName],**_info)
  299. return _document
  300. return dict(_document,**_info)
  301. pass
  302. def log (self,**_args):
  303. print(_args)
  304. def parseBlocks (self,_blocks,_header):
  305. """
  306. This function extracts blocks and returns them to the caller,
  307. Blocks of a document are made of transactional loops, that constitute a patients claim
  308. """
  309. _tmp = {}
  310. _documents = []
  311. for _content in _blocks :
  312. _body = self.apply(_content,header=_header)
  313. _doc = self.merge(_header,_body)
  314. # self.log(action='parse',section='body',input=_content[0])
  315. if _doc and 'claim_id' in _doc:
  316. # X12DOCUMENT._queue.put(_document)
  317. # self.log(action='parse',section='document')
  318. _documents += [self.merge(_tmp,_doc)]
  319. _tmp = {}
  320. else:
  321. #
  322. # The document is being built and not yet ready
  323. _tmp = self.merge(_tmp,_doc)
  324. return _documents
  325. def run(self):
  326. """
  327. This function will trigger the workflow associated with a particular file
  328. """
  329. for _filename in self.files :
  330. # self.log(action='parse',section='file',input=_filename)
  331. try:
  332. _documents = []
  333. _parts = []
  334. if os.sep in _filename and os.path.exists(_filename) :
  335. _reader = open(_filename)
  336. else:
  337. #
  338. # This is a stream, we are wrapping it into an appropriate structure
  339. #
  340. _reader = io.StringIO(_filename)
  341. #
  342. # Let us log the mode we have set ...
  343. _content = _reader.read()
  344. if hasattr(_reader,'close') :
  345. _reader.close()
  346. _info = self.split(_content)
  347. _fileType=self.init(_content)
  348. _header = self.apply(_info['header'])
  349. if _info['blocks'] :
  350. #
  351. # processing blocks for the current claim
  352. #
  353. _documents = self.parseBlocks(_info['blocks'],_header)
  354. except Exception as e:
  355. #
  356. # @TODO: Log this issue for later analysis ...
  357. print (e)
  358. pass
  359. # #
  360. # # Let us post this to the documents we have, we should find a place to post it
  361. # #
  362. if _documents :
  363. # print (_header['header'])
  364. _writer = transport.factory.instance(**self._store_args)
  365. self.post(document=_documents,writer=_writer)
  366. def post(self,**_args):
  367. """
  368. This function is intended to post content to a given location
  369. :param document
  370. :param writer
  371. """
  372. _writer = _args['writer'] if 'writer' in _args else None
  373. _document = _args['document']
  374. if not _writer:
  375. X12DOCUMENT._queue.put(_document)
  376. else:
  377. _writer.write(_document)
  378. def _getConfig(self,_chunk):
  379. #
  380. # Let us determine what kind of file we are dealing with, so we can extract the configuration
  381. # For this we need to look for the ST loop ...
  382. #
  383. line = [line for line in _chunk if line and line[:2] == 'ST' ]
  384. if line :
  385. #
  386. # We found the header of the block, so we can set the default configuration
  387. #
  388. self._x12FileType = line[0].split('*')[1].strip()
  389. _config = {}
  390. if self._x12FileType :
  391. _config = self._config[self._x12FileType]
  392. return _config
  393. def apply(self,_chunk, header = {}):
  394. """
  395. _chunks are groups of elements split by HL, within each chunk are x12 loops HL,CLM,ISA
  396. """
  397. _document,_cached = {},{}
  398. _pointers = self.pointers()
  399. _config = self._getConfig(_chunk)
  400. #
  401. # The configuration comes from the file, let's run this in merge mode
  402. # _config = self._configHandler.merge
  403. _pid = None
  404. for line in _chunk :
  405. segments = line.split('*')
  406. _ELEMENT = segments[0]
  407. if _ELEMENT not in _pointers or not _ELEMENT:
  408. continue
  409. if _ELEMENT in ['HL','CLM','ISA'] or not _pid:
  410. _pid = _ELEMENT
  411. if _pid not in _cached :
  412. _cached [_pid] = {}
  413. _pointer = _pointers[_ELEMENT]
  414. _args = {'row':segments,'document':_document,'header':header,'config':(_config)}
  415. _parsedLine = _pointer(**_args)
  416. if _pid in _cached :
  417. _cached[_pid] = self.merge(_cached[_pid],_parsedLine)
  418. else:
  419. _cached[_pid] = _parsedLine
  420. #
  421. # Let's create the documents as we understand them to be
  422. # @TODO: Create a log so there can be visibility into the parser
  423. #
  424. _document = {}
  425. for _id in _cached :
  426. # print ('patient' in _cached[_id] )
  427. _document = self.merge(_document,_cached[_id])
  428. return _document