Browse Source

Actor implementation with Rabbitmq support

steve 8 years ago
parent
commit
357be165e2
2 changed files with 56 additions and 8 deletions
  1. 40 7
      src/utils/agents/actor.py
  2. 16 1
      src/utils/transport.py

+ 40 - 7
src/utils/agents/actor.py

@@ -11,21 +11,54 @@
 """
 import json
 from threading import Thread
+import os
+import subprocess
+from monitor import ProcessCounter
 class Actor(Thread):
 	def __init__(self,config):
+		Thread.__init__(self)
+		self.items = []
+	def init(self,litems):
+		self.items = litems
+        def process(self,item):
 		pass
-	def init(self,config):
-		pass
+	def execute(self,cmd):
+		stream = None
+		try:
+			handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
+			stream = handler.communicate()[0]
+		except Exception,e:
+			pass
+		return stream
+	def callback(self,channel,method,header,stream):
+		print stream
+		
+	def run(self):
+		info = {}
+		info['exchange'] = self.config['organization']
+		info['uid']	= self.config['id']
+		info['qid']	= ['action']
+		
+		qlistener = QueueListener(info)		
+		qlistener.read()
+		r = [self.process(item) for item in self.litems]
 class Kill(Actor):
 	def __init__(self,config):
 		Actor.__init__(self,config)
-	def init (self,app):
-		pass
+	def process(self,item):
+		cmd = "".join(["ps -eo pid,command|grep ",item,'|grep -E"^ {0,1}[0-9]+" -o|xargs kill -9'])
+		self.execute(cmd)
+		#
+		# We need to make sure we can get assess the process on this server
+		#
 class Start(Actor):
 	def __init__(self,config):
 		Actor.__init__(self,config)
-	def init(self,args):
-		path = args['path']
-		args = args['args'] if 'args' in args else ''
+	def process(self,item):
+		path = item['path']
+		args = item['args'] if 'args' in item else ''
+		cmd = " ".join([path,args])
+		self.execute(cmd)
+	
 class Alert(Actor):
 	pass

+ 16 - 1
src/utils/transport.py

@@ -345,6 +345,10 @@ class QueueReader(MessageQueue,Reader):
 		#self.uid = params['uid']
 		#self.qid = params['qid']
 		MessageQueue.__init__(self,**params);
+		if 'durable' in params :
+			self.durable = True
+		else:
+			self.durable = False
 		self.size = -1
 		self.data = {}
 	
@@ -355,7 +359,8 @@ class QueueReader(MessageQueue,Reader):
 		self.channel	= self.connection.channel()
 		self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True)
 
-		self.info = self.channel.queue_declare(queue=qid,durable=True)
+		self.info = self.channel.queue_
+declare(queue=qid,durable=True)
 	
 
 
@@ -410,6 +415,16 @@ class QueueReader(MessageQueue,Reader):
 			# r[qid].append( self.data)
 		
 		return self.data
+class QueueListener(QueueReader):
+	def init(self,qid,callback):
+		properties = pika.ConnectionParameters(host=self.host)
+		self.connection = pika.BlockingConnection(properties)
+		self.channel	= self.connection.channel()
+		self.channel.exchange_declare(exchange=self.uid,type='fanout')
+
+		self.info = self.channel.queue_declare(queue=qid,exclusive=True)
+		self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue,no_ack=True)
+		self.callback = callback
 		
 """
 	This class is designed to write output as sql insert statements