#!/usr/bin/env python
__doc__ = """
(c) 2018 - 2021 data-transport
steve@the-phi.com, The Phi Technology LLC
https://dev.the-phi.com/git/steve/data-transport.git
This program performs ETL between 9 supported data sources: Couchdb, Mongodb, Mysql, Mariadb, PostgreSQL, Netezza, Redshift, Sqlite, File
Usage:
    transport --config <path-to-file.json> --procs <number-procs>
@TODO: Create tables if they don't exist for relational databases
"""
import pandas as pd
import numpy as np
import json
import sys
import transport
import time
from multiprocessing import Process

SYS_ARGS = {}
#
# Parse command-line arguments of the form --key [value] into SYS_ARGS.
# A flag with no value is stored as 1.
#
if len(sys.argv) > 1:
    N = len(sys.argv)
    for i in range(1, N):
        if sys.argv[i].startswith('--'):
            key = sys.argv[i][2:]
            SYS_ARGS[key] = 1
            if i + 1 < N:
                value = sys.argv[i + 1].strip()
                if value and not value.startswith('--'):
                    SYS_ARGS[key] = value
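# Example (illustrative): "transport --config etl.json --procs 4" yields
# SYS_ARGS == {'config': 'etl.json', 'procs': '4'}; a bare flag such as
# "--verbose" would be stored as SYS_ARGS['verbose'] == 1.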
class Post(Process):
    """
    Worker process that writes one segment of rows to the target data store.
    """
    def __init__(self, **args):
        super().__init__(name=args.get('name'))
        self.PROVIDER = args['target']['type']
        self.writer = transport.factory.instance(**args['target'])
        self.rows = args['rows']
    def run(self):
        # couchdb-family writers expect a document of the form {"values": rows};
        # every other provider takes the rows as-is
        _info = {"values": self.rows} if 'couch' in self.PROVIDER else self.rows
        self.writer.write(_info)
        self.writer.close()
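
# A minimal usage sketch (assumptions flagged): the target descriptor reuses
# the hypothetical mongo.MongoWriter entry from the config sketch above, and
# "segment" stands for a DataFrame slice such as ETL.run() produces.
#
#   p = Post(target={"type": "mongo.MongoWriter", "args": {"db": "etl", "doc": "users"}},
#            rows=segment, name="users segment #0")
#   p.start(); p.join()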

class ETL(Process):
    def __init__(self, **_args):
        super().__init__()
        self.name = _args['id']
        self.reader = transport.factory.instance(**_args['source'])
        self._oargs = _args['target'] # transport.factory.instance(**_args['target'])
        self.JOB_COUNT = _args['jobs']
        self.jobs = []
        # self.logger = transport.factory.instance(**_args['logger'])
    def log(self, **_args):
        _args['name'] = self.name
        print(_args)
    def run(self):
        idf = self.reader.read()
        idf = pd.DataFrame(idf)
        # strip byte-string artifacts (b'...') from column names
        idf.columns = [str(name).replace("b'", '').replace("'", "").strip() for name in idf.columns.tolist()]
        self.log(rows=idf.shape[0], cols=idf.shape[1], jobs=self.JOB_COUNT)
        #
        # writing the data to a designated data source
        #
        try:
            self.log(module='write', action='partitioning')
            rows = np.array_split(np.arange(idf.shape[0]), self.JOB_COUNT)
            for i, indexes in enumerate(rows):
                # enumerate gives the segment number directly; list.index() on
                # numpy arrays raises "truth value of an array is ambiguous"
                _id = f'{self.name} segment #{i}'
                segment = idf.iloc[indexes, :] # positional indexing; .loc assumes a default RangeIndex
                proc = Post(target=self._oargs, rows=segment, name=_id)
                self.jobs.append(proc)
                proc.start()
                self.log(module='write', action='working ...', name=self.name)
        except Exception as e:
            print(e)

    def is_done(self):
        # prune finished writer processes; the job is done when none remain alive
        self.jobs = [proc for proc in self.jobs if proc.is_alive()]
        return len(self.jobs) == 0
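
# Usage sketch (mirrors the __main__ block below): start an ETL job and poll
# it until all of its writer segments have finished.
#
#   job = ETL(id='users', source={...}, target={...}, jobs=4)
#   job.start()
#   while not job.is_done():
#       time.sleep(1)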
def apply(_args):
    """
    Apply a set of commands against a data store. The expected structure is:
        {"store": ..., "apply": []}
    """
    handler = transport.factory.instance(**_args['store'])
    for cmd in _args['apply']:
        handler.apply(cmd)
    handler.close()
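
# An illustrative payload for apply(); the disk.SQLiteWriter name and the SQL
# are assumptions. This is the shape one might use for the @TODO above about
# creating tables before an ETL run:
#
#   apply({
#       "store": {"type": "disk.SQLiteWriter", "args": {"path": "etl.db"}},
#       "apply": ["CREATE TABLE IF NOT EXISTS users (id TEXT, name TEXT)"]
#   })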

if __name__ == '__main__':
    with open(SYS_ARGS['config']) as f:
        _info = json.loads(f.read())
    index = int(SYS_ARGS['index']) if 'index' in SYS_ARGS else None
    procs = []
    for pos, _config in enumerate(_info):
        if 'source' in SYS_ARGS:
            _config['source'] = {"type": "disk.DiskReader", "args": {"path": SYS_ARGS['source'], "delimiter": ","}}
        _config['jobs'] = 10 if 'jobs' not in SYS_ARGS else int(SYS_ARGS['jobs'])
        etl = ETL(**_config)
        etl.start()
        procs.append(etl)
        # "index is not None" so --index 0 still works; enumerate avoids
        # _info.index(), which finds the first equal entry, not the current one
        if index is not None and pos == index:
            break
    #
    # wait for every ETL process to finish writing its segments
    #
    N = len(procs)
    while procs:
        procs = [thread for thread in procs if not thread.is_done()]
        if len(procs) < N:
            print("Finished", N - len(procs), "jobs,", len(procs), "remaining")
            N = len(procs)
        time.sleep(1)
    print("We're done !!")