Преглед изворни кода

Bug fix with actors and remote initialization

steve пре 8 година
родитељ
комит
7fc6210ac2
2 измењених фајлова са 90 додато и 30 уклоњено
  1. 83 27
      src/utils/agents/actor.py
  2. 7 3
      src/utils/transport.py

+ 83 - 27
src/utils/agents/actor.py

@@ -15,7 +15,7 @@ from threading import Thread
 import os
 import subprocess
 from monitor import ProcessCounter
-from utils.transport import QueueListener, QueueWriter
+from utils.transport import QueueListener, QueueWriter, QueueReader
 class Actor:
         def __init__(self,config):
                 self.config = config
@@ -56,6 +56,7 @@ class Actor:
 
 class Kill(Actor):
         def isValid(self,item):
+                print self.config
                 return (item is not None) and (item in self.config)
 	def process(self,item):
 		cmd = "".join(["ps -eo pid,command|grep ",item,'|grep -E "^ {0,1}[0-9]+" -o|xargs kill -9'])
@@ -67,7 +68,7 @@ class Start(Actor):
 	def __init__(self,config):
 		Actor.__init__(self,config)
         def isValid(self,name):
-                return name in self.config:
+                return name in self.config
                     
 	def process(self,name):
                 item = self.config[name]
@@ -87,43 +88,87 @@ class Orchestrator(Actor,Thread):
                 Thread.__init__(self)
                 Actor.__init__(self,config)
                 self.actors = {}
-                for id in config['actions'] :
-                    conf = config['actions'][id]
-                    item = eval("".join([id,"(",json.dumps(conf),")"]))
+                self.is_master_node = False
+                self.items = []
+                #
+                # If the configuration only has id,key then this is NOT the maestro
+                #
+                host = config['api']
+                qid = config['id']
+                if 'actions' in config :
+                    #
+                    # We are to assume this is the maestro/main node, and it will make the configuration available to other nodes
+                    # NOTE: This is has a predefined master thus is subject to known short comings (zombies)
+                    #
+                    
+                    q = QueueWriter(host=host,uid=config['key'],qid=qid)
+                    q.flush(config['id'])
+                    q.write(label=qid,row=json.dumps(config['actions']))
+                    q.close()
+                    self.is_master_node = True
+                            
+                    
+                else:
+                    qid = config['master']
+                    q = QueueReader(host=host,uid=config['key'],qid=qid)                    
+                    r = q.read()
+                    q.close()
+                    info = r[qid][0]
+                    self.init(info)
+                print "Initialized ***** ",self.getIdentifier(), " as ",config['id']
+                
+                    #
+                    # This object will have to request for the configuration
+                    #
+                #for id in config['actions'] :
+                    #conf = config['actions'][id]
+                    #item = eval("".join([id,"(",json.dumps(conf),")"]))
+                    #self.actors[id.lower()]  = item
+        """
+            This function is designed to provide the orchestrator a configuration             
+            @pre
+        """
+        def init(self,config):
+                
+                for id in config:
+                    
+                    setup_info  = config[id]
+                    item        = eval("".join([id,"(",json.dumps(setup_info),")"]))
                     self.actors[id.lower()]  = item
                 
-
 	def callback(self,channel,method,header,stream):
                 
                 message = json.loads(stream)
-                content = message['content']
-                sender = message['from']
-                to = message['to']
-                if content.lower() == 'quit' or to == 'quit':
-                    channel.close()
-                    print " *** ",self.getIdentifier()
-                elif content.lower() == 'ping' or to == 'ping':
-                    self.post(to=sender,content="1")
-                else:
-                    id = to.lower()
-                    actor = self.actors[id]
-                    if actor.isValid(content) :
-                        actor.process(content)
+                if 'content' in message :
+                    content = message['content']
+                    sender = message['from']
+                    to = message['to']
+                    if content.lower() == 'quit' or to == 'quit':
+                        channel.close()
+                        
+                    elif content.lower() == 'ping' or to == 'ping':
+                        self.post(to=sender,content="1")
                     else:
-                        content = {"status":"invalid","content":content}
-                    
-                    self.post(to=sender,content=content)
+                        id = to.lower()
+                        actor = self.actors[id]
+                        print "\tPerforming ",actor.getIdentifier()," on ",content," status ",actor.isValid(content)
+                        if actor.isValid(content) :
+                            actor.process(content)
+                        else:
+                            content = {"status":"invalid","content":content}
+                        
+                        self.post(to=sender,content=content)
                 
 	def run(self):
 		info = {}
 		host	= self.config['api']
 		uid	= self.config['key']
 		qid	= self.config['id']
-		
-		qlistener = QueueListener(qid=qid,uid=uid,host=host)		
-		qlistener.callback = self.callback
-		qlistener.read()
-		#r = [self.process(item) for item in self.items]
+		if not self.is_master_node :
+                    qlistener = QueueListener(qid=qid,uid=uid,host=host)		
+                    qlistener.callback = self.callback
+                    qlistener.read()
+                    r = [self.process(item) for item in self.items]
 
 """
     This class is designed to send a message to a given AMQP enpoint
@@ -144,8 +189,19 @@ config = {
         
         
     }            
+#thread = Orchestrator(config)
+#thread.start()
+
+config = {
+        "master":"demo-000",
+        "id":"demo-001",
+        "key":"[0v8]-247&7!v3","api":"localhost"
+        
+        
+    } 
 thread = Orchestrator(config)
 thread.start()
+
 #config = {"id":"demo","key":"[0v8]-247&7!v3","api":"localhost"}
 #actor = Kill(config)
 #actor.start()

+ 7 - 3
src/utils/transport.py

@@ -261,6 +261,7 @@ class MessageQueue:
 		self.close()
 		return resp
 	def close(self):
+            if self.connection.is_closed == False :
 		self.channel.close()
 		self.connection.close()
 """
@@ -351,7 +352,6 @@ class QueueReader(MessageQueue,Reader):
 			self.durable = False
 		self.size = -1
 		self.data = {}
-	
 	def init(self,qid):
 		
 		properties = pika.ConnectionParameters(host=self.host)
@@ -368,6 +368,7 @@ class QueueReader(MessageQueue,Reader):
 
 	"""
 	def callback(self,channel,method,header,stream):
+                
 		r = []
 		if re.match("^\{|\[",stream) is not None:
 			r = json.loads(stream)
@@ -399,9 +400,12 @@ class QueueReader(MessageQueue,Reader):
 		# We enabled the reader to be able to read from several queues (sequentially for now)
 		# The qid parameter will be an array of queues the reader will be reading from
 		#
+		if isinstance(self.qid,basestring) :
+                    self.qid = [self.qid]
 		for qid in self.qid:
 			self.init(qid)
 			# r[qid] = []
+			
 			if self.info.method.message_count > 0:
 				
 				self.channel.basic_consume(self.callback,queue=qid,no_ack=False);
@@ -420,8 +424,8 @@ class QueueListener(QueueReader):
 		self.channel	= self.connection.channel()
 		self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True )
 
-		self.info = self.channel.queue_declare(exclusive=True,queue=qid)
-		print self.info.method.queue
+		self.info = self.channel.queue_declare(passive=True,exclusive=True,queue=qid)
+		
 		self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue,routing_key=qid)
 		#self.callback = callback
 	def read(self):