Quellcode durchsuchen

Bug fix with transport and listener, actor implementation

steve vor 8 Jahren
Ursprung
Commit
e3a0e6e8b0
2 geänderte Dateien mit 23 neuen und 12 gelöschten Zeilen
  1. 13 6
      src/utils/agents/actor.py
  2. 10 6
      src/utils/transport.py

+ 13 - 6
src/utils/agents/actor.py

@@ -14,13 +14,15 @@ from threading import Thread
 import os
 import subprocess
 from monitor import ProcessCounter
+from utils.transport import QueueListener
 class Actor(Thread):
 	def __init__(self,config):
 		Thread.__init__(self)
+		self.config = config
 		self.items = []
 	def init(self,litems):
 		self.items = litems
-        def process(self,item):
+	def process(self,item):
 		pass
 	def execute(self,cmd):
 		stream = None
@@ -35,13 +37,14 @@ class Actor(Thread):
 		
 	def run(self):
 		info = {}
-		info['exchange'] = self.config['organization']
-		info['uid']	= self.config['id']
-		info['qid']	= ['action']
+		host	= self.config['api']
+		uid	= self.config['key']
+		qid	= self.config['id']
 		
-		qlistener = QueueListener(info)		
+		qlistener = QueueListener(qid=qid,uid=uid,host=host)		
+		qlistener.callback = self.callback
 		qlistener.read()
-		r = [self.process(item) for item in self.litems]
+		r = [self.process(item) for item in self.items]
 class Kill(Actor):
 	def __init__(self,config):
 		Actor.__init__(self,config)
@@ -62,3 +65,7 @@ class Start(Actor):
 	
 class Alert(Actor):
 	pass
+
+config = {"id":"demo","key":"[0v8]-247&7!v3","api":"localhost"}
+actor = Kill(config)
+actor.start()

+ 10 - 6
src/utils/transport.py

@@ -359,8 +359,7 @@ class QueueReader(MessageQueue,Reader):
 		self.channel	= self.connection.channel()
 		self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True)
 
-		self.info = self.channel.queue_
-declare(queue=qid,durable=True)
+		self.info = self.channel.queue_declare(queue=qid,durable=True)
 	
 
 
@@ -416,16 +415,21 @@ declare(queue=qid,durable=True)
 		
 		return self.data
 class QueueListener(QueueReader):
-	def init(self,qid,callback):
+	def init(self,qid):
 		properties = pika.ConnectionParameters(host=self.host)
 		self.connection = pika.BlockingConnection(properties)
 		self.channel	= self.connection.channel()
 		self.channel.exchange_declare(exchange=self.uid,type='fanout')
 
 		self.info = self.channel.queue_declare(queue=qid,exclusive=True)
-		self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue,no_ack=True)
-		self.callback = callback
-		
+		self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue)
+		#self.callback = callback
+	def read(self):
+    	
+		self.init(self.qid)
+		self.channel.basic_consume(self.callback,queue=self.qid,no_ack=True);
+		self.channel.start_consuming()
+    		
 """
 	This class is designed to write output as sql insert statements
 	The class will inherit from DiskWriter with minor adjustments