#!/usr/bin/env python
"""
(c) 2018 - 2021 data-transport
steve@the-phi.com, The Phi Technology LLC
https://dev.the-phi.com/git/steve/data-transport.git

This program performs ETL between 9 supported data sources: CouchDB, MongoDB,
MySQL, MariaDB, PostgreSQL, Netezza, Redshift, SQLite, File.

Usage:
    transport --config <path-to-file.json> --jobs <number-of-jobs>

@TODO: Create tables if they don't exist for relational databases
"""
import json
import sys
import time
from multiprocessing import Process

import numpy as np
import pandas as pd

import transport

SYS_ARGS = {}
if len(sys.argv) > 1:
    N = len(sys.argv)
    for i in range(1, N):
        if not sys.argv[i].startswith('--'):
            continue
        key = sys.argv[i][2:]
        SYS_ARGS[key] = 1   # a flag with no value defaults to 1
        if i + 1 < N:
            value = sys.argv[i + 1].strip()
            if value and not value.startswith('--'):
                SYS_ARGS[key] = value
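#
# Illustrative invocation (not from the original source):
#   python etl.py --config jobs.json --jobs 4
# leaves SYS_ARGS == {'config': 'jobs.json', 'jobs': '4'}; note that values
# stay strings, which is why the main block casts 'jobs' to int.
#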

class Post(Process):
    """Writes one partition of rows to the target data store in its own process."""
    def __init__(self, **args):
        super().__init__()
        self.PROVIDER = args['target']['type']
        self.writer = transport.factory.instance(**args['target'])
        self.rows = args['rows']

    def run(self):
        # the CouchDB writer is handed a {"values": [...]} envelope;
        # every other provider gets the list of records as-is
        _info = {"values": self.rows} if 'couch' in self.PROVIDER else self.rows
        self.writer.write(_info)
        self.writer.close()
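
# For example, with rows == [{"a": 1}] a CouchDB target receives the wrapped
# payload {"values": [{"a": 1}]}, while any other provider receives the list
# of record dicts unchanged.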

class ETL(Process):
    """Reads the source data store and fans the rows out to parallel Post writers."""
    def __init__(self, **_args):
        super().__init__()
        self.name = _args['id']
        self.reader = transport.factory.instance(**_args['source'])
        self._oargs = _args['target']   # the writer is instantiated per partition, in Post
        self.JOB_COUNT = _args['jobs']
        # self.logger = transport.factory.instance(**_args['logger'])

    def log(self, **_args):
        _args['name'] = self.name
        print(_args)

    def run(self):
        idf = self.reader.read()
        idf = pd.DataFrame(idf)
        # strip the b'...' residue that byte-string column names carry over
        idf.columns = [str(name).replace("b'", '').replace("'", '').strip() for name in idf.columns.tolist()]
        self.log(rows=idf.shape[0], cols=idf.shape[1])
        #
        # -- write the data to the designated target data store
        #
        try:
            self.log(module='write', action='partitioning')
            rows = np.array_split(np.arange(idf.shape[0]), self.JOB_COUNT)
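            #
            # worked example of the split (plain numpy behavior):
            #   np.array_split(np.arange(10), 3) -> [0..3], [4..6], [7..9]
            # i.e. each Post gets a near-equal, contiguous block of row positions
            #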
            jobs = []
            for i in rows:
                if i.size == 0:
                    continue    # fewer rows than JOB_COUNT leaves empty partitions
                segment = idf.iloc[i].to_dict(orient='records')
                proc = Post(target=self._oargs, rows=segment)
                jobs.append(proc)
                proc.start()

            # poll until every writer process has finished
            self.log(module='write', action='working ...')
            while jobs:
                jobs = [proc for proc in jobs if proc.is_alive()]
                time.sleep(2)
            self.log(module='write', action='completed')
        except Exception as e:
            print(e)


if __name__ == '__main__':
    #
    # the config file is expected to hold a list of job descriptions,
    # each with 'id', 'source' and 'target' entries
    _info = json.loads(open(SYS_ARGS['config']).read())
    jobs = int(SYS_ARGS['jobs']) if 'jobs' in SYS_ARGS else 10
    for _config in _info:
        _config['jobs'] = jobs
        etl = ETL(**_config)
        etl.start()