123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- import copy
- from . import util
- import transport
- import numpy as np
- import time
- import pandas as pd
- from multiprocessing import Process
def build(**_args):
    """
    This function builds the default (empty) relational template for an x12 file type.
    Nested dictionary entries of the template receive an empty primary key field (they are
    the secondary tables); every other entry is recorded as an empty attribute of the main table.
    NOTE: the template obtained from util.template is updated in place and returned.

    :plugins    loaded plugins (handed to util.template and util.getPrimaryKey)
    :x12        837|835 file type, selects the template and the name of the main table
    :return     the template, with the primary key added to nested entries and one extra
                entry (named by getContext) holding the main table attributes
    """
    _plugins = _args['plugins']
    _x12 = _args['x12']
    _template = util.template(plugins=_plugins)[_x12]
    _primaryKey = util.getPrimaryKey(plugins=_plugins, x12=_x12)
    _main = {}
    for _name in _template:
        _item = _template[_name]
        #
        # The type is checked BEFORE the membership test: running "in" against a value
        # that is not a container (e.g. a number) would raise a TypeError
        if isinstance(_item, dict) and _primaryKey not in _item:
            _item[_primaryKey] = ''
        else:
            #
            # Anything else (including a dictionary that already carries the primary key)
            # is recorded as an empty attribute of the main table
            _main[_name] = ''

    #
    # The main table is added once the scan is over, so it is not visited by the loop above
    _template[getContext(_x12)] = _main
    return _template
def getContext(_x12):
    """
    This function returns the name of the main table associated with a file type
    :_x12   file type, '837' maps to 'claims' and any other value maps to 'remits'
    """
    if _x12 == '837':
        return 'claims'
    return 'remits'
def format(**_args):
    """
    This function turns a list of records into a relational model i.e a dictionary
    of table names, each associated with the list of rows of that table.
    Scalar attributes of a record make up one row of the main table; dict/list attributes
    are tagged with the primary key (see update) and appended to a table named after the attribute.

    :rows           records to be processed (each record is indexed by attribute name)
    :primary_key    primary_key field name
    :x12            file format, used to name the main table
    :return         {table name: [rows]}
    """
    _records = _args['rows']
    _pkName = _args['primary_key']
    _mainName = getContext(_args['x12'])
    _output = {_mainName: []}

    for _record in _records:
        #
        # The primary key value (None when absent) is propagated to every nested row
        _pkValue = _record[_pkName] if _pkName in _record else None
        _flat = {}
        for _field in _record:
            _value = update(_record[_field], _pkName, _pkValue)
            if type(_value) not in [dict, list]:
                #
                # A scalar is an attribute of the main table
                _flat[_field] = _value
                continue
            #
            # A nested structure feeds a table of its own: a list contributes all of
            # its entries, a dictionary contributes a single row
            _nested = _value if type(_value) == list else [_value]
            _output.setdefault(_field, []).extend(_nested)
        _output[_mainName].append(_flat)

    return _output
def update(_item, key, value):
    """
    This function tags a nested structure with a key/value pair (in place), so the rows
    it holds can be linked back to the record they were found in.

    :_item  value to tag:
                - dict, receives key=value
                - list, every dict entry receives key=value and '_index' i.e its position in the list
                - anything else is returned untouched
    :key    name of the field to set (the primary key field name)
    :value  value of the field
    :return _item (the same object that was passed in)
    """
    #
    # Exact types are compared (not isinstance), subclasses of dict/list are left untouched
    if type(_item) is dict:
        _item[key] = value
    elif type(_item) is list:
        #
        # enumerate provides the position of each entry; the counter used previously was
        # never incremented and every row ended up with _index = 0
        for _index, _row in enumerate(_item):
            if type(_row) is dict:
                _row['_index'] = _index
                _row[key] = value
    return _item
def init(**_args):
    """
    This function will kick off the export process provided claims/remits and the loaded plugins
    It requires the data it is pulling to be consistently formatted (otherwise nothing can be done)
    The data is read from the source store, split into segments, and each segment is
    formatted (see format) then posted to the target store by a dedicated process (see post).

    :plugins    loaded plugins (used to build the default template and find the primary key)
    :store      data store information i.e {source,target} specifications for data-transport
    :x12        file type i.e 837|835
    """
    _file_type = _args['x12']
    _plugins = _args['plugins']
    _store = _args['store']
    _default = build(plugins=_plugins, x12=_file_type)

    _df = read(store=_store['source'], x12=_file_type)

    _pkey = util.getPrimaryKey(plugins=_plugins, x12=_file_type)
    SEGMENTS = 4  # arbitrary choice
    _indexes = np.array_split(np.arange(_df.shape[0]), SEGMENTS)
    jobs = []
    for _ii in _indexes:
        #
        # An empty segment has nothing to export. The first segment is always kept, so
        # that post still runs once (with the default rows) when there is no data at all
        if len(_ii) == 0 and jobs:
            continue
        _data = format(rows=_df.iloc[_ii].to_dict(orient='records'), x12=_file_type, primary_key=_pkey)
        _payload = {'store': _store['target'], 'data': _data, 'default': _default, 'x12': _file_type}
        jobs.append(Process(target=post, args=(_payload,)))

    if not jobs:
        return
    #
    # The first job runs alone, to completion, before anything else writes to the target
    # NOTE(review): presumably so the tables exist before concurrent writers start, confirm
    jobs[0].start()
    jobs[0].join()
    #
    # The remaining jobs were previously created but never started, their segments were
    # silently dropped. They are now started together and waited for.
    for _job in jobs[1:]:
        _job.start()
    for _job in jobs[1:]:
        _job.join()
def read(**_args):
    """
    This function reads the main table (claims or remits) from a data store
    :store  data store specifications handed to transport.factory.instance (copied, the caller's dict is not modified)
    :x12    file type i.e 837|835, determines the table to read
    :return whatever the reader's read() returns
    """
    _config = copy.copy(_args['store'])
    _config['table'] = getContext(_args['x12'])
    #
    # @TODO: reading should support streaming (for scalability)
    return transport.factory.instance(**_config).read()
def post(_args):
    """
    This function writes formatted data (see format) to the target data store, one table at a time.
    The main tables (claims, remits) keep their name; any other table is prefixed with
    clm_ (837) or rem_ (any other file type).
    When a table has no rows and does not exist yet in the store, the default row is written instead.

    :_args  dictionary with the following entries
                store     target data store specifications for transport.factory.instance
                data      {table name: [rows]}
                default   {table name: default row}
                x12       file type i.e 837|835
    """
    _data = _args['data']
    #
    # Working on a copy, the specifications handed over by the caller are left untouched
    _store = copy.copy(_args['store'])
    _default = _args['default']
    _prefix = 'clm_' if _args['x12'] == '837' else 'rem_'
    _store['context'] = 'write'
    for _name in _data:
        _tablename = _name if _name in ['remits', 'claims'] else _prefix + _name
        _store['table'] = _tablename
        writer = transport.factory.instance(**_store)
        try:
            _rows = _data[_name]
            #
            # The existence check uses the name of the table actually written to; main
            # tables were previously looked up under a prefixed name they are never given
            if len(_rows) == 0 and _name in _default and not writer.has(table=_tablename):
                _rows = [_default[_name]]
            writer.write(pd.DataFrame(_rows).fillna(''))
        finally:
            #
            # The writer is released even if the write fails
            if hasattr(writer, 'close'):
                writer.close()
|