Jelajahi Sumber

Adding/Enhancing actors to respond to events

steve 8 tahun lalu
induk
melakukan
a11c609d1d

File diff ditekan karena terlalu besar
+ 97 - 97
src/api/static/img/uml-design


+ 1 - 1
src/api/static/js/jx

@@ -1 +1 @@
-Subproject commit 36f9f10ff44406cc0418cc5934eb475bc77ebb1e
+Subproject commit 3d00f5a126574f2277cdac25d60008ee35dc8740

+ 125 - 106
src/utils/agents/actor.py

@@ -16,18 +16,23 @@ import os
 import subprocess
 from monitor import ProcessCounter
 from utils.transport import QueueListener, QueueWriter, QueueReader
+from utils.params import PARAMS
 class Actor:
-        def __init__(self,config):
-                self.config = config
+        def __init__(self):
+                pass
         def getIdentifier(self):
                 return self.__class__.__name__.lower()
-            
-	def init(self,litems):
-		self.items = litems
+    """
+        Initializing the class with configuration. The configuration will be specific to each subclass
+
+    """
+	def init(self,config):
+        self.config = config
 	def process(self,item):
 		pass
-        def isValid(self,item):
-                return False
+    def isValid(self,item):
+        return False
+
 	def execute(self,cmd):
 		stream = None
 		try:
@@ -41,24 +46,27 @@ class Actor:
             Sending a message to a queue with parameters to,from,content
         """
         def post(self,**args):
-                to      = args['to'] 
-                content = args['content']
-                message = {"from":self.getIdentifier(),"to":to,"content":content}
-                host	= self.config['api']
-                uid	= self.config['key']
-                qid	= to#self.conorfig['id']
-                
-                qwriter = QueueWriter(host=host,uid=uid,qid=qid)
-                qwriter.init(qid)
-                qwriter.write(label=qid,row=content)
-                #qwriter.close()
-                pass
-
+            # to      = args['to'] 
+            # content = args['content']
+            # message = {"from":self.getIdentifier(),"to":to,"content":content}
+            # host	= self.config['api']
+            # uid	= self.config['key']
+            # qid	= to#self.conorfig['id']
+            
+            # qwriter = QueueWriter(host=host,uid=uid,qid=qid)
+            # qwriter.init(qid)
+            # qwriter.write(label=qid,row=content)
+            # #qwriter.close()
+            pass
+class Folders(Actor):
+        def isvalid(self,item):
+            print self.conf
+        def process(self,item):
+            print item
 class Kill(Actor):
     
-        def isValid(self,item):
-                print self.config
-                return (item is not None) and (item in self.config)
+    def isValid(self,item):                
+        return (item is not None) and (item in self.config)
 	def process(self,item):
 		cmd = "".join(["ps -eo pid,command|grep ",item,'|grep -E "^ {0,1}[0-9]+" -o|xargs kill -9'])
 		self.execute(cmd)
@@ -66,17 +74,31 @@ class Kill(Actor):
 		# We need to make sure we can get assess the process on this server
 		#
 class Start(Actor):
-	def __init__(self,config):
-		Actor.__init__(self,config)
+	def __init__(self):
+		Actor.__init__(self)
         def isValid(self,name):
-                return name in self.config
+            return name in self.config
                     
 	def process(self,name):
-                item = self.config[name]
-		path = item['path']
-		args = item['args'] if 'args' in item else ''
-		cmd = " ".join([path,args])
-		self.execute(cmd)
+        if name in self.config :
+            item = self.config['actions']['apps']['start'][name]
+            path = item['path']
+            args = item['args'] if 'args' in item else ''
+            cmd = " ".join([path,args])
+            self.execute(cmd)
+class Apps(Actor):
+    def __init__(self):
+        Actor.__init__(self)
+    def isValid(self,rows):
+        status = [row['status'] for row in rows]
+        return 'crash' in status
+
+    def process(self,rows):
+        rows = [row for row in rows if row['status'] == 'crash'] :
+        handler = Start()
+        for app in rows:
+            handler.process(app['label']) 
+            
 """
     The orchestrator class is designed to aggregate actions and communicate back to the caller
     Mesage passing is structured as follows {from,to,content} The content is designed to be understood by the actor
@@ -85,91 +107,95 @@ class Start(Actor):
     @TODO: action specifications should be provided remotely
 """
 class Orchestrator(Actor,Thread):
-        def __init__(self,config):
-                Thread.__init__(self)
-                Actor.__init__(self,config)
-                self.actors = {}
-                self.is_master_node = False
-                self.items = []
+        def __init__(self,config=None):
+            Thread.__init__(self)
+            if config is None:
+                f = open(PARAMS['path'])
+                config = json.loads(f.read())
+                f.close()
+            self.config = config
+            Actor.__init__(self)
+            self.actors = {"apps":Apps(),"folders":Folders()}
+            self.is_master_node = False
+            self.items = []
+            #
+            # If the configuration only has id,key then this is NOT the maestro
+            #
+            host = config['api']
+            qid = config['id']
+            if 'actions' in config :
                 #
-                # If the configuration only has id,key then this is NOT the maestro
+                # We are to assume this is the maestro/main node, and it will make the configuration available to other nodes
+                # NOTE: This is has a predefined master thus is subject to known short comings (zombies)
                 #
-                host = config['api']
-                qid = config['id']
-                if 'actions' in config :
-                    #
-                    # We are to assume this is the maestro/main node, and it will make the configuration available to other nodes
-                    # NOTE: This is has a predefined master thus is subject to known short comings (zombies)
-                    #
-                    
-                    q = QueueWriter(host=host,uid=config['key'],qid=qid)
-                    q.flush(config['id'])
-                    q.write(label=qid,row=json.dumps(config['actions']))
-                    q.close()
-                    self.is_master_node = True
-                            
-                    
-                else:
-                    qid = config['master']
-                    q = QueueReader(host=host,uid=config['key'],qid=qid)                    
-                    r = q.read()
-                    q.close()
-                    info = r[qid][0]
-                    self.init(info)
-                print "Initialized ***** ",self.getIdentifier(), " as ",config['id']
                 
-                    #
-                    # This object will have to request for the configuration
-                    #
-                #for id in config['actions'] :
-                    #conf = config['actions'][id]
-                    #item = eval("".join([id,"(",json.dumps(conf),")"]))
-                    #self.actors[id.lower()]  = item
+                q = QueueWriter(host=host,uid=config['key'],qid=qid)
+                q.flush(config['id'])
+                q.write(label=qid,row=json.dumps(config['actions']))
+                q.close()
+                self.is_master_node = True
+                        
+                
+            else:
+                qid = config['master']
+                q = QueueReader(host=host,uid=config['key'],qid=qid)                    
+                r = q.read()
+                q.close()
+                info = r[qid][0]
+                self.init(info)
+            print "Initialized ***** ",self.getIdentifier(), " as ",config['id']
+            
+                #
+                # This object will have to request for the configuration
+                #
+            #for id in config['actions'] :
+                #conf = config['actions'][id]
+                #item = eval("".join([id,"(",json.dumps(conf),")"]))
+                #self.actors[id.lower()]  = item
         """
             This function is designed to provide the orchestrator a configuration             
             @pre
         """
         def init(self,config):
                 
-                for id in config:
-                    
-                    setup_info  = config[id]
-                    item        = eval("".join([id,"(",json.dumps(setup_info),")"]))
-                    self.actors[id.lower()]  = item
+            for id in config:
+                
+                setup_info  = config[id]
+                item        = eval("".join([id,"(",json.dumps(setup_info),")"]))
+                self.actors[id.lower()]  = item
                 
 	def callback(self,channel,method,header,stream):
                 
-                message = json.loads(stream)
-                if 'content' in message :
-                    content = message['content']
-                    sender = message['from']
-                    to = message['to']
-                    if content.lower() == 'quit' or to == 'quit':
-                        channel.close()
-                        
-                    elif content.lower() == 'ping' or to == 'ping':
-                        self.post(to=sender,content="1")
-                    else:
-                        id = to.lower()
-                        actor = self.actors[id]
-                        print "\tPerforming ",actor.getIdentifier()," on ",content," status ",actor.isValid(content)
-                        if actor.isValid(content) :
-                            actor.process(content)
-                        else:
-                            content = {"status":"invalid","content":content}
-                        
-                        self.post(to=sender,content=content)
+        message = json.loads(stream)
+        if 'content' in message :
+            content = message['content']
+        
+            #sender = message['from']
+            to = message['to']
+            if isinstance(content,basestring) and content.lower() in ['quit'] or to=='quit':
+                if content.lower() == 'quit' or to == 'quit':
+                    channel.close()
+            else:
+                id = to.lower()
+                actor = self.actors[id]
+                print [id,actor.isValid(content)]
+                if actor is not None and actor.isValid(content) :                            
+                    actor.process(content)
+                else:
+                    content = {"status":"invalid","content":content}
+                
+                #self.post(to=sender,content=content)
                 
 	def run(self):
 		info = {}
 		host	= self.config['api']
 		uid	= self.config['key']
 		qid	= self.config['id']
-		if not self.is_master_node :
-                    qlistener = QueueListener(qid=qid,uid=uid,host=host)		
-                    qlistener.callback = self.callback
-                    qlistener.read()
-                    r = [self.process(item) for item in self.items]
+		
+        qlistener = QueueListener(qid=qid,uid=uid,host=host)		
+        qlistener.callback = self.callback
+        qlistener.read()
+        r = [self.process(item) for item in self.items]
 
 """
     This class is designed to send a message to a given AMQP enpoint
@@ -193,14 +219,7 @@ config = {
 #thread = Orchestrator(config)
 #thread.start()
 
-config = {
-        "master":"demo-000",
-        "id":"demo-001",
-        "key":"[0v8]-247&7!v3","api":"localhost"
-        
-        
-    } 
-thread = Orchestrator(config)
+thread = Orchestrator()
 thread.start()
 
 #config = {"id":"demo","key":"[0v8]-247&7!v3","api":"localhost"}

+ 16 - 5
src/utils/agents/data-collector.py

@@ -74,6 +74,7 @@ class ICollector(Thread) :
 		write_class 	= self.config['store']['class']['write']
 		read_args	= self.config['store']['args']
 		DELAY	= self.config['delay'] * 60
+		
 		while self.quit == False:
 			
 			for thread in self.pool :
@@ -88,10 +89,20 @@ class ICollector(Thread) :
 				else:
 					label = id
 					row = data
-                                #
-                                # At this point we should check for the status and if it prompts an action
-                                # @TODO Use a design pattern for this ...
-                                #
+				#
+				# At this point we should check for the status and if it prompts an action
+				# @TODO Use a design pattern for this ...
+				#   - submit the row to Event for analysis
+				#   - The event orchestrator will handle things from this point on
+				#
+				message = {}
+				
+				message['to'] = thread.getName()
+				message['content'] = row
+				qwriter = QueueWriter(host=self.config['api'],uid=self.config['key'],qid=self.id)
+				qwriter.write(label=self.id,row = message)
+				qwriter.close()
+				
 				self.lock.acquire()
 				store = self.factory.instance(type=write_class,args=read_args)
 				store.flush(size=200)
@@ -112,4 +123,4 @@ class ICollector(Thread) :
 if __name__ == '__main__':
 	thread = ICollector()
 	# thread.daemon = True
-	thread.start()
+	thread.start()

+ 4 - 4
src/utils/transport.py

@@ -587,7 +587,7 @@ class CouchdbWriter(Couchdb,Writer):
 		self.dbase.save_doc(document)
 	def flush(self,**params) :
 		
-		size = params['size']
+		size = params['size'] if 'size' in params else 0
 		has_changed = False	
 		document = self.dbase.get(self.uid)
 		for key in document:
@@ -595,7 +595,7 @@ class CouchdbWriter(Couchdb,Writer):
 				content = document[key]
 			else:
 				continue
-			if isinstance(content,list):
+			if isinstance(content,list) and size > 0:
 				index = len(content) - size
 				content = content[index:]
 				document[key] = content
@@ -603,8 +603,8 @@ class CouchdbWriter(Couchdb,Writer):
 			else:
 				document[key] = {}
 				has_changed = True
-		if has_changed:
-			self.dbase.save_doc(document)
+		
+		self.dbase.save_doc(document)
 			
 	def archive(self,params=None):
 		document = self.dbase.get(self.uid)