Bug fixes and simplified interfaces

Steve Nyemba 3 years ago
parent commit b239a5149f
4 changed files with 111 additions and 55 deletions
  1. setup.py (+1 -1)
  2. transport/__init__.py (+21 -32)
  3. transport/disk.py (+50 -13)
  4. transport/sql.py (+39 -9)

+ 1 - 1
setup.py

@@ -8,7 +8,7 @@ def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read() 
 args    = {
     "name":"data-transport",
-    "version":"1.4.1",
+    "version":"1.4.4",
     "author":"The Phi Technology LLC","author_email":"info@the-phi.com",
     "license":"MIT",
     "packages":["transport"]}

+ 21 - 32
transport/__init__.py

@@ -1,45 +1,24 @@
 """
-Data Transport - 1.0
-Steve L. Nyemba, The Phi Technology LLC
+Data Transport, The Phi Technology LLC
+Steve L. Nyemba, steve@the-phi.com
 
-This module is designed to serve as a wrapper to a set of supported data stores :
+This library is designed to serve as a wrapper to a set of supported data stores :
     - couchdb
     - mongodb
     - Files (character delimited)
    - Queues (RabbitMQ)
     - Session (Flask)
     - s3
+	- sqlite
 The supported operations are read/write, and the library also provides metadata to the calling code
 Requirements :
 	pymongo
 	boto
	cloudant
 The configuration for the data-store is as follows :
-	couchdb:
-		{
-			args:{
-				url:<url>,
-				username:<username>,
-				password:<password>,
-				dbname:<database>,
-				doc:<document id>
-			}
-		}
-	RabbitMQ:
-		{
-			
-		}
-	Mongodb:
-	{
-		args:{
-			host:<url>, #localhost:27017
-			username:<username>,
-			password:<password>,
-			dbname:<database>,
-			doc:<document id>s
-
-		}
-	}
+	e.g.:
+	mongodb
+		provider:'mongodb',[port:27017],[host:localhost],db:<name>,doc:<_name>,context:<read|write>
 """
 __author__ = 'The Phi Technology'
 import pandas 	as pd
@@ -90,9 +69,17 @@ class factory :
 	PROVIDERS['mongodb'] = PROVIDERS['mongo']
 	PROVIDERS['couchdb'] = PROVIDERS['couch']
 	PROVIDERS['sqlite3'] = PROVIDERS['sqlite']
-
+	
 	@staticmethod
-	def instance(**args):
+	def instance(**_args):
+		if 'type' in _args :
+			#
+			# Legacy code being returned
+			return factory._instance(**_args)
+		else:
+			return instance(**_args)
+	@staticmethod
+	def _instance(**args):
 		"""
 		This class will create an instance of a transport when providing 
 		:type	name of the type we are trying to create
@@ -131,7 +118,7 @@ def instance(**_args):
 	"""
 	
 	provider = _args['provider']
-	context = _args['context']
+	context = _args['context'] if 'context' in _args else None
 	_id = context if context in ['read','write'] else 'read'
 	if _id :
 		args = {'provider':_id}
@@ -142,6 +129,7 @@ def instance(**_args):
 			args[key] = value
 		#
 		#
+		
 		args = dict(args,**_args)
 		
 		# print (provider in factory.PROVIDERS)
@@ -149,6 +137,7 @@ def instance(**_args):
 			pointer = factory.PROVIDERS[provider]['class'][_id] 
 		else:
 			pointer = sql.SQLReader if _id == 'read' else sql.SQLWriter
+		
 		return pointer(**args)
 
-	return None
+	return None
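
For reference, a minimal sketch of the reworked entry point described in the module docstring above (the host, port, db and doc values are placeholders, not part of the commit):

    import transport

    # New-style call: name a provider and an optional context; 'read' is assumed when absent
    reader = transport.instance(provider='mongodb', host='localhost', port=27017,
                                db='sample', doc='logs', context='read')

    # Legacy callers that still pass 'type' are routed to factory._instance
    # legacy = transport.factory.instance(type='...', args={...})   # hypothetical legacy arguments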

+ 50 - 13
transport/disk.py

@@ -111,13 +111,47 @@ class DiskWriter(Writer):
 			pass
 		finally:
 			DiskWriter.THREAD_LOCK.release()
-class SQLiteReader (DiskReader):
-	def __init__(self,**args):
-		DiskReader.__init__(self,**args)
-		self.path  = args['database'] if 'database' in args else args['path']
-		self.conn = sqlite3.connect(self.path,isolation_level=None)
+class SQLite :
+	def __init__(self,**_args) :
+		self.path  = _args['database'] if 'database' in _args else _args['path']
+		self.conn = sqlite3.connect(self.path,isolation_level="IMMEDIATE")
 		self.conn.row_factory = sqlite3.Row
-		self.table = args['table']
+		self.fields = _args['fields'] if 'fields' in _args else []
+	def has (self,**_args):
+		found = False
+		try:
+			if 'table' in _args :
+				table = _args['table']
+				sql = "SELECT * FROM :table limit 1".replace(":table",table) 
+				_df = pd.read_sql(sql,self.conn)
+				found = _df.columns.size > 0
+		except Exception as e:
+			pass
+		return found
+	def close(self):
+		try:
+			self.conn.close()
+		except Exception as e :
+			print(e)
+	def apply(self,sql):
+		try:
+			if not sql.lower().startswith('select'):
+				cursor = self.conn.cursor()
+				cursor.execute(sql)
+				cursor.close()
+				self.conn.commit()
+			else:
+				return pd.read_sql(sql,self.conn)
+		except Exception as e:
+			print (e)
+class SQLiteReader (SQLite,DiskReader):
+	def __init__(self,**args):
+		super().__init__(**args)
+		# DiskReader.__init__(self,**args)
+		# self.path  = args['database'] if 'database' in args else args['path']
+		# self.conn = sqlite3.connect(self.path,isolation_level=None)
+		# self.conn.row_factory = sqlite3.Row
+		self.table = args['table'] if 'table' in args else None
 	def read(self,**args):
 		if 'sql' in args :
 			sql = args['sql']			
@@ -135,7 +169,7 @@ class SQLiteReader (DiskReader):
 		except Exception as e :
 			pass
 
-class SQLiteWriter(DiskWriter) :
+class SQLiteWriter(SQLite,DiskWriter) :
 	connection = None
 	LOCK = Lock()
 	def __init__(self,**args):
@@ -143,12 +177,13 @@ class SQLiteWriter(DiskWriter) :
 		:path
 		:fields json|csv
 		"""
-		DiskWriter.__init__(self,**args)
-		self.table = args['table']
+		# DiskWriter.__init__(self,**args)
+		super().__init__(**args)
+		self.table = args['table'] if 'table' in args else None
 		
-		self.conn = sqlite3.connect(self.path,isolation_level="IMMEDIATE")
-		self.conn.row_factory = sqlite3.Row
-		self.fields = args['fields'] if 'fields' in args else []
+		# self.conn = sqlite3.connect(self.path,isolation_level="IMMEDIATE")
+		# self.conn.row_factory = sqlite3.Row
+		# self.fields = args['fields'] if 'fields' in args else []
 		
 		if self.fields and not self.isready():
 			self.init(self.fields)
@@ -185,7 +220,7 @@ class SQLiteWriter(DiskWriter) :
 		if not self.fields :
 			self.init(list(info.keys()))
 		
-		if type(info) == object :
+		if type(info) == dict :
 			info = [info]
 		elif type(info) == pd.DataFrame :
 			info = info.to_dict(orient='records')
@@ -196,6 +231,8 @@ class SQLiteWriter(DiskWriter) :
 			cursor = self.conn.cursor()	
 			sql = " " .join(["INSERT INTO ",self.table,"(", ",".join(self.fields) ,")", "values(:values)"])
 			for row in info :
+				print (row)
+				print (row.values())
 				stream =["".join(["",value,""]) if type(value) == str else value for value in row.values()]
 				stream = json.dumps(stream).replace("[","").replace("]","")
 				

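A short sketch of how the refactored SQLite classes compose once this change is applied (the file path and table name are illustrative and assume a writable location):

    from transport import disk

    # the shared SQLite base supplies has(), apply() and close() to both subclasses
    writer = disk.SQLiteWriter(path='/tmp/demo.db3', table='logs', fields=['name', 'value'])
    writer.apply("INSERT INTO logs(name,value) VALUES ('start','ok')")   # non-SELECT: executed and committed

    reader = disk.SQLiteReader(path='/tmp/demo.db3', table='logs')
    if reader.has(table='logs'):                  # True once the table exists
        df = reader.apply("SELECT * FROM logs")   # SELECT statements come back as a pandas DataFrame
    reader.close()
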
+ 39 - 9
transport/sql.py

@@ -75,7 +75,15 @@ class SQLRW :
             _info['securityLevel'] = 0
             del _info['dbname']
         self.conn = _handler.connect(**_info)
-    
+    def has(self,**_args):
+        found = False
+        try:
+            table = _args['table']
+            sql = "SELECT * FROM :table LIMIT 1".replace(":table",table)
+            found = pd.read_sql(sql,self.conn).shape[0]
+        except Exception as e:
+            pass
+        return found
     def isready(self):
         _sql = "SELECT * FROM :table LIMIT 1".replace(":table",self.table)
         try:
@@ -201,8 +209,14 @@ class SQLWriter(SQLRW,Writer):
                     # values = [ "".join(["'",str(_row[key]),"'"]) if np.nan(_row[key]).isnumeric() else str(_row[key]) for key in _row]
                     # print (values)
                     query = _sql.replace(":fields",",".join(fields)).replace(":values",values)
-                    
-                    cursor.execute(query,_row.values())
+                    if type(info) == pd.DataFrame :
+                        _values = info.values.tolist()
+                    elif type(info) == list and type(info[0]) == dict:
+                        print ('........')
+                        _values = [tuple(item.values()) for item in info]
+                    else:
+                        _values = info
+                    cursor.execute(query,_values)
                 
 
                 pass
@@ -210,14 +224,23 @@ class SQLWriter(SQLRW,Writer):
                 _fields = ",".join(self.fields)
                 # _sql = _sql.replace(":fields",_fields)
                 # _sql = _sql.replace(":values",",".join(["%("+name+")s" for name in self.fields]))
-                _sql = _sql.replace("(:fields)","")
+                # _sql = _sql.replace("(:fields)","")
+                _sql = _sql.replace(":fields",_fields)
                 values = ", ".join("?"*len(self.fields)) if self._provider == 'netezza' else ",".join(["%s" for name in self.fields])
                 _sql = _sql.replace(":values",values)
-                
-                # for row in info :
-                #     values = ["'".join(["",value,""]) if not str(value).isnumeric() else value for value in row.values()]
-                
-                cursor.executemany(_sql,info)   
+                if type(info) == pd.DataFrame :
+                    _info = info[self.fields].values.tolist()
+                elif  type(info) == dict :
+                    _info = info.values()
+                else:
+                    # _info = []
+
+                    _info = pd.DataFrame(info)[self.fields].values.tolist()
+                    # for row in info :
+                        
+                    #     if type(row) == dict :
+                    #         _info.append( list(row.values()))
+                cursor.executemany(_sql,_info)   
             
             # self.conn.commit()
         except Exception as e:
@@ -250,6 +273,13 @@ class BigQuery:
         client = bq.Client.from_service_account_json(self.path)
         ref     = client.dataset(self.dataset).table(table)
         return client.get_table(ref).schema
+    def has(self,**_args):
+        found = False
+        try:
+            found = self.meta(**_args) is not None
+        except Exception as e:
+            pass
+        return found
 class BQReader(BigQuery,Reader) :
     def __init__(self,**_args):
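
The new has() check and the widened write() inputs in transport/sql.py could be exercised roughly as below (provider, dbname and table are placeholders; whether 'postgresql' resolves through factory.PROVIDERS or falls back to sql.SQLWriter depends on the registry):

    import pandas as pd
    import transport

    writer = transport.instance(provider='postgresql', context='write',
                                dbname='demo', table='events', fields=['name', 'value'])
    if writer.has(table='events'):
        writer.write(pd.DataFrame([{'name': 'start', 'value': 1}]))  # DataFrame rows go through values.tolist()
        writer.write([{'name': 'stop', 'value': 0}])                 # a list of dicts is mapped to value tuples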