Ver Fonte

bug fix & aliasing

Steve Nyemba há 5 anos atrás
pai
commit
a16b969b69
1 ficheiros alterados com 20 adições e 4 exclusões
  1. 20 4
      transport/queue.py

+ 20 - 4
transport/queue.py

@@ -33,6 +33,9 @@ class MessageQueue:
 		self.connection = None
 		self.channel 	= None
 		
+		self.name 	= self.__class__.__name__.lower() if 'name' not in params else 'wtf'
+		
+
 		self.credentials= pika.PlainCredentials('guest','guest')
 		if 'username' in params :
 			self.credentials = pika.PlainCredentials(
@@ -57,6 +60,8 @@ class MessageQueue:
 		resp =  self.connection is not None and self.connection.is_open
 		self.close()
 		return resp
+	def finalize(self):
+		pass
 	def close(self):
 		if self.connection.is_closed == False :
 			self.channel.close()
@@ -81,12 +86,13 @@ class QueueWriter(MessageQueue,Writer):
 		
 
 
-	"""
+
+	def write(self,data,_type='text/plain'):
+		"""
 		This function writes a stream of data to the a given queue
 		@param object	object to be written (will be converted to JSON)
 		@TODO: make this less chatty
-	"""
-	def write(self,data,_type='text/plain'):
+		"""
 		# xchar = None
 		# if  'xchar' in params:
 		# 	xchar = params['xchar']
@@ -207,11 +213,14 @@ class QueueReader(MessageQueue,Reader):
 			# r[qid].append( self.data)
 
 		return self.data
-class QueueListener(QueueReader):
+class QueueListener(MessageQueue):
 	"""
 	This class is designed to have an active listener (worker) against a specified Exchange/Queue
 	It is initialized as would any other object and will require a callback function to address the objects returned.
 	"""
+	def __init__(self,**args):
+		MessageQueue.__init__(self,**args)
+		self.listen = self.read
 	# def init(self,qid):
 	# 	properties = pika.ConnectionParameters(host=self.host)
 	# 	self.connection = pika.BlockingConnection(properties)
@@ -222,11 +231,18 @@ class QueueListener(QueueReader):
 		
 	# 	self.channel.queue_bind(exchange=self.exchange,queue=self.info.method.queue,routing_key=qid)
 		#self.callback = callback
+	
+	def finalize(self,channel,ExceptionReason):
+		pass
+	 
 	def callback(self,channel,method,header,stream) :
 		raise Exception("....")
 	def read(self):
     	
 		self.init(self.queue)
+		
 		self.channel.basic_consume(self.queue,self.callback,auto_ack=True);
 		self.channel.start_consuming()
+		
+