Преглед на файлове

implementing orchestrator

steve преди 8 години
родител
ревизия
48a80a9b8e
променени са 1 файла, в които са добавени 46 реда и са изтрити 10 реда
  1. 46 10
      src/utils/agents/actor.py

+ 46 - 10
src/utils/agents/actor.py

@@ -17,6 +17,10 @@ import subprocess
 from monitor import ProcessCounter
 from utils.transport import QueueListener, QueueWriter
 class Actor(Thread):
+        @staticmethod
+        def instance(id,config):
+                pass
+    
 	def __init__(self,config):
 		Thread.__init__(self)
 		self.config = config
@@ -78,16 +82,7 @@ class Actor(Thread):
                 qwriter.write(label=qid,row=content)
                 #qwriter.close()
                 pass
-	def run(self):
-		info = {}
-		host	= self.config['api']
-		uid	= self.config['key']
-		qid	= self.config['id']
-		
-		qlistener = QueueListener(qid=qid,uid=uid,host=host)		
-		qlistener.callback = self.callback
-		qlistener.read()
-		r = [self.process(item) for item in self.items]
+
 class Kill(Actor):
 	def __init__(self,config):
 		Actor.__init__(self,config)
@@ -106,12 +101,53 @@ class Start(Actor):
 		cmd = " ".join([path,args])
 		self.execute(cmd)
 	
+class Orchestrator(Actor):
+        def __init__(self,config):
+                self.actors = {}
+                for id in config :
+                    _config_ = config[id]
+                    item = Actor.instance(id,config[id])
+                    self.actors[id] = item
+                pass
+
+	def callback(self,channel,method,header,stream):
+                print [self.getIdentifier(),stream]
+                message = json.loads(stream)
+                content = message['content']
+                sender = message['from']
+                if content.lower() == 'quit' :
+                    channel.close()
+                    print " *** ",self.getIdentifier()
+                elif content.lower() == 'ping':
+                    self.post(to=sender,content="1")
+                else:
+                    self.process(content)
+                    self.post(to=sender,content=content)
+                
+            
+        def process(self,item):
+                id = item['cmd']
+                actor = self.actors[id]
+                actor.process(item) ;
+
+	def run(self):
+		info = {}
+		host	= self.config['api']
+		uid	= self.config['key']
+		qid	= self.config['id']
+		
+		qlistener = QueueListener(qid=qid,uid=uid,host=host)		
+		qlistener.callback = self.callback
+		qlistener.read()
+		r = [self.process(item) for item in self.items]
+
 """
     This class is designed to send a message to a given AMQP enpoint
     The AMQP endpoint is implemented by QueueWriter class
 """
 class Alert(Actor):
         def process(self,item):
+                
                 pass