Browse Source

Merge pull request #4 from lnyemba/dev

Updates version handling and adds a new queue-listener handler
Steve L. Nyemba 1 year ago
parent
commit
b04eb5e7e5
6 changed files with 243 additions and 30 deletions
  1. 124 24
      transport/__init__.py
  2. 4 1
      transport/mongo.py
  3. 63 0
      transport/providers.py
  4. 42 0
      transport/qlistener.py
  5. 9 4
      transport/sql.py
  6. 1 1
      transport/version.py

+ 124 - 24
transport/__init__.py

@@ -38,6 +38,7 @@ if sys.version_info[0] > 2 :
 	from transport import sql as sql
 	from transport import etl as etl
 	from transport.version import __version__
+	from transport import providers
 else:
 	from common import Reader, Writer,Console #, factory
 	import disk
@@ -48,37 +49,39 @@ else:
 	import sql
 	import etl
 	from version import __version__
+	import providers
 import psycopg2 as pg
 import mysql.connector as my
 from google.cloud import bigquery as bq
 import nzpy as nz   #--- netezza drivers
 import os
 
-class providers :
-	POSTGRESQL 	= 'postgresql'
-	MONGODB 	= 'mongodb'
+# class providers :
+# 	POSTGRESQL 	= 'postgresql'
+# 	MONGODB 	= 'mongodb'
 	
-	BIGQUERY	='bigquery'
-	FILE 	= 'file'
-	ETL = 'etl'
-	SQLITE = 'sqlite'
-	SQLITE3= 'sqlite'
-	REDSHIFT = 'redshift'
-	NETEZZA = 'netezza'
-	MYSQL = 'mysql'
-	RABBITMQ = 'rabbitmq'
-	MARIADB  = 'mariadb'
-	COUCHDB = 'couch'
-	CONSOLE = 'console'
-	ETL = 'etl'
-	#
-	# synonyms of the above
-	BQ 		= BIGQUERY
-	MONGO 	= MONGODB
-	FERRETDB= MONGODB
-	PG 		= POSTGRESQL
-	PSQL 	= POSTGRESQL
-	PGSQL	= POSTGRESQL
+# 	BIGQUERY	='bigquery'
+# 	FILE 	= 'file'
+# 	ETL = 'etl'
+# 	SQLITE = 'sqlite'
+# 	SQLITE3= 'sqlite'
+# 	REDSHIFT = 'redshift'
+# 	NETEZZA = 'netezza'
+# 	MYSQL = 'mysql'
+# 	RABBITMQ = 'rabbitmq'
+# 	MARIADB  = 'mariadb'
+# 	COUCHDB = 'couch'
+# 	CONSOLE = 'console'
+# 	ETL = 'etl'
+# 	#
+# 	# synonyms of the above
+# 	BQ 		= BIGQUERY
+# 	MONGO 	= MONGODB
+# 	FERRETDB= MONGODB
+# 	PG 		= POSTGRESQL
+# 	PSQL 	= POSTGRESQL
+# 	PGSQL	= POSTGRESQL
+# import providers
 
 class IEncoder (json.JSONEncoder):
 	def default (self,object):
@@ -156,6 +159,103 @@ class factory :
 import time
 def instance(**_args):
 	"""
+	creating an instance given the provider, we should have an idea of :class, :driver
+	:provider
+	:read|write = {connection to the database}
+	"""	
+	_provider = _args['provider']
+	_group = None
+	
+	for _id in providers.CATEGORIES :
+		if _provider in providers.CATEGORIES[_id] :
+			_group = _id
+			break
+	if _group :
+		_classPointer = _getClassInstance(_group,**_args)
+		#
+		# Let us reformat the arguments
+		if 'read' in _args or 'write' in _args :
+			_args = _args['read'] if 'read' in _args else _args['write']
+			_args['provider'] = _provider
+		if _group == 'sql' :
+			_info = _get_alchemyEngine(**_args)
+
+			_args = dict(_args,**_info)
+			_args['driver'] = providers.DRIVERS[_provider]
+			
+		else:
+			if _provider in providers.DEFAULT :
+				_default = providers.DEFAULT[_provider]
+				_defkeys = list(set(_default.keys()) - set(_args.keys()))
+				if _defkeys :
+					for key in _defkeys :
+						_args[key] = _default[key]
+			pass
+		#
+		# get default values from 
+		
+		return _classPointer(**_args)
+	#
+	# Let us determine the category of the provider that has been given
def _get_alchemyEngine(**_args):
	"""
	Build the SQLAlchemy engine for an SQL provider and return it along
	with the normalized connection parameters.

	:_args	arguments passed to the factory (provider, database, and the
		optional username, password, account, host, port)
	:return	dict holding an 'sqlalchemy' engine plus whichever of
		host/port/username/password are non-empty
	"""
	#@TODO: Enable authentication files (private_key)
	_username = _args.get('username', '')
	_password = _args.get('password', '')
	_account = _args.get('account', '')
	_database = _args['database']
	_provider = _args['provider']
	if _username != '':
		_account = _username + ':' + _password + '@'
	_host = _args.get('host', '')
	_port = _args.get('port', '')
	#
	# Fall back to the provider's default host/port when not supplied
	if _provider in providers.DEFAULT:
		_default = providers.DEFAULT[_provider]
		_host = _host if _host != '' else _default.get('host', '')
		_port = _port if _port != '' else _default.get('port', '')
	# NOTE(review): a former lookup of providers.DEFAULT['port'] was removed;
	# DEFAULT is keyed by provider name, so that branch could never match.
	if _host != '' and _port != '':
		_fhost = _host + ":" + str(_port)  #-- formatted hostname
	else:
		_fhost = _host
	#
	# NOTE(review): credentials are interpolated verbatim into the URI;
	# passwords with special characters should be URL-quoted -- TODO confirm.
	uri = ''.join([_provider, "://", _account, _fhost, '/', _database])
	_engine = sqlalchemy.create_engine(uri, future=True)
	_out = {'sqlalchemy': _engine}
	_pargs = {'host': _host, 'port': _port, 'username': _username, 'password': _password}
	for key in _pargs:
		if _pargs[key] != '':
			_out[key] = _pargs[key]
	return _out
def _getClassInstance(_group, **_args):
	"""
	Return the Reader/Writer class to be instantiated for a provider group.

	:_group		a key of providers.CATEGORIES (sql, nosql, cloud, ...)
	:_args		arguments passed to the factory
	:return		the class object (not an instance) to construct
	"""
	if 'read' in _args or 'write' in _args:
		# BUGFIX: the context must be the literal key name; the previous
		# code assigned _args['write'] (a dict) instead of the string
		# 'write', which broke the comparison below for write contexts.
		_context = 'read' if 'read' in _args else 'write'
	else:
		_context = _args['context'] if 'context' in _args else 'read'
	_class = providers.READ[_group] if _context == 'read' else providers.WRITE[_group]
	# Some groups map provider -> class rather than straight to a class
	if type(_class) == dict and _args['provider'] in _class:
		_class = _class[_args['provider']]
	return _class
+
+def __instance(**_args):
+	"""
 
 	@param provider	{file,sqlite,postgresql,redshift,bigquery,netezza,mongo,couch ...}
 	@param context	read|write|rw

+ 4 - 1
transport/mongo.py

@@ -127,6 +127,8 @@ class MongoReader(Mongo,Reader):
                 
             return pd.DataFrame(r)
         else:
+            
+            
             if 'table' in args  or 'collection' in args :
                 if 'table' in args:
                     _uid = args['table']
@@ -134,7 +136,8 @@ class MongoReader(Mongo,Reader):
                     _uid = args['collection']
                 else:
                     _uid = self.uid 
-
+            else:
+                _uid = self.uid
             collection = self.db[_uid]                
             _filter = args['filter'] if 'filter' in args else {}
             _df =  pd.DataFrame(collection.find(_filter))

+ 63 - 0
transport/providers.py

@@ -0,0 +1,63 @@
from transport.common import Reader, Writer,Console #, factory
from transport import disk
import sqlite3
from transport import s3 as s3
from transport import rabbitmq as queue
from transport import couch as couch
from transport import mongo as mongo
from transport import sql as sql
from transport import etl as etl
from transport import qlistener
import psycopg2 as pg
import mysql.connector as my
from google.cloud import bigquery as bq
import nzpy as nz   #--- netezza drivers
import os

from transport.version import __version__

#
# Provider identifiers
POSTGRESQL 	= 'postgresql'
MONGODB 	= 'mongodb'
HTTP='http'
BIGQUERY	='bigquery'
FILE 	= 'file'
ETL = 'etl'
SQLITE = 'sqlite'
SQLITE3= 'sqlite'
REDSHIFT = 'redshift'
NETEZZA = 'netezza'
MYSQL = 'mysql'
RABBITMQ = 'rabbitmq'
MARIADB  = 'mariadb'
COUCHDB = 'couch'
CONSOLE = 'console'
#
# synonyms of the above
BQ 		= BIGQUERY
MONGO 	= MONGODB
FERRETDB= MONGODB
PG 		= POSTGRESQL
PSQL 	= POSTGRESQL
PGSQL	= POSTGRESQL
S3      = 's3'	
AWS_S3  = 's3'
RABBIT = RABBITMQ

QLISTENER = 'qlistener'

# Native DB-API driver module per SQL provider
DRIVERS  = {PG:pg,REDSHIFT:pg,MYSQL:my,MARIADB:my,NETEZZA:nz,SQLITE:sqlite3}
# Provider grouping used by the factory to pick a Reader/Writer family.
# NOTE(review): 'memory' and 'http' appear here but have no entry in
# READ/WRITE below -- confirm whether they are handled elsewhere.
CATEGORIES ={'sql':[NETEZZA,PG,MYSQL,REDSHIFT,SQLITE,MARIADB],'nosql':[MONGODB,COUCHDB],'cloud':[BIGQUERY],'file':[FILE],
             'queue':[RABBIT,QLISTENER],'memory':[CONSOLE,QLISTENER],'http':[HTTP]}

# Reader class per group; nested dicts map a specific provider to its class
READ = {'sql':sql.SQLReader,'nosql':{MONGODB:mongo.MongoReader,COUCHDB:couch.CouchReader},'cloud':sql.BigQueryReader,
        'file':disk.DiskReader,'queue':{RABBIT:queue.QueueReader,QLISTENER:qlistener.qListener}
        }
WRITE = {'sql':sql.SQLWriter,'nosql':{MONGODB:mongo.MongoWriter,COUCHDB:couch.CouchWriter},'cloud':sql.BigQueryWriter,
         'file':disk.DiskWriter,'queue':{RABBIT:queue.QueueWriter,QLISTENER:qlistener.qListener}
        }
# Default connection parameters applied when the caller omits them
DEFAULT = {PG:{'host':'localhost','port':5432},MYSQL:{'host':'localhost','port':3306}}
DEFAULT[MONGODB] = {'port':27017,'host':'localhost'}
DEFAULT[REDSHIFT] = DEFAULT[PG]
DEFAULT[MARIADB] = DEFAULT[MYSQL]
DEFAULT[NETEZZA] = {'port':5480}

+ 42 - 0
transport/qlistener.py

@@ -0,0 +1,42 @@
+import queue
+from threading import Thread, Lock
+from transport.common import Reader,Writer
+import numpy as np
+import pandas as pd
+
class qListener :
    # Class-level lock and queue registry shared by every qListener
    # instance; queues are keyed by an 'id' string.
    lock = Lock()
    _queue = {'default':queue.Queue()}
    def __init__(self,**_args):
        """
        In-process queue listener.

        :callback   optional function invoked with the next queued message
        :id         queue identifier (defaults to 'default')
        """
        self._cache     = {}
        self._callback  = _args['callback'] if 'callback' in _args else None
        self._id = _args['id'] if 'id' in _args else 'default'
        if self._id not in qListener._queue :
            qListener._queue[self._id] = queue.Queue()
        # NOTE(review): the worker thread consumes exactly one message and
        # then exits; with callback=None it would raise TypeError inside
        # the thread -- confirm intended single-shot semantics.
        thread = Thread(target=self._forward)
        thread.start()
    def _forward(self):
        # Runs on the worker thread: block until one item arrives,
        # acknowledge it (unblocks write()'s join), then hand it to the
        # callback.
        _q = qListener._queue[self._id]
        _data = _q.get()
        _q.task_done()
        self._callback(_data)

    def has(self,**_args) :
        # True when a callback has been registered (arguments are ignored)
        return self._callback is not None


    def close(self):
        """
        This will empty the queue and have it ready for another operation
        """
        # Reaches into Queue internals (mutex / all_tasks_done) to clear
        # pending items and release any join()-blocked writers.
        _q = qListener._queue[self._id]
        with _q.mutex:
            _q.queue.clear()
            _q.all_tasks_done.notify_all()

    def write(self,_data,**_args):
        """
        Enqueue _data and block until the consumer marks it done.

        :id     optional queue identifier overriding the instance's own
        """
        _id = _args['id'] if 'id' in _args else self._id

        _q = qListener._queue[_id]
        _q.put(_data)
        # join() blocks until task_done() is called by _forward
        _q.join()

+ 9 - 4
transport/sql.py

@@ -29,6 +29,7 @@ from multiprocessing import Lock, RLock
 import pandas as pd
 import numpy as np
 import nzpy as nz   #--- netezza drivers
+import sqlite3
 import copy
 import os
 
@@ -58,8 +59,8 @@ class SQLRW :
         # _info['host'] = 'localhost' if 'host' not in _args else _args['host']
         # _info['port'] = SQLWriter.REFERENCE[_provider]['port'] if 'port' not in _args else _args['port']
 
-        _info['host'] = _args['host']
-        _info['port'] = _args['port']
+        _info['host'] = _args['host'] if 'host' in _args else ''
+        _info['port'] = _args['port'] if 'port' in _args else ''
         
         # if 'host' in _args :
         #     _info['host'] = 'localhost' if 'host' not in _args else _args['host']
@@ -98,8 +99,12 @@ class SQLRW :
         if _handler == my :
             _info['database'] = _info['dbname']
             del _info['dbname']
-        
-        self.conn = _handler.connect(**_info)
+        if _handler == sqlite3 :
+            _info = {'path':_info['dbname'],'isolation_level':'IMMEDIATE'}
+        if _handler != sqlite3 :
+            self.conn = _handler.connect(**_info) 
+        else:
+            self.conn = _handler.connect(_info['path'],isolation_level='IMMEDIATE')
         self._engine = _args['sqlalchemy']  if 'sqlalchemy' in _args else None
     def meta(self,**_args):
         schema = []

+ 1 - 1
transport/version.py

@@ -1,2 +1,2 @@
 __author__ = 'The Phi Technology'
-__version__= '1.8.1'
+__version__= '1.8.2'