瀏覽代碼

bug fix: etl engine, sqlite inserts

Steve Nyemba 3 年之前
父節點
當前提交
ccc05acc01
共有 4 個文件被更改,包括 74 次插入29 次删除
  1. 45 7
      bin/transport
  2. 1 1
      setup.py
  3. 13 14
      transport/__init__.py
  4. 15 7
      transport/disk.py

+ 45 - 7
bin/transport

@@ -16,7 +16,17 @@ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLI
 Usage :
 	transport --config <path-to-file.json> --procs <number-procs>
 @TODO: Create tables if they don't exist for relational databases
+example of configuration :
 
+1. Move data from a folder to a data-store
+	transport [--folder <path> ] --config <config.json>	 #-- assuming the configuration doesn't have folder 
+	transport --folder <path> --provider <postgresql|mongo|sqlite> --<database|db> <name> --table|doc <document_name>
+In this case the configuration should look like :
+	{folder:..., target:{}}
+2. Move data from one source to another
+	transport --config <file.json>
+	{source:{..},target:{..}} or [{source:{..},target:{..}},{source:{..},target:{..}}]
+	
 
 """
 import pandas as pd
@@ -46,11 +56,23 @@ if len(sys.argv) > 1:
 class Post(Process):
 	def __init__(self,**args):
 		super().__init__()
-		self.PROVIDER = args['target']['type']
-		self.writer = 	transport.factory.instance(**args['target'])
+		
+		if 'provider' not in args['target'] :
+			self.PROVIDER = args['target']['type']
+			self.writer = 	transport.factory.instance(**args['target'])
+		else:
+			self.PROVIDER = args['target']['provider']
+			args['target']['context'] = 'write'
+			
+			self.writer = transport.instance(**args['target'])
+		#
+		# If the table doesn't exists maybe create it ?
+		#
 		self.rows 	=	 args['rows']
+		
 	def run(self):
-		_info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows		
+		_info = {"values":self.rows} if 'couch' in self.PROVIDER else self.rows	
+		
 		self.writer.write(_info)
 		self.writer.close()
 
@@ -59,7 +81,19 @@ class ETL (Process):
 	def __init__(self,**_args):
 		super().__init__()
 		self.name 	= _args['id']
-		self.reader = transport.factory.instance(**_args['source'])
+		if 'provider' not in _args['source'] :
+			#@deprecate
+			self.reader = transport.factory.instance(**_args['source'])
+		else:
+			#
+			# This is the new interface
+			_args['source']['context'] = 'read'	
+			
+			self.reader = transport.instance(**_args['source'])
+		#
+		# do we have an sql query provided or not ....
+		# self.sql = _args['source']['sql'] if 'sql' in _args['source'] else None
+		self.cmd = _args['source']['cmd'] if 'cmd' in _args['source'] else None
 		self._oargs = _args['target'] #transport.factory.instance(**_args['target'])
 		self.JOB_COUNT =  _args['jobs']
 		self.jobs = []
@@ -68,8 +102,11 @@ class ETL (Process):
 		_args['name']  = self.name
 		print (_args)
 	def run(self):
-		idf = self.reader.read()
-		idf = pd.DataFrame(idf)
+		if self.cmd :
+			idf = self.reader.read(**self.cmd)
+		else:
+			idf = self.reader.read() 
+		idf = pd.DataFrame(idf)		
 		idf.columns = [str(name).replace("b'",'').replace("'","").strip() for name in idf.columns.tolist()]
 		self.log(rows=idf.shape[0],cols=idf.shape[1],jobs=self.JOB_COUNT)
 
@@ -79,7 +116,8 @@ class ETL (Process):
 		try:
 			self.log(module='write',action='partitioning')
 			rows = np.array_split(np.arange(idf.shape[0]),self.JOB_COUNT)
-			
+			#
+			# @TODO: locks
 			for i in rows :
 				_id = 'segment #'.join([str(rows.index(i)),self.name])
 				segment = idf.loc[i,:] #.to_dict(orient='records')

+ 1 - 1
setup.py

@@ -8,7 +8,7 @@ def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read() 
 args    = {
     "name":"data-transport",
-    "version":"1.4.0",
+    "version":"1.4.1",
     "author":"The Phi Technology LLC","author_email":"info@the-phi.com",
     "license":"MIT",
     "packages":["transport"]}

+ 13 - 14
transport/__init__.py

@@ -71,16 +71,7 @@ from google.cloud import bigquery as bq
 import nzpy as nz   #--- netezza drivers
 import os
 
-RDBMS = {
-	
-    "postgresql":{"port":"5432","driver":pg},
-    "redshift":{"port":"5432","driver":pg},
-    "netezza":{"port":"5480","driver":nz},
-	"mysql":{"port":"3306","driver":my},
-	"mariadb":{"port":"3306","driver":my},
-	"mongodb":{"port":"27017","class":{"read"}},
-	"couchdb":{"port":"5984"}
-}
+
 class factory :
 	TYPE = {"sql":{"providers":["postgresql","mysql","neteeza","bigquery","mariadb","redshift"]}}
 	PROVIDERS = {
@@ -91,9 +82,14 @@ class factory :
         "bigquery":{"class":{"read":sql.BQReader,"write":sql.BQWriter}},
         "mysql":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"}},
         "mariadb":{"port":3306,"host":"localhost","default":{"type":"VARCHAR(256)"}},
-		"mongo":{"port":27017,"host":"localhost","class":{"read":mongo.MongoReader,"write":mongo.MongoWriter}},
-		"couch":{"port":5984,"host":"localhost","class":{"read":couch.CouchReader,"write":couch.CouchWriter}},
+		"mongo":{"port":27017,"host":"localhost","class":{"read":mongo.MongoReader,"write":mongo.MongoWriter}},		
+		"couch":{"port":5984,"host":"localhost","class":{"read":couch.CouchReader,"write":couch.CouchWriter}},		
         "netezza":{"port":5480,"driver":nz,"default":{"type":"VARCHAR(256)"}}}
+	#
+	# creating synonyms
+	PROVIDERS['mongodb'] = PROVIDERS['mongo']
+	PROVIDERS['couchdb'] = PROVIDERS['couch']
+	PROVIDERS['sqlite3'] = PROVIDERS['sqlite']
 
 	@staticmethod
 	def instance(**args):
@@ -126,14 +122,17 @@ class factory :
 		return anObject
 
 import time
-def instance(provider,context,**_args):
+def instance(**_args):
 	"""
 
 	@param provider	{file,sqlite,postgresql,redshift,bigquery,netezza,mongo,couch ...}
 	@param context	read|write|rw
 	@param _args	argument to got with the datastore (username,password,host,port ...)
 	"""
-	_id = context if context in ['read','write'] else None
+	
+	provider = _args['provider']
+	context = _args['context']
+	_id = context if context in ['read','write'] else 'read'
 	if _id :
 		args = {'provider':_id}
 		for key in factory.PROVIDERS[provider] :

+ 15 - 7
transport/disk.py

@@ -114,6 +114,7 @@ class DiskWriter(Writer):
 class SQLiteReader (DiskReader):
 	def __init__(self,**args):
 		DiskReader.__init__(self,**args)
+		self.path  = args['database'] if 'database' in args else args['path']
 		self.conn = sqlite3.connect(self.path,isolation_level=None)
 		self.conn.row_factory = sqlite3.Row
 		self.table = args['table']
@@ -145,7 +146,7 @@ class SQLiteWriter(DiskWriter) :
 		DiskWriter.__init__(self,**args)
 		self.table = args['table']
 		
-		self.conn = sqlite3.connect(self.path,isolation_level=None)
+		self.conn = sqlite3.connect(self.path,isolation_level="IMMEDIATE")
 		self.conn.row_factory = sqlite3.Row
 		self.fields = args['fields'] if 'fields' in args else []
 		
@@ -184,20 +185,27 @@ class SQLiteWriter(DiskWriter) :
 		if not self.fields :
 			self.init(list(info.keys()))
 		
-		if type(info) != list :
+		if type(info) == object :
 			info = [info]
+		elif type(info) == pd.DataFrame :
+			info = info.to_dict(orient='records')
 		
 		SQLiteWriter.LOCK.acquire()
 		try:
+			
 			cursor = self.conn.cursor()	
-			sql = " " .join(["INSERT INTO ",self.table,"(", ",".join(self.fields) ,")", "values(':values')"])
+			sql = " " .join(["INSERT INTO ",self.table,"(", ",".join(self.fields) ,")", "values(:values)"])
 			for row in info :
-				stream = json.dumps(row)
-				stream = stream.replace("'","''")
-				cursor.execute(sql.replace(":values",stream) )
+				stream =["".join(["",value,""]) if type(value) == str else value for value in row.values()]
+				stream = json.dumps(stream).replace("[","").replace("]","")
+				
+				
+				self.conn.execute(sql.replace(":values",stream) )
+				# cursor.commit()
 			
-			# self.conn.commit()
+			self.conn.commit()
 				# print (sql)
 		except Exception as e :
+			print (e)
 			pass
 		SQLiteWriter.LOCK.release()