Переглянути джерело

transport framework update (pika,couchdbkit,...)

Steve L. Nyemba 8 роки тому
батько
коміт
45b054de8c
1 змінених файлів з 21 додано та 2 видалено
  1. 21 2
      src/utils/transport.py

+ 21 - 2
src/utils/transport.py

@@ -345,6 +345,10 @@ class QueueReader(MessageQueue,Reader):
 		#self.uid = params['uid']
 		#self.qid = params['qid']
 		MessageQueue.__init__(self,**params);
+		if 'durable' in params :
+			self.durable = True
+		else:
+			self.durable = False
 		self.size = -1
 		self.data = {}
 	
@@ -406,11 +410,26 @@ class QueueReader(MessageQueue,Reader):
 				
 				pass
 				#self.close()
-			
 			# r[qid].append( self.data)
 		
 		return self.data
-		
+class QueueListener(QueueReader):
+	def init(self,qid):
+		properties = pika.ConnectionParameters(host=self.host)
+		self.connection = pika.BlockingConnection(properties)
+		self.channel	= self.connection.channel()
+		self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True )
+
+		self.info = self.channel.queue_declare(exclusive=True,queue=qid)
+		print self.info.method.queue
+		self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue,routing_key=qid)
+		#self.callback = callback
+	def read(self):
+    	
+		self.init(self.qid)
+		self.channel.basic_consume(self.callback,queue=self.qid,no_ack=True);
+		self.channel.start_consuming()
+    		
 """
 	This class is designed to write output as sql insert statements
 	The class will inherit from DiskWriter with minor adjustments