Sfoglia il codice sorgente

DC: connecting to queue

Steve Nyemba 7 anni fa
parent
commit
9509c66973
3 ha cambiato i file con 105 aggiunte e 112 eliminazioni
  1. 1 0
      src/data-collector.py
  2. 71 26
      src/utils/agents/actor.py
  3. 33 86
      src/utils/agents/manager.py

+ 1 - 0
src/data-collector.py

@@ -84,6 +84,7 @@ class Collector(Thread) :
 		# Initialiing the agents with the parameter values we know of
 		r = []
 		for agent in _agents :
+			
 			if agent.getName() in SYS_ARGS :
 				values = SYS_ARGS[agent.getName()]
 				# print (["init ",agent.getName(),values])

+ 71 - 26
src/utils/agents/actor.py

@@ -55,7 +55,7 @@ class Actor():
     def init(self,args):
         self.config = args
         
-    def isValid(self,item):
+    def isValid(self,**item):
         return False
 
     def execute(self,cmd):
@@ -75,47 +75,83 @@ class Apps(Actor) :
     """
     def __init__(self):
         Actor.__init__(self)
-        self.ng = None
-    def init(self,config) :
+        # self.ng = None
+    
+    def isValid(self,**args):
+        """ 
+            We insure that the provided application exists and that the payload is correct
+            p   validate the payload
+            q   validate the app can be restarted
+
+            @NOTE: killing the application has no preconditions/requirements
+        """
+        params = args['params']
+        action = args['action']
+        p = len(set(params.keys()) & set(['cmd','label'])) == 2
+        q = False
+        r = action in ['reboot','kill','start']
+        if p :
+            q = os.path.exists(params['cmd'])
+        return p and q and r
+    def init(self,action,params) :
         """
             This function will initialize the the actor with applications and associated arguments
             @param args {"apps_o":"","app_x":params}
         """
-        Actor.init(self,config)        
-        self.ng = ng(self.config.keys())
+        self.action = action
+        self.params = params     
+        # self.ng = ng(self.config.keys())
     
     
-    def can_start(self,name):
-        """
-            This function is intended to determine if it is possible to boot an application
+    # def can_start(self,name):
+    #     """
+    #         This function is intended to determine if it is possible to boot an application
 
-        """
-        items = self.ng.search(name) if self.ng is not None else []
-        if len(items) == 0 :
-            return False
-        else:
-            return items[0][1] > 0.01
+    #     """
+    #     items = self.ng.search(name) if self.ng is not None else []
+    #     if len(items) == 0 :
+    #         return False
+    #     else:
+    #         return items[0][1] > 0.01
 
-    def startup(self,name) :
+    def startup(self,cmd) :
         """
             This function is intended to start a program given the configuration
         """        
-        items   = self.ng.search(name)[0]
-        app = items[0]
-        args = self.config[app]
-        
-        cmd = " ".join([app,args,"&"    ])        
-        self.execute([app,args])
+        # items   = self.ng.search(name)[0]
+        # app = items[0]
+        # args = self.config[app]
+        try:
+            os.system(cmd +" &")
+        except Exception, e:
+            print e
         
     def kill(self,name) :
         """ 
             kill processes given the name, The function will not be case sensitive and partial names are accepted
             @NOTE: Make sure the reference to the app is not ambiguous
         """
-        args = "".join(['ps -eo pid,command|grep -E -i "',name.lower(),'"|grep -E "^ {0,}[0-9]+" -o|xargs kill -9'])        
-        #self.execute([args])
-        subprocess.call([args],shell=True)
-        
+        try:
+            args = "".join(['ps -eo pid,command|grep -E -i "',name.lower(),'"|grep -E "^ {0,}[0-9]+" -o|xargs kill -9'])        
+            #self.execute([args])
+            subprocess.call([args],shell=True)
+        except Exception,e:
+            print e
+    def run(self):
+        __action = str(self.action)
+        __params = dict(self.params)
+        if self.action == 'reboot' :
+            def pointer():
+                self.kill(__params['label'])
+                self.startup(__params['cmd'])
+        else:
+            def pointer() :
+                self.startup(__params['cmd'])
+        print __action,__params
+        print pointer
+        pointer()
+        # thread = Thread(target=pointer)
+        # thread.start()
     def analyze(self,logs) :
         """
             This function is designed to analyze a few logs and take appropriate action
@@ -186,7 +222,16 @@ class Folders(Actor):
         #self.action     = args['action'] #{clear,archive} config['actions']['folders']
         self.threshold  = self.get_size( args['threshold']) #self.config['threshold'])
         
-    
+    def isValid(self,**args):
+        action = args['action']
+        params = args['params']
+        p = action in ['clean','archive','backup']
+        q = False
+        r = False
+        if p :
+            q = params.keys()[0] in ['label','folder']
+            r = os.path.exists(params.values[0])
+        return p and q and r
     def archive(self,item):
         """
             This function will archive all files in a given folder

+ 33 - 86
src/utils/agents/manager.py

@@ -38,7 +38,13 @@ class Manager() :
 		self.DELAY  = int(self.plan['metadata']['delay'])
 		self.host = args['host']
 		self.update()	#-- Initializing status information
-
+		_args={"host":"dev.the-phi.com","qid":self.id,"uid":self.key}
+		#
+		# Connecting to the messaging service
+		self.qlistener = self.factory.instance(type="QueueListener",args=_args)
+		self.qlistener.callback = self.callback
+		self.qlistener.init(self.id)
+		self.qlistener.read()
 	def update(self) :
 		"""
 			This method inspect the plans for the current account and makes sure it can/should proceed
@@ -64,7 +70,9 @@ class Manager() :
 			self.LIMIT = -1
 	
 		#self.filter(meta)
-		
+		#
+		# We are removing all that is not necessary i.e making sure the features matches the plan user has paid for
+		#
 		self.agents  = self.filter('agents',meta,self.agents)
 		self.actors = self.filter('actors',meta,self.actors)
 		self.setup(meta)
@@ -89,60 +97,14 @@ class Manager() :
 	def filter(self,id,meta,objects):
 		values = meta[id].replace(' ','').split(',')
 		return [item for item in objects if item.getName() in values]
-	def __filter(self,meta) :
-		scope = []
-		lactors= []
-		for item in meta :
-			scope = scope + item['scope'].split(',')
-			if 'actors' in item :
-				lactors= lactors + item['actors'].split(',')
-		self.agents = [agent for agent in self.agents if agent.getName() in scope]
-		if len(lactors) == 0 :
-			self.actors = []
-		self.actors = [ actor for actor in self.actors if actor.getIdentifier() in lactors]
-		if len(self.actors) > 0 :
-			#
-			# We should configure the actors accordingly and make sure they are operational
-			#
-			
-			conf = {"apps":None}
-			#
-			# We need to get the configuration for the apps remotely
-			# 
-			read_class 	= self.config['store']['class']['read']
-			read_args	= self.config['store']['args']
-			couchdb	= self.factory.instance(type=read_class,args=read_args)
-			uinfo 	= couchdb.view('config/apps',key=self.key)
-			if 'apps' in uinfo :
-				conf['apps'] = uinfo['apps']
-			#
-			# Threshold will always give a default value
-			#
-			info = couchdb.view('config/folders',key=self.key)
-			threshold = info
-			conf['folder_threshold'] = threshold
-
-			mailer = None
-			for actor in self.actors :
-				id = actor.getIdentifier()
-				if id == "mailer" :
-					mailer = actor.Mailer()
-				if conf[id] is None :
-					continue
-				args = conf[id]
-				actor.init(args)
-			#
-			# Initializing the mailer
-			if mailer is not None and mailer in self.config:
-				mailer.init(self.config['mailer'])
-			
-		return meta
+	
 	def setup(self,meta) :
 		conf = {"folders":None,"apps":None}
 		read_class 	= self.config['store']['class']['read']
 		read_args	= self.config['store']['args']
 		
 
+		args = None
 		couchdb	= self.factory.instance(type=read_class,args=read_args)
 		args 	= couchdb.view('config/apps',key=self.key)
 		if len(args.keys()) > 0 :
@@ -157,41 +119,7 @@ class Manager() :
 		for actor in self.actors :
 			if args is not None and actor.getName() == name and len(args.keys()) > 0:								
 				actor.init(args)
-
-		
-	def __setup(self,meta):
-		#
-		# We should configure the actors accordingly and make sure they are operational
-		#
-		conf = {"folders":meta['folder_threshold'],"apps":None}
-		#
-		# We need to get the configuration for the apps remotely
-		# 
-		read_class 	= self.config['store']['class']['read']
-		read_args	= self.config['store']['args']
-		
-		couchdb	= self.factory.instance(type=read_class,args=read_args)
-		uinfo 	= couchdb.view('config/apps',key=self.key)
-		if 'apps' in uinfo :
-			conf['apps'] = uinfo['apps']
 		
-		for agent in self.agents :
-			agent.init(conf['apps'])
-
-		mailer = None
-		for actor in self.actors :
-			id = actor.getIdentifier()
-			if id == "mailer" :
-				mailer = actor.Mailer()
-			if conf[id] is None :
-				continue
-			args = conf[id]
-			actor.init(args)
-		#
-		# Initializing the mailer
-		if mailer is not None and mailer in self.config:
-			mailer.init(self.config['mailer'])
-				
 	def isvalid(self):
 		self.update()
 		return self.DELAY > -1 and self.LIMIT > -1
@@ -210,10 +138,25 @@ class Manager() :
 			This function enables the manager to be able to receive messages and delegate them to the appropriate actor
 			@channel
 		"""
-		print [channel,header]
 		message = json.loads(stream)
-		data = message['data']
+		#
+		# we should inspect the message and insure it has reached the appropriate recepient
+		#
+		if 'node' in message and message['node'] == self.id :
+			action = message['action']
+			params = message['params']
+			self.delegate(action,params)
+
+	def delegate(self,action,params):
+		for actor in self.actors :
+			
+			if actor.isValid(action=action,params=params) :
+				
+				actor.init(action,params)
+				actor.run()
 
+				break
+		pass
 	def run(self):
 		#DELAY=35*60 #- 35 Minutes
 		#LIMIT=1000
@@ -240,6 +183,10 @@ class Manager() :
 				else:
 					#label = id
 					row = data
+					#
+					# @TODO:
+					# This data should be marked if it has been flagged for reboot
+					# 
 				if type(row)==list and len(row) == 0 :
 
 					continue