瀏覽代碼

Communication fix

steve 8 年之前
父節點
當前提交
29ac48a2eb
共有 2 個文件被更改,包括 24 次插入5 次删除
  1. 23 4
      src/utils/agents/actor.py
  2. 1 1
      src/utils/transport.py

+ 23 - 4
src/utils/agents/actor.py

@@ -5,7 +5,8 @@
 		- Alert : Sends an email or Webhook
 		- Apps 	: Kill, Start
 		- Folder: Archive, Delete (all, age, size)
-		
+        By design we are to understand that a message is structured as follows:
+            {to,from,content} with content either being an arbitrary stream (or JSON)
 	@TODO: 
 		- upgrade to python 3.x
 """
@@ -14,7 +15,7 @@ from threading import Thread
 import os
 import subprocess
 from monitor import ProcessCounter
-from utils.transport import QueueListener
+from utils.transport import QueueListener, QueueWriter
 class Actor(Thread):
 	def __init__(self,config):
 		Thread.__init__(self)
@@ -38,6 +39,10 @@ class Actor(Thread):
 		return stream
 	def callback(self,channel,method,header,stream):
                 print [self.getIdentifier(),stream]
+                message = json.loads(stream)
+                content = message['content']
+                sender = message['from']
+                self.post(to=sender,content=content)
                 #message = None
                 #try:
                     #message = json.loads(stream)
@@ -47,8 +52,22 @@ class Actor(Thread):
                     #if 'id' in message :
                         #if 'payload' in message:
                             #self.execute(message['payload']
-                            
-                        
+        """
+            Sending a message to a queue with parameters to,from,content
+        """
+        def post(self,**args):
+                to      = args['to'] 
+                content = args['content']
+                message = {"from":self.getIdentifier(),"to":to,"content":content}
+                host	= self.config['api']
+                uid	= self.config['key']
+                qid	= to#self.config['id']
+                print [host,uid,qid]
+                qwriter = QueueWriter(host=host,uid=uid,qid=qid)
+                qwriter.init(qid)
+                qwriter.write(label=qid,row="got it")
+                #qwriter.close()
+                pass
 	def run(self):
 		info = {}
 		host	= self.config['api']

+ 1 - 1
src/utils/transport.py

@@ -418,7 +418,7 @@ class QueueListener(QueueReader):
 		properties = pika.ConnectionParameters(host=self.host)
 		self.connection = pika.BlockingConnection(properties)
 		self.channel	= self.connection.channel()
-		self.channel.exchange_declare(exchange=self.uid,type='direct' )
+		self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True )
 
 		self.info = self.channel.queue_declare(exclusive=True,queue=qid)
 		print self.info.method.queue