transport 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. #!/usr/bin/env python
  2. __doc__ = """
  3. (c) 2018 - 2021 data-transport
  4. steve@the-phi.com, The Phi Technology LLC
  5. https://dev.the-phi.com/git/steve/data-transport.git
  6. This program performs ETL between 9 supported data sources : Couchdb, Mongodb, Mysql, Mariadb, PostgreSQL, Netezza,Redshift, Sqlite, File
  7. Usage :
  8. transport --config <path-to-file.json> --procs <number-procs>
  9. @TODO: Create tables if they don't exist for relational databases
  10. """
  11. import pandas as pd
  12. import numpy as np
  13. import json
  14. import sys
  15. import transport
  16. import time
  17. from multiprocessing import Process
  def parse_sys_args(argv):
      """
      Parse ``--key value`` style command-line arguments into a dictionary.

      :argv   list of command-line tokens; the element at position 0 (program name) is skipped

      Every token starting with ``--`` becomes a key (leading dashes removed). The key maps to
      the next token when that token is non-empty and is not itself a ``--`` option, otherwise
      it maps to 1 (i.e. the option behaves as a flag). Tokens that are not preceded by a
      ``--`` option are ignored.
      """
      parsed = {}
      count = len(argv)
      for i in range(1, count):
          token = argv[i]
          if not token.startswith('--'):
              continue
          key = token[2:]
          # a key without a usable value is treated as a flag
          parsed[key] = 1
          if i + 1 < count:
              # the stripped value is also stored back into argv
              value = argv[i + 1] = argv[i + 1].strip()
              if key and value and not value.startswith('--'):
                  parsed[key] = value
      return parsed

  # command-line options of the current invocation, keyed by option name
  SYS_ARGS = parse_sys_args(sys.argv)
  class Post(Process):
      """
      Process that writes one batch of rows to the target data store and then closes the writer.
      """
      def __init__(self, **args):
          """
          :target dictionary of arguments forwarded to transport.factory.instance, it must hold a 'type' entry
          :rows   payload handed to the writer when the process runs
          :name   (optional) label assigned to the process
          """
          super().__init__()
          if args.get('name') is not None:
              # Process.name only accepts strings
              self.name = str(args['name'])
          self.PROVIDER = args['target']['type']
          self.writer = transport.factory.instance(**args['target'])
          self.rows = args['rows']

      def run(self):
          """
          Write the rows with the writer; when the provider type contains 'couch' the rows are
          wrapped as {"values": rows}. The writer is closed even if the write fails.
          """
          _info = {"values": self.rows} if 'couch' in self.PROVIDER else self.rows
          try:
              self.writer.write(_info)
          finally:
              self.writer.close()
  class ETL (Process):
      """
      Process that reads a data source into a data frame, splits the rows into segments and
      starts one Post process per segment to write them to the target.
      """
      def __init__(self, **_args):
          """
          :id      label of this job, used as the process name
          :source  dictionary of arguments forwarded to transport.factory.instance to build the reader
          :target  dictionary of arguments handed to every Post process
          :jobs    number of segments the rows are split into
          """
          super().__init__()
          # Process.name only accepts strings
          self.name = str(_args['id'])
          self.reader = transport.factory.instance(**_args['source'])
          self._oargs = _args['target']
          self.JOB_COUNT = _args['jobs']
          self.jobs = []

      def log(self, **_args):
          """
          Print the given key/value pairs, tagged with the name of this process.
          """
          _args['name'] = self.name
          print (_args)

      @staticmethod
      def _partition(frame, count):
          """
          Split the rows of a data frame into at most `count` non-empty segments.

          Rows are selected by position, so the result does not depend on the index labels of
          the frame. Segments that would hold no rows (count larger than the number of rows)
          are dropped.
          """
          positions = np.array_split(np.arange(frame.shape[0]), count)
          return [frame.iloc[chunk, :] for chunk in positions if chunk.size]

      def run(self):
          """
          Read the source, normalize the column names and start one Post process per segment.
          Errors raised while partitioning or starting the writers are printed, not propagated.
          """
          idf = pd.DataFrame(self.reader.read())
          # removes the b'...' wrapping and quotes left in column names (presumably from byte strings)
          idf.columns = [str(name).replace("b'", '').replace("'", "").strip() for name in idf.columns.tolist()]
          self.log(rows=idf.shape[0], cols=idf.shape[1], jobs=self.JOB_COUNT)
          #
          # writing the data to a designated data source
          #
          try:
              self.log(module='write', action='partitioning')
              for index, segment in enumerate(self._partition(idf, self.JOB_COUNT)):
                  _id = 'segment #'.join([str(index), self.name])
                  proc = Post(target=self._oargs, rows=segment, name=_id)
                  self.jobs.append(proc)
                  proc.start()
              self.log(module='write', action='working ...', name=self.name)
          except Exception as e:
              print (e)

      def is_done(self):
          """
          Return True when no writer job is running anymore.

          The writer jobs are created inside run(), i.e. in the process started by start(); the
          object held by the process that called start() never sees them. From that side the
          liveness of this process is the signal, since a process only terminates after its
          non-daemonic children have been joined.
          """
          from multiprocessing import current_process

          self.jobs = [proc for proc in self.jobs if proc.is_alive()]
          if self.jobs:
              return False
          if current_process() is self:
              # called from within run(): is_alive() may only be used on a child process
              return True
          return not self.is_alive()
  def apply(_args) :
      """
      This function will apply a set of commands against a data-store. The expected structure is as follows :
      {"store":...,"apply":[]}

      :_args  dictionary with a 'store' entry (arguments forwarded to transport.factory.instance)
              and an 'apply' entry (list of commands passed one by one to the handler)

      The handler is closed once all commands have been applied, or as soon as one of them raises.
      """
      handler = transport.factory.instance(**_args['store'])
      try:
          for cmd in _args['apply'] :
              handler.apply(cmd)
      finally:
          handler.close()
  if __name__ == '__main__' :
      #
      # The configuration file holds a list of job descriptions, one ETL process is started per entry
      #
      with open(SYS_ARGS['config']) as f:
          _info = json.load(f)
      index = int(SYS_ARGS['index']) if 'index' in SYS_ARGS else None
      procs = []
      for position, _config in enumerate(_info):
          if 'source' in SYS_ARGS :
              # --source overrides the configured source with a comma delimited file on disk
              _config['source'] = {"type":"disk.DiskReader","args":{"path":SYS_ARGS['source'],"delimiter":","}}

          _config['jobs'] = 10 if 'jobs' not in SYS_ARGS else int(SYS_ARGS['jobs'])
          etl = ETL (**_config)
          etl.start()
          procs.append(etl)
          # --index stops the launch after the entry at that position (0 is a valid position)
          if index is not None and position == index :
              break
      #
      # Polling the started processes once per second until all of them report completion
      #
      N = len(procs)
      while procs :
          procs = [thread for thread in procs if not thread.is_done()]
          if len(procs) < N :
              print (["Finished ",(N-len(procs)), " remaining ", len(procs)])
              N = len(procs)
          time.sleep(1)
      print ("We're done !!")