Browse Source

bug fix & new feature

Steve Nyemba 2 years ago
parent
commit
883a6ef22f
3 changed files with 29 additions and 23 deletions
  1. 1 0
      transport/__init__.py
  2. 18 1
      transport/common.py
  3. 10 22
      transport/mongo.py

+ 1 - 0
transport/__init__.py

@@ -59,6 +59,7 @@ class providers :
 	FILE 	= 'file'
 	ETL = 'etl'
 	SQLITE = 'sqlite'
+	SQLITE3= 'sqlite'
 	REDSHIFT = 'redshift'
 	NETEZZA = 'netezza'
 	MYSQL = 'mysql'

+ 18 - 1
transport/common.py

@@ -115,4 +115,21 @@ class Console(Writer):
 		finally:
 			if self.lock :
 				Console.lock.release()
-	
+"""
+@NOTE : Experimental !!
+"""	
+class Proxy :
+	"""
+	This class will forward a call to a function that is provided by the user code
+	"""
+	def __init__(self,**_args):
+		self.callback = _args['callback']
+	def read(self,**_args) :
+		try:
+			return self.callback(**_args)
+		except Exception as e:
+			return self.callback()
+
+		pass
+	def write(self,data,**_args):
+		self.callback(data,**_args)

+ 10 - 22
transport/mongo.py

@@ -10,7 +10,7 @@ from bson.binary    import Binary
 import nujson as json
 from datetime import datetime
 import pandas as pd
-
+import numpy as np
 import gridfs
 # from transport import Reader,Writer
 import sys
@@ -33,33 +33,15 @@ class Mongo :
             :username   username for authentication
             :password   password for current user
         """
-        # port = str(args['port']) if 'port' in args else '27017'
-        # host = args['host'] if 'host' in args else 'localhost'
-        # host = ":".join([host,port]) #-- Formatting host information here
-        # self.uid    = args['doc'] if 'doc' in args else None  #-- document identifier
-        # self.dbname = args['dbname'] if 'dbname' in args else args['db']
+
         self.authMechanism= 'SCRAM-SHA-256' if 'mechanism' not in args else args['mechanism']
         # authSource=(args['authSource'] if 'authSource' in args else self.dbname)
         self._lock = False if 'lock' not in args else args['lock']
 
         username = password = None
-        # if 'username' in args and 'password' in args: 
-        #     username = args['username']       
-        #     password=args['password']
         if 'auth_file' in args :
             _info = json.loads((open(args['auth_file'])).read())
-            # username = _info['username']
-            # password = _info['password']
-            # if 'mechanism' in _info:
-            #     authMechanism = _info['mechanism']
-            # if 'authSource' in _info:
-            #     authSource = _info['authSource']
-            # #
-            # # We are allowing the authentication file to set collection and databases too
-            # if 'db' in _info :
-            #     self.dbname = _info['db']
-            # if 'doc' in _info :
-            #     self.uid = _info['doc']
+
             
         else:
             _info = {}
@@ -100,7 +82,8 @@ class Mongo :
         pass
     def close(self):
         self.client.close()
-
+    def meta(self,**_args):
+        return []
 class MongoReader(Mongo,Reader):
     """
     This class will read from a mongodb data store and return the content of a document (not a collection)
@@ -113,6 +96,11 @@ class MongoReader(Mongo,Reader):
             #
             # @TODO:
             cmd = args['mongo']
+            if "aggregate" in cmd :
+                if "allowDiskUse" not in cmd :
+                    cmd["allowDiskUse"] = True
+                if "cursor" not in cmd :
+                    cmd["cursor"] = {}
             r =  []
             out = self.db.command(cmd)
             #@TODO: consider using a yield (generator) works wonders