|
@@ -39,22 +39,22 @@ import os
|
|
|
|
|
|
|
|
|
from multiprocessing import Process
|
|
|
-SYS_ARGS = {}
|
|
|
-if len(sys.argv) > 1:
|
|
|
+# SYS_ARGS = {}
|
|
|
+# if len(sys.argv) > 1:
|
|
|
|
|
|
- N = len(sys.argv)
|
|
|
- for i in range(1,N):
|
|
|
- value = None
|
|
|
- if sys.argv[i].startswith('--'):
|
|
|
- key = sys.argv[i][2:] #.replace('-','')
|
|
|
- SYS_ARGS[key] = 1
|
|
|
- if i + 1 < N:
|
|
|
- value = sys.argv[i + 1] = sys.argv[i+1].strip()
|
|
|
- if key and value and not value.startswith('--'):
|
|
|
- SYS_ARGS[key] = value
|
|
|
+# N = len(sys.argv)
|
|
|
+# for i in range(1,N):
|
|
|
+# value = None
|
|
|
+# if sys.argv[i].startswith('--'):
|
|
|
+# key = sys.argv[i][2:] #.replace('-','')
|
|
|
+# SYS_ARGS[key] = 1
|
|
|
+# if i + 1 < N:
|
|
|
+# value = sys.argv[i + 1] = sys.argv[i+1].strip()
|
|
|
+# if key and value and not value.startswith('--'):
|
|
|
+# SYS_ARGS[key] = value
|
|
|
|
|
|
|
|
|
- i += 2
|
|
|
+# i += 2
|
|
|
class Transporter(Process):
|
|
|
"""
|
|
|
The transporter (Jason Stathem) moves data from one persistant store to another
|
|
@@ -74,81 +74,72 @@ class Transporter(Process):
|
|
|
#
|
|
|
# Let's insure we can support multiple targets
|
|
|
self._target = [self._target] if type(self._target) != list else self._target
|
|
|
-
|
|
|
pass
|
|
|
- def read(self,**_args):
|
|
|
- """
|
|
|
- This function
|
|
|
- """
|
|
|
- _reader = transport.factory.instance(**self._source)
|
|
|
+ def run(self):
|
|
|
+
|
|
|
+ _reader = transport.get.etl(source=self._source,target=self._target)
|
|
|
#
|
|
|
- # If arguments are provided then a query is to be executed (not just a table dump)
|
|
|
if 'cmd' in self._source or 'query' in self._source :
|
|
|
_query = self._source['cmd'] if 'cmd' in self._source else self._source['query']
|
|
|
return _reader.read(**_query)
|
|
|
else:
|
|
|
return _reader.read()
|
|
|
- # return _reader.read() if 'query' not in self._source else _reader.read(**self._source['query'])
|
|
|
+
|
|
|
+ # def _read(self,**_args):
|
|
|
+ # """
|
|
|
+ # This function
|
|
|
+ # """
|
|
|
+ # _reader = transport.factory.instance(**self._source)
|
|
|
+ # #
|
|
|
+ # # If arguments are provided then a query is to be executed (not just a table dump)
|
|
|
+ # if 'cmd' in self._source or 'query' in self._source :
|
|
|
+ # _query = self._source['cmd'] if 'cmd' in self._source else self._source['query']
|
|
|
+ # return _reader.read(**_query)
|
|
|
+ # else:
|
|
|
+ # return _reader.read()
|
|
|
+ # # return _reader.read() if 'query' not in self._source else _reader.read(**self._source['query'])
|
|
|
|
|
|
- def _delegate_write(self,_data,**_args):
|
|
|
- """
|
|
|
- This function will write a data-frame to a designated data-store, The function is built around a delegation design pattern
|
|
|
- :data data-frame or object to be written
|
|
|
- """
|
|
|
- if _data.shape[0] > 0 :
|
|
|
- for _target in self._target :
|
|
|
- if 'write' not in _target :
|
|
|
- _target['context'] = 'write'
|
|
|
- # _target['lock'] = True
|
|
|
- else:
|
|
|
- # _target['write']['lock'] = True
|
|
|
- pass
|
|
|
- _writer = transport.factory.instance(**_target)
|
|
|
- _writer.write(_data,**_args)
|
|
|
- if hasattr(_writer,'close') :
|
|
|
- _writer.close()
|
|
|
+ # def _delegate_write(self,_data,**_args):
|
|
|
+ # """
|
|
|
+ # This function will write a data-frame to a designated data-store, The function is built around a delegation design pattern
|
|
|
+ # :data data-frame or object to be written
|
|
|
+ # """
|
|
|
+ # if _data.shape[0] > 0 :
|
|
|
+ # for _target in self._target :
|
|
|
+ # if 'write' not in _target :
|
|
|
+ # _target['context'] = 'write'
|
|
|
+ # # _target['lock'] = True
|
|
|
+ # else:
|
|
|
+ # # _target['write']['lock'] = True
|
|
|
+ # pass
|
|
|
+ # _writer = transport.factory.instance(**_target)
|
|
|
+ # _writer.write(_data,**_args)
|
|
|
+ # if hasattr(_writer,'close') :
|
|
|
+ # _writer.close()
|
|
|
|
|
|
- def write(self,_df,**_args):
|
|
|
- """
|
|
|
- """
|
|
|
- SEGMENT_COUNT = 6
|
|
|
- MAX_ROWS = 1000000
|
|
|
- # _df = self.read()
|
|
|
- _segments = np.array_split(np.arange(_df.shape[0]),SEGMENT_COUNT) if _df.shape[0] > MAX_ROWS else np.array( [np.arange(_df.shape[0])])
|
|
|
- # _index = 0
|
|
|
+ # def write(self,_df,**_args):
|
|
|
+ # """
|
|
|
+ # """
|
|
|
+ # SEGMENT_COUNT = 6
|
|
|
+ # MAX_ROWS = 1000000
|
|
|
+ # # _df = self.read()
|
|
|
+ # _segments = np.array_split(np.arange(_df.shape[0]),SEGMENT_COUNT) if _df.shape[0] > MAX_ROWS else np.array( [np.arange(_df.shape[0])])
|
|
|
+ # # _index = 0
|
|
|
|
|
|
|
|
|
- for _indexes in _segments :
|
|
|
- _fwd_args = {} if not _args else _args
|
|
|
+ # for _indexes in _segments :
|
|
|
+ # _fwd_args = {} if not _args else _args
|
|
|
|
|
|
- self._delegate_write(_df.iloc[_indexes],**_fwd_args)
|
|
|
- time.sleep(1)
|
|
|
- #
|
|
|
- # @TODO: Perhaps consider writing up each segment in a thread/process (speeds things up?)
|
|
|
- pass
|
|
|
+ # self._delegate_write(_df.iloc[_indexes],**_fwd_args)
|
|
|
+ # time.sleep(1)
|
|
|
+ # #
|
|
|
+ # # @TODO: Perhaps consider writing up each segment in a thread/process (speeds things up?)
|
|
|
+ # pass
|
|
|
|
|
|
def instance(**_args):
|
|
|
- _proxy = lambda _agent: _agent.write(_agent.read())
|
|
|
- if 'source' in _args and 'target' in _args :
|
|
|
-
|
|
|
- _agent = Transporter(**_args)
|
|
|
- _proxy(_agent)
|
|
|
-
|
|
|
- else:
|
|
|
- _config = _args['config']
|
|
|
- _items = [Transporter(**_item) for _item in _config ]
|
|
|
- _MAX_JOBS = 5
|
|
|
- _items = np.array_split(_items,_MAX_JOBS)
|
|
|
- for _batch in _items :
|
|
|
- jobs = []
|
|
|
- for _item in _batch :
|
|
|
- thread = Process(target=_proxy,args = (_item,))
|
|
|
- thread.start()
|
|
|
- jobs.append(thread)
|
|
|
- while jobs :
|
|
|
- jobs = [thread for thread in jobs if thread.is_alive()]
|
|
|
- time.sleep(1)
|
|
|
-
|
|
|
+ pthread = Transporter (**_args)
|
|
|
+ pthread.start()
|
|
|
+ return pthread
|
|
|
pass
|
|
|
# class Post(Process):
|
|
|
# def __init__(self,**args):
|
|
@@ -360,4 +351,4 @@ def instance(**_args):
|
|
|
# print (["Finished ",(N-len(procs)), " remaining ", len(procs)])
|
|
|
# N = len(procs)
|
|
|
# time.sleep(1)
|
|
|
-# # print ("We're done !!")
|
|
|
+# # print ("We're done !!")
|