publish.py

import copy
from . import util
import transport
import numpy as np
import time
import pandas as pd
from multiprocessing import Process
import json
def build (**_args):
    """
    This function builds the default table structures for a given x12 file type
    (it was originally meant to build SQL statements to create tables, perhaps not needed)
    :plugins    loaded plugins
    :x12        837|835 file types
    """
    _plugins = _args['plugins']
    _x12 = _args['x12']
    _template = util.template(plugins=_plugins)[_x12]
    _primaryKey = util.getPrimaryKey(plugins=_plugins,x12=_x12)
    _tables = []
    _main = {}
    for _name in _template :
        _item = _template[_name] #copy.deepcopy(_template[_name])
        if type(_item) == dict and _primaryKey not in _item :
            _item[_primaryKey] = ''
            _tables.append({_name:_item})
        else:
            _main[_name] = ''
    _name = getContext(_x12)
    _tables += [{_name:_main}]
    _template[_name] = _main
    return _template #_tables
def getContext(_x12) :
    return 'claims' if _x12 == '837' else 'remits'
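#
# Illustrative sketch (hypothetical table/attribute names, not from the original
# module): for an 837 file, build() returns the plugin template with the primary
# key seeded into every child-table entry, plus a 'claims' entry holding the
# scalar attributes, e.g.
#
#   _default = build(plugins=_plugins, x12='837')
#   # {'procedures': {'claim_id': '', ...}, ..., 'claims': {'payer_id': '', ...}}
#
# These defaults are later used by post() to seed tables that received no rows.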
def format(**_args) :
    """
    :rows           rows (claims/remits) to be formatted
    :primary_key    primary_key field name
    :x12            file format
    """
    # _name = _args['table']
    _rows = _args['rows']
    _primary_key = _args['primary_key']
    _x12 = _args['x12']
    _mainTableName = getContext(_x12)
    _tables = {_mainTableName:[]}
    for _claim in _rows :
        #
        # Turn the claim into a relational model ...
        #
        _main = {}
        _pkvalue = None
        if _primary_key in _claim :
            _pkvalue = _claim[_primary_key]
        for _attrName in _claim :
            _item = _claim[_attrName]
            _item = update(_item,_primary_key,_pkvalue)
            #
            # We have a casting problem between relational data-stores and JSON objects
            #
            if type(_item) == str and (_item.startswith("{") or _item.startswith("[")) :
                try:
                    _item = json.loads(_item)
                except Exception as ee :
                    # print (ee)
                    pass
            if _attrName not in _tables and type(_item) in [dict,list]:
                _tables[_attrName] = []
            if type(_item) in [dict,list] :
                _tables[_attrName] += _item if type(_item) == list else [_item]
            else:
                #
                # Scalar attribute, i.e. it belongs to the main table
                _main[_attrName] = _item
        _tables[_mainTableName].append(_main)
    return _tables
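#
# Illustrative sketch (hypothetical data, not from the original module): format()
# flattens each claim into a main-table row plus child-table rows that carry the
# primary key back to the parent claim:
#
#   format(rows=[{'claim_id':'C1', 'payer':'ACME',
#                 'procedures':[{'code':'99213'}]}],
#          primary_key='claim_id', x12='837')
#   # -> {'claims':     [{'claim_id':'C1', 'payer':'ACME'}],
#   #     'procedures': [{'code':'99213', '_index':0, 'claim_id':'C1'}]}
#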
def update (_item,key,value):
    if type(_item) not in [dict,list] :
        return _item
    if type(_item) == dict :
        _item[key] = value
    else:
        #
        # List: go through every item and update it accordingly
        for _index,_row in enumerate(_item) :
            if type(_row) == dict :
                _row['_index'] = _index
                _row[key] = value
    return _item
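#
# Illustrative sketch (hypothetical values): update() stamps the primary key onto
# nested records so child rows remain joinable to their parent:
#
#   update({'code':'99213'}, 'claim_id', 'C1')
#   # -> {'code':'99213', 'claim_id':'C1'}
#   update([{'code':'A'},{'code':'B'}], 'claim_id', 'C1')
#   # -> [{'code':'A', '_index':0, 'claim_id':'C1'},
#   #     {'code':'B', '_index':1, 'claim_id':'C1'}]
#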
def init(**_args):
    """
    This function kicks off the export process given claims/remits and the loaded plugins (not sure why)
    It requires the data it is pulling to be consistently formatted (otherwise nothing can be done)
    :plugins    loaded plugins
    :store      data store information i.e. {source,target} specifications for data-transport
    :x12        file type i.e. 837|835
    """
    _file_type = _args['x12']
    _plugins = _args['plugins']
    _store = _args['store']
    _default = build(plugins=_plugins,x12=_file_type)
    _df = read(store=_store['source'],x12=_file_type)
    _pkey = util.getPrimaryKey(plugins=_plugins, x12=_file_type)
    SEGMENTS = 4 # arbitrary choice
    _indexes = np.array_split(np.arange(_df.shape[0]),SEGMENTS)
    jobs = []
    for _ii in _indexes :
        try:
            _data = format(rows=_df.iloc[_ii].to_dict(orient='records'),x12=_file_type,primary_key=_pkey)
            _thread = Process(target=post,args=({'store':_store['target'],'data':_data,'default':_default,'x12':_file_type},))
            jobs.append(_thread)
        except Exception as e:
            #
            # @TODO: log the failing segment
            print (e)
    #
    # Start every segment's process and wait until they have all completed
    for _thread in jobs :
        _thread.start()
    while jobs :
        jobs = [thread for thread in jobs if thread.is_alive()]
        time.sleep(1)
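#
# Illustrative sketch (hypothetical store specification, not from the original
# module): init() is driven by a data-transport {source,target} pair, e.g.
#
#   init(plugins=_plugins, x12='837',
#        store={'source': {'provider':'mongodb',    'db':'claims_staging'},
#               'target': {'provider':'postgresql', 'db':'warehouse'}})
#
# The exact keys accepted by transport.factory.instance() depend on the
# data-transport version in use.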
def read (**_args):
    _store = copy.copy(_args['store'])
    _x12 = _args['x12']
    _store['table'] = getContext(_x12) #'claims' if _x12 == '837' else 'remits'
    _store['context'] = 'read'
    reader = transport.factory.instance(**_store)
    #
    # @TODO: reading should support streaming (for scalability)
    _df = reader.read()
    return _df
def post(_args):
    _data = _args['data']
    _store = _args['store']
    _store['context'] = 'write'
    _default = _args['default']
    _prefix = 'clm_' if _args['x12'] == '837' else 'rem_'
    # if 'claims' in _data or 'remits' in _data :
    #     _key = 'claims' if 'claims' in _data else 'remits'
    #     _data = _data[_key]
    for _name in _data :
        _tablename = _prefix+_name
        _store['table'] = _tablename if _name not in ['remits','claims'] else _name
        writer = transport.factory.instance(**_store)
        #
        # If a segment produced no rows, seed the new table with the default row
        if len(_data[_name]) == 0 and _name in _default and not writer.has(table=_tablename):
            _rows = [_default[_name]]
        else:
            _rows = _data[_name]
        writer.write(pd.DataFrame(_rows).fillna(''))
        if hasattr(writer,'close') :
            writer.close()
    #
    # @TODO: Have a logger here to log what's going on ...
    # _xwriter = transport.factory.instance(**_store)
    # _xwriter.write(_df)
    # _info = format()
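#
# Illustrative sketch (hypothetical child-table names): post() writes each child
# table under a file-type prefix, so an 837 batch would produce tables such as
#
#   claims, clm_procedures, clm_diagnosis, ...
#
# while an 835 batch would produce remits, rem_*, ... The main table keeps its
# unprefixed name ('claims' or 'remits').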