Sfoglia il codice sorgente

Adding an iterator-like design pattern to the actor/orchestrator class hierarchy

steve 8 anni fa
parent
commit
4d238e7a79
1 ha cambiato i file con 57 aggiunte e 64 eliminazioni
  1. 57 64
      src/utils/agents/actor.py

+ 57 - 64
src/utils/agents/actor.py

@@ -16,56 +16,27 @@ import os
 import subprocess
 from monitor import ProcessCounter
 from utils.transport import QueueListener, QueueWriter
-class Actor(Thread):
-        @staticmethod
-        def instance(id,config):
-                pass
-    
-	def __init__(self,config):
-		Thread.__init__(self)
-		self.config = config
-		self.items = []
-		self.__id = config['id']
+class Actor:
+        def __init__(self,config):
+                self.config = config
         def getIdentifier(self):
-                return self.__id
+                return self.__class__.__name__.lower()
             
 	def init(self,litems):
 		self.items = litems
 	def process(self,item):
 		pass
+        def isValid(self,item):
+                return False
 	def execute(self,cmd):
 		stream = None
 		try:
-                        print self.getIdentifier()
-                        print cmd
 			handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
 			stream = handler.communicate()[0]
 		except Exception,e:
 			pass
 		return stream
-	def callback(self,channel,method,header,stream):
-                print [self.getIdentifier(),stream]
-                message = json.loads(stream)
-                content = message['content']
-                sender = message['from']
-                if content.lower() == 'quit' :
-                    channel.close()
-                    print " *** ",self.getIdentifier()
-                elif content.lower() == 'ping':
-                    self.post(to=sender,content="1")
-                else:
-                    self.process(content)
-                    self.post(to=sender,content=content)
-                
-                #message = None
-                #try:
-                    #message = json.loads(stream)
-                #except Exception, e:
-                    #pass
-                #if message is not None:
-                    #if 'id' in message :
-                        #if 'payload' in message:
-                            #self.execute(message['payload']
+
         """
             Sending a message to a queue with parameters to,from,content
         """
@@ -84,8 +55,8 @@ class Actor(Thread):
                 pass
 
 class Kill(Actor):
-	def __init__(self,config):
-		Actor.__init__(self,config)
+        def isValid(self,item):
+                return (item is not None) and (item in self.config)
 	def process(self,item):
 		cmd = "".join(["ps -eo pid,command|grep ",item,'|grep -E "^ {0,1}[0-9]+" -o|xargs kill -9'])
 		self.execute(cmd)
@@ -95,41 +66,54 @@ class Kill(Actor):
 class Start(Actor):
 	def __init__(self,config):
 		Actor.__init__(self,config)
-	def process(self,item):
+        def isValid(self,name):
+                return name in self.config:
+                    
+	def process(self,name):
+                item = self.config[name]
 		path = item['path']
 		args = item['args'] if 'args' in item else ''
 		cmd = " ".join([path,args])
 		self.execute(cmd)
-	
-class Orchestrator(Actor):
+"""
+    The orchestrator class is designed to aggregate actions and communicate back to the caller
+    Mesage passing is structured as follows {from,to,content} The content is designed to be understood by the actor
+    
+    The orchestrator is implemented using a simple iterator design-pattern
+    @TODO: action specifications should be provided remotely
+"""
+class Orchestrator(Actor,Thread):
         def __init__(self,config):
+                Thread.__init__(self)
+                Actor.__init__(self,config)
                 self.actors = {}
-                for id in config :
-                    _config_ = config[id]
-                    item = Actor.instance(id,config[id])
-                    self.actors[id] = item
-                pass
+                for id in config['actions'] :
+                    conf = config['actions'][id]
+                    item = eval("".join([id,"(",json.dumps(conf),")"]))
+                    self.actors[id.lower()]  = item
+                
 
 	def callback(self,channel,method,header,stream):
-                print [self.getIdentifier(),stream]
+                
                 message = json.loads(stream)
                 content = message['content']
                 sender = message['from']
-                if content.lower() == 'quit' :
+                to = message['to']
+                if content.lower() == 'quit' or to == 'quit':
                     channel.close()
                     print " *** ",self.getIdentifier()
-                elif content.lower() == 'ping':
+                elif content.lower() == 'ping' or to == 'ping':
                     self.post(to=sender,content="1")
                 else:
-                    self.process(content)
+                    id = to.lower()
+                    actor = self.actors[id]
+                    if actor.isValid(content) :
+                        actor.process(content)
+                    else:
+                        content = {"status":"invalid","content":content}
+                    
                     self.post(to=sender,content=content)
                 
-            
-        def process(self,item):
-                id = item['cmd']
-                actor = self.actors[id]
-                actor.process(item) ;
-
 	def run(self):
 		info = {}
 		host	= self.config['api']
@@ -139,7 +123,7 @@ class Orchestrator(Actor):
 		qlistener = QueueListener(qid=qid,uid=uid,host=host)		
 		qlistener.callback = self.callback
 		qlistener.read()
-		r = [self.process(item) for item in self.items]
+		#r = [self.process(item) for item in self.items]
 
 """
     This class is designed to send a message to a given AMQP enpoint
@@ -150,11 +134,20 @@ class Alert(Actor):
                 
                 pass
 
-            
-config = {"id":"demo","key":"[0v8]-247&7!v3","api":"localhost"}
-actor = Kill(config)
-actor.start()
+config = {
+        "id":"demo-000",
+        "key":"[0v8]-247&7!v3","api":"localhost",
+        "actions":{
+                "Kill":["firefox"],
+                "Alert":[]
+                   }
+        
+        
+    }            
+thread = Orchestrator(config)
+thread.start()
+#config = {"id":"demo","key":"[0v8]-247&7!v3","api":"localhost"}
+#actor = Kill(config)
+#actor.start()
 
-config = {"id":"demo-100","key":"[0v8]-247&7!v3","api":"localhost"}
-actor_1 = Kill(config)
-actor_1.start()
+#config = {"id":"demo-100","key":"[0v8]-247&7!v3","api":"localhost"}