transport
#!/usr/bin/env python
__doc__ = """
(c) 2018 - 2021 data-transport
steve@the-phi.com, The Phi Technology LLC
https://dev.the-phi.com/git/steve/data-transport.git
This program performs ETL across nine supported data sources: CouchDB, MongoDB, MySQL, MariaDB, PostgreSQL, Netezza, Redshift, SQLite, and files.
Usage :
    transport --config <path-to-file.json> --jobs <number-of-jobs>
@TODO: Create tables if they don't exist for relational databases
"""
import json
import sys
import time
from multiprocessing import Process

import numpy as np
import pandas as pd

import transport
SYS_ARGS = {}
if len(sys.argv) > 1:
    N = len(sys.argv)
    for i in range(1, N):
        if not sys.argv[i].startswith('--'):
            continue
        key = sys.argv[i][2:]
        SYS_ARGS[key] = 1  # a bare flag with no value defaults to 1
        if i + 1 < N:
            value = sys.argv[i + 1].strip()
            if value and not value.startswith('--'):
                SYS_ARGS[key] = value
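#
# For example, "transport --config etl.json --jobs 4" (file name hypothetical)
# parses to SYS_ARGS == {'config': 'etl.json', 'jobs': '4'}; values stay
# strings here and are cast where needed below.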
class Post(Process):
    """Writes a partition of rows to the target data store in its own process."""
    def __init__(self, **args):
        super().__init__()
        self.PROVIDER = args['target']['type']
        self.writer = transport.factory.instance(**args['target'])
        self.rows = args['rows']
    def run(self):
        # CouchDB expects the rows wrapped in a document; other stores take them as-is
        _info = {"values": self.rows} if 'couch' in self.PROVIDER else self.rows
        self.writer.write(_info)
        self.writer.close()
class ETL(Process):
    """Reads a source in full, then fans the rows out to parallel Post writers."""
    def __init__(self, **_args):
        super().__init__()
        self.name = _args['id']
        self.reader = transport.factory.instance(**_args['source'])
        self._oargs = _args['target']  # deferred; each Post builds its own writer
        self.JOB_COUNT = _args['jobs']
        # self.logger = transport.factory.instance(**_args['logger'])
    def log(self, **_args):
        _args['name'] = self.name
        print(_args)
    def run(self):
        idf = self.reader.read()
        idf = pd.DataFrame(idf)
        # strip byte-string artifacts (b'...') from column names
        idf.columns = [str(name).replace("b'", '').replace("'", "").strip() for name in idf.columns.tolist()]
        self.log(rows=idf.shape[0], cols=idf.shape[1])
        #
        # writing the data to a designated data source
        #
        try:
            self.log(module='write', action='partitioning')
            rows = np.array_split(np.arange(idf.shape[0]), self.JOB_COUNT)
            jobs = []
            for i in rows:
                # iloc: the partitions are positional indices, not index labels
                segment = idf.iloc[i, :].to_dict(orient='records')
                proc = Post(target=self._oargs, rows=segment)
                jobs.append(proc)
                proc.start()
            self.log(module='write', action='working ...')
            while jobs:
                jobs = [proc for proc in jobs if proc.is_alive()]
                time.sleep(2)
            self.log(module='write', action='completed')
        except Exception as e:
            print(e)
if __name__ == '__main__':
    #
    # the configuration file holds a list of ETL job definitions
    _info = json.loads(open(SYS_ARGS['config']).read())
    _jobs = 10 if 'jobs' not in SYS_ARGS else int(SYS_ARGS['jobs'])
    for _config in _info:
        _config['jobs'] = _jobs
        etl = ETL(**_config)
        etl.start()
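#
# A minimal invocation, assuming a (hypothetical) config file etl-config.json:
#   python transport --config etl-config.json --jobs 4
# One ETL process starts per job definition in the file; each reads its
# source in full, then fans the rows out to 4 parallel Post writers.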