Steve Nyemba 4 years ago
parent
commit
e4a1ef8dd7
3 changed files with 33 additions and 7 deletions
  1. 1 1
      setup.py
  2. 3 3
      transport/disk.py
  3. 29 3
      transport/mongo.py

+ 1 - 1
setup.py

@@ -8,7 +8,7 @@ def read(fname):
     return open(os.path.join(os.path.dirname(__file__), fname)).read() 
 args    = {
     "name":"data-transport",
-    "version":"1.2.0",
+    "version":"1.2.2",
     "author":"The Phi Technology LLC","author_email":"info@the-phi.com",
     "license":"MIT",
     "packages":["transport"]}

+ 3 - 3
transport/disk.py

@@ -106,7 +106,7 @@ class DiskWriter(Writer):
 		finally:
 			DiskWriter.THREAD_LOCK.release()
 class SQLiteReader (DiskReader):
-	def __init__(self,**args)
+	def __init__(self,**args):
 		DiskReader.__init__(self,**args)
 		self.conn = sqlite3.connect(self.path,isolation_level=None)
 		self.conn.row_factory = sqlite3.Row
@@ -114,10 +114,10 @@ class SQLiteReader (DiskReader):
 	def read(self,**args):
 		if 'sql' in args :
 			sql = args['sql']			
-		else if 'filter' in args :
+		elif 'filter' in args :
 			sql = "SELECT :fields FROM ",self.table, "WHERE (:filter)".replace(":filter",args['filter'])
 			sql = sql.replace(":fields",args['fields']) if 'fields' in args else sql.replace(":fields","*")
-		return = pd.read_sql(sql,self.conn)
+		return  pd.read_sql(sql,self.conn)
 	def close(self):
 		try:
 			self.conn.close()

+ 29 - 3
transport/mongo.py

@@ -17,6 +17,7 @@ if sys.version_info[0] > 2 :
 else:
 	from common import Reader, Writer
 import json
+import re
 class Mongo :
     """
     Basic mongodb functions are captured here
@@ -54,9 +55,34 @@ class MongoReader(Mongo,Reader):
     def __init__(self,**args):
         Mongo.__init__(self,**args)
     def read(self,**args):
-        collection = self.db[self.uid]
-        _filter = args['filter'] if 'filter' in args else {}
-        return collection.find(_filter)
+        if 'mongo' in args :
+            #
+            # @TODO:
+            cmd = args['mongo']
+            r =  []
+            out = self.db.command(cmd)
+            #@TODO: consider yielding each batch from a generator instead of accumulating every batch in the list r
+            while True :
+                if 'cursor' in out :
+                    key = 'firstBatch' if 'firstBatch' in out['cursor'] else 'nextBatch'
+                else:
+                    key = 'n'
+                if 'cursor' in out and out['cursor'][key] :
+                    r += list(out['cursor'][key])
+                elif out[key]:
+                    r.append (out[key]) 
+                    # yield out['cursor'][key]
+                if key not in ['firstBatch','nextBatch'] or ('cursor' in out and out['cursor']['id']  == 0) :
+                    break
+                else:
+                    out = self.db.command({"getMore":out['cursor']['id'],"collection":out['cursor']['ns'].split(".")[-1]}) 
+                
+                
+            return r
+        else:
+            collection = self.db[self.uid]                
+            _filter = args['filter'] if 'filter' in args else {}
+            return collection.find(_filter)
     def view(self,**args):
         """
         This function is designed to execute a view (map/reduce) operation