import copy
from . import util
import transport
import numpy as np
import time
import pandas as pd
from multiprocessing import Process
import json
from healthcareio.logger import X12Logger

def build (**_args):
    """
    This function builds the default table templates for a given x12 file type (perhaps not needed)
    :plugins loaded plugins
    :x12 837|835 file types
    """
    _plugins = _args['plugins']
    _x12 = _args['x12']
    _template = util.template(plugins=_plugins)[_x12]
    _primaryKey = util.getPrimaryKey(plugins=_plugins,x12=_x12)
    _tables = []
    _main = {}
    for _name in _template :
        _item = _template[_name] #copy.deepcopy(_template[_name])
        if _primaryKey not in _item and type(_item) == dict:
            #
            # Child table: make sure it carries the primary-key column
            _item[_primaryKey] = ''
            _tables.append({_name:_item})
        else:
            #
            # Anything else is treated as an attribute of the main claims/remits table
            _main[_name] = ''
    _name = getContext(_x12)
    _tables += [{_name:_main}]
    _template[_name] = _main
    return _template #_tables
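#
# A minimal sketch of what build() returns, assuming a hypothetical 837
# template with a 'procedures' child table and a primary key 'claim_id':
#
#   build(plugins=_plugins,x12='837')
#   => {'procedures': {'code': '', 'claim_id': ''}, ..., 'claims': {'status': '', ...}}
#
# i.e. child tables gain the primary-key column and the scalar attributes
# are grouped under the main 'claims'/'remits' key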

def getContext(_x12) :
    return 'claims' if _x12 == '837' else 'remits'

def format(**_args) :
    """
    Turn parsed claims/remits into a relational model i.e a dict of table-name -> rows
    :rows rows to be formatted i.e parsed claims or remits
    :primary_key primary_key field name
    :x12 file format
    """
    # _name = _args['table']
    _rows = _args['rows']
    _primary_key = _args['primary_key']
    _x12 = _args['x12']
    _mainTableName = getContext(_x12)
    _tables = {_mainTableName:[]}
    for _claim in _rows :
        #
        # Turn the claim into a relational model ...
        #
        _main = {}
        _pkvalue = None
        if _primary_key in _claim :
            _pkvalue = _claim[_primary_key]
        for _attrName in _claim :
            _item = _claim[_attrName]
            _item = update(_item,_primary_key,_pkvalue)
            #
            # We have a casting problem between relational data-stores and JSON objects,
            # so stringified objects/arrays are parsed back into their native types
            #
            if type(_item) == str and (_item.startswith("{") or _item.startswith("[")) :
                try:
                    _item = json.loads(_item)
                except Exception as ee :
                    # print (ee)
                    pass
            if _attrName not in _tables and type(_item) in [dict,list]:
                _tables[_attrName] = []
            if type(_item) in [dict,list] :
                _tables[_attrName] += _item if type(_item) == list else [_item]
            else:
                #
                # This section suggests we found a main table attribute
                _main[_attrName] = _item
        _tables[_mainTableName].append(_main)
    return _tables
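#
# A minimal sketch of format() in action, assuming a hypothetical primary
# key 'claim_id' and a single parsed 837 claim:
#
#   format(rows=[{'claim_id':'X1','status':'P','procedures':[{'code':'99213'}]}],
#          primary_key='claim_id',x12='837')
#   => {'claims': [{'claim_id': 'X1', 'status': 'P'}],
#       'procedures': [{'code': '99213', '_index': 0, 'claim_id': 'X1'}]}
#
# Scalar attributes land in the main table, dict/list attributes become
# child tables keyed back to their claim via the primary key (see update)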

def update (_item,key,value):
    if type(_item) not in [dict,list] :
        return _item
    if type(_item) == dict :
        _item[key] = value
    else:
        #
        # List: go through every item and update accordingly
        _index = 0
        for _row in _item :
            if type(_row) == dict :
                _row['_index'] = _index
                _row[key] = value
            _index += 1
    return _item
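#
# e.g. update({'code':'99213'},'claim_id','X1') => {'code':'99213','claim_id':'X1'}
#      update([{'code':'A'},{'code':'B'}],'claim_id','X1') tags every row
#      with the key and its position i.e '_index' 0,1,...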

def init(**_args):
    """
    This function will kick off the export process given the claims/remits and the loaded plugins (not sure why)
    It requires the data it is pulling to be consistently formatted (otherwise nothing can be done)
    :plugins loaded plugins
    :store data store information i.e {source,target} specifications for data-transport
    :x12 file type i.e 837|835
    """
    _file_type = _args['x12']
    _plugins = _args['plugins']
    _store = _args['store']
    _default = build(plugins=_plugins,x12=_file_type)
    _logger = X12Logger(store = _store['source'])
    _df = read(store = _store['source'],x12=_file_type)
    #
    # @LOG :
    if _logger :
        _logger.log(module='init',action='export-init',data={'rows':_df.shape[0],'attributes':list(_df.columns)})
    _pkey = util.getPrimaryKey(plugins = _plugins, x12=_file_type)
    SEGMENTS = 4 # arbitrary choice
    #
    # e.g. 10 rows with SEGMENTS=4 yields index blocks of sizes 3,3,2,2 ;
    # each block is formatted and written by its own Process
    _indexes = np.array_split(np.arange(_df.shape[0]),SEGMENTS)
    jobs = []
    _tables = []
    for _ii in _indexes :
        try:
            _data = format(rows= _df.iloc[_ii].to_dict(orient='records'),x12=_file_type,primary_key=_pkey)
            _thread = Process(target=post,args=({'store':_store['target'],'data':_data,'default':_default,'x12':_file_type},))
            _thread.start()
            jobs.append(_thread)
            _tables = list(_data.keys())
        except Exception as e:
            #
            # @TODO: log the failed segment here
            print (e)
            pass
    #
    # @LOG :
    if _logger :
        _logger.log(module='init',action='export-wait',data={'jobs':len(jobs),'tables':_tables})
    if jobs :
        # jobs[0].start()
        # jobs[0].join()
        while jobs :
            jobs = [thread for thread in jobs if thread.is_alive()]
            time.sleep(1)
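#
# A minimal usage sketch (hypothetical store specification; the exact keys
# expected by transport.factory.instance depend on the providers used):
#
#   _store = {'source': {'provider':'mongodb', ...}, 'target': {'provider':'postgresql', ...}}
#   init(plugins=_plugins, store=_store, x12='837')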

def read (**_args):
    _store = copy.copy(_args['store'])
    _x12 = _args['x12']
    _store['table'] = getContext(_x12) #'claims' if _x12 == '837' else 'remits'
    _store['context'] = 'read'
    reader = transport.factory.instance(**_store)
    #
    # @TODO: reading should support streaming (for scalability)
    _df = reader.read()
    return _df

def post(_args):
    _data = _args['data']
    _store = _args['store']
    _store['context'] = 'write'
    _default = _args['default']
    _prefix = 'clm_' if _args['x12'] == '837' else 'rem_'
    # if 'claims' in _data or 'remits' in _data :
    #     _key = 'claims' if 'claims' in _data else 'remits'
    #     _data = _data[_key]
    for _name in _data :
        _tablename = _prefix+_name
        _store['table'] = _tablename if _name not in ['remits','claims'] else _name
        _store['context']='write'
        _store['lock'] = True
        writer = transport.factory.instance(**_store)
        if len(_data[_name]) == 0 and _name in _default and not writer.has(table=_tablename):
            #
            # No rows for this child table: seed it with the default template row
            _rows = [_default[_name]]
        else:
            _rows = _data[_name]
        writer.write(pd.DataFrame(_rows).fillna(''))
        if hasattr(writer,'close') :
            writer.close()
    #
    # @TODO: Have a logger here to log what's going on ...
    # _xwriter = transport.factory.instance(**_store)
    # _xwriter.write(_df)
    # _info = format()
    pass
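#
# e.g. for x12='837' a 'procedures' child table is written to 'clm_procedures'
# while the main 'claims' table keeps its name ; empty child tables are
# seeded with the default template row so they still get created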