Przeglądaj źródła

Bug fix, with router in transport class and actor hierarchy

steve 8 lat temu
rodzic
commit
7423f6a9ec
2 zmienionych plików z 25 dodań i 7 usunięć
  1. 21 3
      src/utils/agents/actor.py
  2. 4 4
      src/utils/transport.py

+ 21 - 3
src/utils/agents/actor.py

@@ -20,6 +20,10 @@ class Actor(Thread):
 		Thread.__init__(self)
 		self.config = config
 		self.items = []
+		self.__id = config['id']
+        def getIdentifier(self):
+                return self.__id
+            
 	def init(self,litems):
 		self.items = litems
 	def process(self,item):
@@ -33,8 +37,18 @@ class Actor(Thread):
 			pass
 		return stream
 	def callback(self,channel,method,header,stream):
-		print stream
-		
+                print [self.getIdentifier(),stream]
+                #message = None
+                #try:
+                    #message = json.loads(stream)
+                #except Exception, e:
+                    #pass
+                #if message is not None:
+                    #if 'id' in message :
+                        #if 'payload' in message:
+                            #self.execute(message['payload']
+                            
+                        
 	def run(self):
 		info = {}
 		host	= self.config['api']
@@ -68,4 +82,8 @@ class Alert(Actor):
 
 config = {"id":"demo","key":"[0v8]-247&7!v3","api":"localhost"}
 actor = Kill(config)
-actor.start()
+actor.start()
+
+config = {"id":"demo-100","key":"[0v8]-247&7!v3","api":"localhost"}
+actor_1 = Kill(config)
+actor_1.start()

+ 4 - 4
src/utils/transport.py

@@ -410,7 +410,6 @@ class QueueReader(MessageQueue,Reader):
 				
 				pass
 				#self.close()
-			
 			# r[qid].append( self.data)
 		
 		return self.data
@@ -419,10 +418,11 @@ class QueueListener(QueueReader):
 		properties = pika.ConnectionParameters(host=self.host)
 		self.connection = pika.BlockingConnection(properties)
 		self.channel	= self.connection.channel()
-		self.channel.exchange_declare(exchange=self.uid,type='fanout')
+		self.channel.exchange_declare(exchange=self.uid,type='direct' )
 
-		self.info = self.channel.queue_declare(queue=qid,exclusive=True)
-		self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue)
+		self.info = self.channel.queue_declare(exclusive=True,queue=qid)
+		print self.info.method.queue
+		self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue,routing_key=qid)
 		#self.callback = callback
 	def read(self):