# publish.py
import copy
import json
import time
from multiprocessing import Process

import numpy as np
import pandas as pd
import transport

from healthcareio.logger import X12Logger
from . import util
  10. def build (**_args):
  11. """
  12. This function will build SQL statements to create a table (perhaps not needed)
  13. :plugins loaded plugins
  14. :x12 837|835 file types
  15. """
  16. _plugins=_args['plugins']
  17. _x12 = _args['x12']
  18. _template = util.template(plugins=_plugins)[_x12]
  19. _primaryKey = util.getPrimaryKey(plugins=_plugins,x12=_x12)
  20. _tables = {}
  21. _main = {}
  22. for _name in _template :
  23. _item = _template[_name] #copy.deepcopy(_template[_name])
  24. if _name not in _tables and type(_item) in [list,dict] :
  25. _tables[_name] = _item
  26. if _primaryKey not in _item and type(_item) == dict:
  27. _item[_primaryKey] = ''
  28. # _tables.append({_name:_item})
  29. _tables[_name] = _item
  30. else:
  31. _main[_name] = ''
  32. _name = getContext(_x12)
  33. # _tables += [{_name:_main}]
  34. _tables[_name] = _main
  35. _template[_name] = _main
  36. return _tables #_template #_tables
  37. def getContext(_x12) :
  38. return 'claims' if _x12 == '837' else 'remits'
  39. def format(**_args) :
  40. """
  41. :rows rows for the
  42. :primary_key primary_key field name
  43. :x12 file format
  44. """
  45. # _name = _args['table']
  46. _rows = _args['rows']
  47. _primary_key = _args['primary_key']
  48. _x12 = _args['x12']
  49. _mainTableName = getContext(_x12)
  50. _tables = {_mainTableName:[]}
  51. for _claim in _rows :
  52. # #
  53. # # Turn the claim into a relational model ...
  54. # #
  55. _main = {}
  56. _pkvalue = None
  57. if _primary_key in _claim :
  58. _pkvalue = _claim[_primary_key]
  59. for _attrName in _claim :
  60. _item = _claim[_attrName]
  61. # _item = update(_item,_primary_key,_pkvalue)
  62. #
  63. # We have a casting problem, with relational data-store and JSON objects
  64. #
  65. if type(_item) == str and (_item.startswith("[") or _item.startswith("{")) :
  66. try:
  67. _item = json.loads(_item)
  68. _item = update(_item,_primary_key,_pkvalue)
  69. except Exception as ee :
  70. # print (ee)
  71. pass
  72. if _attrName not in _tables and type(_item) in [dict,list]:
  73. _tables[_attrName] = []
  74. if type(_item) in [dict,list] :
  75. _tables[_attrName] += _item if type(_item) == list else [_item]
  76. pass
  77. else:
  78. #
  79. # This section suggests we found a main table attribute
  80. _main[_attrName] = _item
  81. _tables[_mainTableName].append(_main)
  82. return _tables
  83. def update (_item,key,value):
  84. if type(_item) not in [dict,list] :
  85. return _item
  86. if type(_item) == dict :
  87. _item[key] = value
  88. else:
  89. #
  90. # List, we will go through every item and update accordingly
  91. _index = 0
  92. for _row in _item :
  93. if type(_row) == dict :
  94. _row['_index'] = _index
  95. _row[key] = value
  96. return _item
  97. def init(**_args):
  98. """
  99. This function will kick off the export process provided claims/remits and the loaded plugins (not sure why)
  100. It requires the data it is pulling to be consistently formatted (otherwise nothing can be done)
  101. :plugins
  102. :store data store information i.e {source,target} specifications for data-transport
  103. :x12 file type i.e 837|835
  104. """
  105. _file_type = _args['x12']
  106. _plugins = _args['plugins']
  107. _store = _args['store']
  108. _default = build(plugins=_plugins,x12=_file_type)
  109. _logger = X12Logger(store = _store['source'])
  110. _df = read(store = _store['source'],x12=_file_type)
  111. #
  112. # @LOG :
  113. if _logger :
  114. _logger.log(module='init',action='export-init',data={'rows':_df.shape[0],'attributes':list(_df.columns)})
  115. _pkey = util.getPrimaryKey(plugins = _plugins, x12=_file_type)
  116. SEGMENTS = 4 # arbitrary choice
  117. _indexes = np.array_split(np.arange(_df.shape[0]),SEGMENTS)
  118. jobs = []
  119. _tables = {}
  120. for _ii in _indexes :
  121. try:
  122. _data = format(rows= _df.iloc[_ii].to_dict(orient='records'),x12=_file_type,primary_key=_pkey)
  123. _thread = Process(target=post,args=({'store':_store['target'],'data':_data,'default':_default,'x12':_file_type},))
  124. _thread.start()
  125. jobs.append(_thread)
  126. _tables = list(_data.keys())
  127. except Exception as e:
  128. #
  129. # Log: sigment,
  130. print (e)
  131. pass
  132. #
  133. # @LOG :
  134. if _logger :
  135. _logger.log(module='init',action='export-wait',data={'jobs':len(jobs),'tables':_tables})
  136. if jobs :
  137. # jobs[0].start()
  138. # jobs[0].join()
  139. while jobs :
  140. jobs = [thread for thread in jobs if thread.is_alive()]
  141. time.sleep(1)
  142. def read (**_args):
  143. _store = copy.copy(_args['store'])
  144. _x12 = _args['x12']
  145. _store['table'] = getContext(_x12) #'claims' if _x12 == '837' else 'remits'
  146. _store['context'] = 'read'
  147. reader = transport.factory.instance(**_store)
  148. #
  149. # @TODO: reading should support streaming (for scalability)
  150. _df = reader.read()
  151. return _df
  152. def post(_args):
  153. _data = _args['data']
  154. _store = _args['store']
  155. _store['context'] = 'write'
  156. _default = _args['default']
  157. _prefix = 'clm_' if _args['x12'] == '837' else 'rem_'
  158. # if 'claims' in _data or 'remits' in _data :
  159. # _key = 'claims' if 'claims' in _data else 'remits'
  160. # _data = _data[_key]
  161. for _name in _data :
  162. _tablename = _prefix+_name
  163. _store['table'] = _tablename if _name not in ['remits','claims'] else _name
  164. _store['context']='write'
  165. _store['lock'] = True
  166. writer = transport.factory.instance(**_store)
  167. if len(_data[_name]) == 0 and _name in _default and not writer.has(table=_tablename):
  168. _rows = [_default[_name]]
  169. else:
  170. _rows = _data[_name]
  171. writer.write(pd.DataFrame(_rows).fillna(''))
  172. if hasattr(writer,'close') :
  173. writer.close()
  174. #
  175. # Have a logger here to log what's going on ...
  176. # _xwriter = trasnport.factory.instance(**_store)
  177. # _xwriter.write(_df)
  178. # _info = format()
  179. pass