瀏覽代碼

optimizations mongodb

Steve Nyemba 3 年之前
父節點
當前提交
e5fadc64a0
共有 2 個文件被更改,包括 21 次插入11 次删除
  1. 7 7
      bin/transport
  2. 14 4
      transport/mongo.py

+ 7 - 7
bin/transport

@@ -63,8 +63,8 @@ class Post(Process):
 		else:
 		else:
 			self.PROVIDER = args['target']['provider']
 			self.PROVIDER = args['target']['provider']
 			args['target']['context'] = 'write'
 			args['target']['context'] = 'write'
-			
-			self.writer = transport.instance(**args['target'])
+			self.store = args['target']
+			# self.writer = transport.instance(**args['target'])
 		#
 		#
 		# If the table doesn't exists maybe create it ?
 		# If the table doesn't exists maybe create it ?
 		#
 		#
@@ -86,9 +86,9 @@ class Post(Process):
 			else:
 			else:
 				value = ''
 				value = ''
 			_info[name] = _info[name].fillna(value)
 			_info[name] = _info[name].fillna(value)
-		
-		self.writer.write(_info)
-		self.writer.close()
+		writer = transport.factory.instance(**self.store)
+		writer.write(_info)
+		writer.close()
 
 
 		
 		
 class ETL (Process):
 class ETL (Process):
@@ -139,11 +139,11 @@ class ETL (Process):
 			#
 			#
 			# @TODO: locks
 			# @TODO: locks
 			for i in np.arange(self.JOB_COUNT) :
 			for i in np.arange(self.JOB_COUNT) :
-				print ()
-				print (i)
 				_id = 'segment # '.join([str(i),' ',self.name])
 				_id = 'segment # '.join([str(i),' ',self.name])
 				indexes = rows[i]
 				indexes = rows[i]
 				segment = idf.loc[indexes,:].copy() #.to_dict(orient='records')
 				segment = idf.loc[indexes,:].copy() #.to_dict(orient='records')
+				if segment.shape[0] == 0 :
+					continue
 				proc = Post(target = self._oargs,rows = segment,name=_id)
 				proc = Post(target = self._oargs,rows = segment,name=_id)
 				self.jobs.append(proc)
 				self.jobs.append(proc)
 				proc.start()
 				proc.start()

+ 14 - 4
transport/mongo.py

@@ -20,7 +20,9 @@ else:
 	from common import Reader, Writer
 	from common import Reader, Writer
 import json
 import json
 import re
 import re
+from multiprocessing import Lock, RLock
 class Mongo :
 class Mongo :
+    lock = RLock()
     """
     """
     Basic mongodb functions are captured here
     Basic mongodb functions are captured here
     """
     """
@@ -44,6 +46,7 @@ class Mongo :
         self.uid    = args['doc']  #-- document identifier
         self.uid    = args['doc']  #-- document identifier
         self.dbname = args['dbname'] if 'dbname' in args else args['db']
         self.dbname = args['dbname'] if 'dbname' in args else args['db']
         self.db = self.client[self.dbname]
         self.db = self.client[self.dbname]
+        self._lock = False if 'lock' not in args else args['lock']
         
         
     def isready(self):
     def isready(self):
         p = self.dbname in self.client.list_database_names() 
         p = self.dbname in self.client.list_database_names() 
@@ -144,10 +147,17 @@ class MongoWriter(Mongo,Writer):
         # if type(info) == list :
         # if type(info) == list :
         #     self.db[self.uid].insert_many(info)
         #     self.db[self.uid].insert_many(info)
         # else:
         # else:
-        if type(info) == list or type(info) == pd.DataFrame :
-            self.db[self.uid].insert_many(info if type(info) == list else info.to_dict(orient='records'))
-        else:
-            self.db[self.uid].insert_one(info)
+        try:
+
+            if self._lock :
+                Mongo.lock.acquire()
+            if type(info) == list or type(info) == pd.DataFrame :
+                self.db[self.uid].insert_many(info if type(info) == list else info.to_dict(orient='records'))
+            else:
+                self.db[self.uid].insert_one(info)
+        finally:
+            if self._lock :
+                Mongo.lock.release()
     def set(self,document):
     def set(self,document):
         """
         """
         if no identifier is provided the function will delete the entire collection and set the new document.
         if no identifier is provided the function will delete the entire collection and set the new document.