|
@@ -0,0 +1,154 @@
|
|
|
|
+import copy
|
|
|
|
+from . import util
|
|
|
|
+import transport
|
|
|
|
+import numpy as np
|
|
|
|
+import time
|
|
|
|
+import pandas as pd
|
|
|
|
+from multiprocessing import Process
|
|
|
|
+
|
|
|
|
+def build (**_args):
|
|
|
|
+ """
|
|
|
|
+ This function will build SQL statements to create a table (perhaps not needed)
|
|
|
|
+ :plugins loaded plugins
|
|
|
|
+ :x12 837|835 file types
|
|
|
|
+ """
|
|
|
|
+ _plugins=_args['plugins']
|
|
|
|
+ _x12 = _args['x12']
|
|
|
|
+ _template = util.template(plugins=_plugins)[_x12]
|
|
|
|
+ _primaryKey = util.getPrimaryKey(plugins=_plugins,x12=_x12)
|
|
|
|
+ _tables = []
|
|
|
|
+ _main = {}
|
|
|
|
+ for _name in _template :
|
|
|
|
+ _item = _template[_name] #copy.deepcopy(_template[_name])
|
|
|
|
+ if _primaryKey not in _item and type(_item) == dict:
|
|
|
|
+ _item[_primaryKey] = ''
|
|
|
|
+ _tables.append({_name:_item})
|
|
|
|
+ else:
|
|
|
|
+ _main[_name] = ''
|
|
|
|
+
|
|
|
|
+ _name = getContext(_x12)
|
|
|
|
+ _tables += [{_name:_main}]
|
|
|
|
+ _template[_name] = _main
|
|
|
|
+
|
|
|
|
+ return _template #_tables
|
|
|
|
+def getContext(_x12) :
|
|
|
|
+ return 'claims' if _x12 == '837' else 'remits'
|
|
|
|
+def format(**_args) :
|
|
|
|
+ """
|
|
|
|
+ :rows rows for the
|
|
|
|
+ :primary_key primary_key field name
|
|
|
|
+ :x12 file format
|
|
|
|
+ """
|
|
|
|
+
|
|
|
|
+ # _name = _args['table']
|
|
|
|
+ _rows = _args['rows']
|
|
|
|
+
|
|
|
|
+ _primary_key = _args['primary_key']
|
|
|
|
+ _x12 = _args['x12']
|
|
|
|
+ _mainTableName = getContext(_x12)
|
|
|
|
+ _tables = {_mainTableName:[]}
|
|
|
|
+
|
|
|
|
+ for _claim in _rows :
|
|
|
|
+ # #
|
|
|
|
+ # # Turn the claim into a relational model ...
|
|
|
|
+ # #
|
|
|
|
+ _main = {}
|
|
|
|
+ _pkvalue = None
|
|
|
|
+ if _primary_key in _claim :
|
|
|
|
+ _pkvalue = _claim[_primary_key]
|
|
|
|
+
|
|
|
|
+ for _attrName in _claim :
|
|
|
|
+ _item = _claim[_attrName]
|
|
|
|
+ _item = update(_item,_primary_key,_pkvalue)
|
|
|
|
+
|
|
|
|
+ if _attrName not in _tables and type(_item) in [dict,list]:
|
|
|
|
+ _tables[_attrName] = []
|
|
|
|
+ if type(_item) in [dict,list] :
|
|
|
|
+ _tables[_attrName] += _item if type(_item) == list else [_item]
|
|
|
|
+ else:
|
|
|
|
+ #
|
|
|
|
+ # This section suggests we found a main table attribute
|
|
|
|
+ _main[_attrName] = _item
|
|
|
|
+ _tables[_mainTableName].append(_main)
|
|
|
|
+
|
|
|
|
+ return _tables
|
|
|
|
+def update (_item,key,value):
|
|
|
|
+ if type(_item) not in [dict,list] :
|
|
|
|
+ return _item
|
|
|
|
+ if type(_item) == dict :
|
|
|
|
+ _item[key] = value
|
|
|
|
+ else:
|
|
|
|
+ #
|
|
|
|
+ # List, we will go through every item and update accordingly
|
|
|
|
+ _index = 0
|
|
|
|
+ for _row in _item :
|
|
|
|
+ if type(_row) == dict :
|
|
|
|
+ _row['_index'] = _index
|
|
|
|
+ _row[key] = value
|
|
|
|
+ return _item
|
|
|
|
+def init(**_args):
|
|
|
|
+ """
|
|
|
|
+ This function will kick off the export process provided claims/remits and the loaded plugins (not sure why)
|
|
|
|
+ It requires the data it is pulling to be consistently formatted (otherwise nothing can be done)
|
|
|
|
+ :plugins
|
|
|
|
+ :store data store information i.e {source,target} specifications for data-transport
|
|
|
|
+ :x12 file type i.e 837|835
|
|
|
|
+ """
|
|
|
|
+ _file_type = _args['x12']
|
|
|
|
+ _plugins = _args['plugins']
|
|
|
|
+ _store = _args['store']
|
|
|
|
+ _default = build(plugins=_plugins,x12=_file_type)
|
|
|
|
+
|
|
|
|
+ _df = read(store = _store['source'],x12=_file_type)
|
|
|
|
+
|
|
|
|
+ _pkey = util.getPrimaryKey(plugins = _plugins, x12=_file_type)
|
|
|
|
+ SEGMENTS = 4 # arbitrary choice
|
|
|
|
+ _indexes = np.array_split(np.arange(_df.shape[0]),SEGMENTS)
|
|
|
|
+ jobs = []
|
|
|
|
+ for _ii in _indexes :
|
|
|
|
+
|
|
|
|
+ _data = format(rows= _df.iloc[_ii].to_dict(orient='records'),x12=_file_type,primary_key=_pkey)
|
|
|
|
+
|
|
|
|
+ _thread = Process(target=post,args=({'store':_store['target'],'data':_data,'default':_default},))
|
|
|
|
+ jobs.append(_thread)
|
|
|
|
+ if jobs :
|
|
|
|
+ jobs[0].start()
|
|
|
|
+ jobs[0].join()
|
|
|
|
+ while jobs :
|
|
|
|
+ jobs = [thread for thread in jobs if thread.is_alive()]
|
|
|
|
+ time.sleep(1)
|
|
|
|
+
|
|
|
|
+def read (**_args):
|
|
|
|
+ _store = copy.copy(_args['store'])
|
|
|
|
+ _x12 = _args['x12']
|
|
|
|
+ _store['table'] = getContext(_x12) #'claims' if _x12 == '837' else 'remits'
|
|
|
|
+ reader = transport.factory.instance(**_store)
|
|
|
|
+ #
|
|
|
|
+ # @TODO: reading should support streaming (for scalability)
|
|
|
|
+ _df = reader.read()
|
|
|
|
+
|
|
|
|
+ return _df
|
|
|
|
+
|
|
|
|
+def post(_args):
|
|
|
|
+ _data = _args['data']
|
|
|
|
+ _store = _args['store']
|
|
|
|
+ _default = _args['default']
|
|
|
|
+
|
|
|
|
+ for _name in _data :
|
|
|
|
+ _store['table'] = _name
|
|
|
|
+ _store['context']='write'
|
|
|
|
+ writer = transport.factory.instance(**_store)
|
|
|
|
+ if len(_data[_name]) == 0 and _name in _default:
|
|
|
|
+ _rows = [_default[_name]]
|
|
|
|
+ else:
|
|
|
|
+ _rows = _data[_name]
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ writer.write(_rows)
|
|
|
|
+ if hasattr(writer,'close') :
|
|
|
|
+ writer.close()
|
|
|
|
+
|
|
|
|
+ # _xwriter = trasnport.factory.instance(**_store)
|
|
|
|
+ # _xwriter.write(_df)
|
|
|
|
+ # _info = format()
|
|
|
|
+ pass
|