|
@@ -15,26 +15,12 @@ import gridfs
|
|
|
# from transport import Reader,Writer
|
|
|
import sys
|
|
|
if sys.version_info[0] > 2 :
|
|
|
- from transport.common import Reader, Writer
|
|
|
+ from transport.common import Reader, Writer, IEncoder
|
|
|
else:
|
|
|
from common import Reader, Writer
|
|
|
import json
|
|
|
import re
|
|
|
from multiprocessing import Lock, RLock
|
|
|
-
|
|
|
-def IEncoder (self,object):
|
|
|
- if type(object) == np.integer :
|
|
|
- return int(object)
|
|
|
- elif type(object) == np.floating:
|
|
|
- return float(object)
|
|
|
- elif type(object) == np.ndarray :
|
|
|
- return object.tolist()
|
|
|
- elif type(object) == datetime :
|
|
|
- return o.isoformat()
|
|
|
- else:
|
|
|
- return super(IEncoder,self).default(object)
|
|
|
-
|
|
|
-
|
|
|
class Mongo :
|
|
|
lock = RLock()
|
|
|
"""
|
|
@@ -93,7 +79,7 @@ class Mongo :
|
|
|
q = self.uid in self.client[self.dbname].list_collection_names()
|
|
|
return p and q
|
|
|
def setattr(self,key,value):
|
|
|
- _allowed = ['host','port','db','doc','authSource','mechanism']
|
|
|
+ _allowed = ['host','port','db','doc','collection','authSource','mechanism']
|
|
|
if key in _allowed :
|
|
|
setattr(self,key,value)
|
|
|
pass
|
|
@@ -113,10 +99,13 @@ class MongoReader(Mongo,Reader):
|
|
|
#
|
|
|
# @TODO:
|
|
|
cmd = {}
|
|
|
+ if 'aggregate' not in cmd and 'aggregate' not in args:
|
|
|
+ cmd['aggregate'] = self.uid
|
|
|
+ elif 'aggregate' in args :
|
|
|
+ cmd['aggregate'] = args['aggregate']
|
|
|
if 'pipeline' in args :
|
|
|
cmd['pipeline']= args['pipeline']
|
|
|
- if 'aggregate' not in cmd :
|
|
|
- cmd['aggregate'] = self.uid
|
|
|
+
|
|
|
if 'pipeline' not in args or 'aggregate' not in cmd :
|
|
|
cmd = args['mongo'] if 'mongo' in args else args['cmd']
|
|
|
if "aggregate" in cmd :
|