Przeglądaj źródła

Handling of actions @TODO: Folder clean/archive

steve 8 lat temu
rodzic
commit
5083ea7c90
2 zmienionych plików z 139 dodań i 140 usunięć
  1. 9 6
      requirements.txt
  2. 130 134
      src/utils/agents/actor.py

+ 9 - 6
requirements.txt

@@ -1,13 +1,16 @@
-aniso8601==1.2.0
-click==6.6
-couchdbkit==0.6.5
 Flask==0.11.1
 Flask-Session==0.3.0
 Flask-SocketIO==2.8.2
-http-parser==0.8.3
-itsdangerous==0.24
 Jinja2==2.8
 MarkupSafe==0.23
+Werkzeug==0.11.11
+aniso8601==1.2.0
+argparse==1.2.1
+click==6.6
+couchdbkit==0.6.5
+http-parser==0.8.3
+itsdangerous==0.24
+ngram==3.3.0
 numpy==1.11.3
 pika==0.10.0
 python-dateutil==2.6.0
@@ -17,4 +20,4 @@ pytz==2016.10
 restkit==4.2.2
 six==1.10.0
 socketpool==0.5.3
-Werkzeug==0.11.11
+wsgiref==0.1.2

+ 130 - 134
src/utils/agents/actor.py

@@ -17,52 +17,46 @@ import subprocess
 from monitor import ProcessCounter
 from utils.transport import QueueListener, QueueWriter, QueueReader
 from utils.params import PARAMS
-class Actor:
-        def __init__(self):
-                pass
-        def getIdentifier(self):
-                return self.__class__.__name__.lower()
+from ngram import NGram as ng
+class Actor(Thread):
+    def __init__(self):
+        Thread.__init__(self)
+        pass
+    def getIdentifier(self):
+        return self.__class__.__name__.lower()
     """
         Initializing the class with configuration. The configuration will be specific to each subclass
 
     """
-	def init(self,config):
+    def init(self,config,item=None):
         self.config = config
+        self.item = item
 	def process(self,item):
 		pass
     def isValid(self,item):
         return False
 
-	def execute(self,cmd):
-		stream = None
-		try:
-			handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
-			stream = handler.communicate()[0]
-		except Exception,e:
-			pass
-		return stream
-
-        """
-            Sending a message to a queue with parameters to,from,content
-        """
-        def post(self,**args):
-            # to      = args['to'] 
-            # content = args['content']
-            # message = {"from":self.getIdentifier(),"to":to,"content":content}
-            # host	= self.config['api']
-            # uid	= self.config['key']
-            # qid	= to#self.conorfig['id']
-            
-            # qwriter = QueueWriter(host=host,uid=uid,qid=qid)
-            # qwriter.init(qid)
-            # qwriter.write(label=qid,row=content)
-            # #qwriter.close()
+    def execute(self,cmd):
+        stream = None
+        try:
+            subprocess.call (cmd,shell=False)
+            #stream = handler.communicate()[0]
+        except Exception,e:
             pass
+        
+    def run(self):
+        if self.item is not None:
+            self.process(self.item)
+    """
+        Sending a message to a queue with parameters to,from,content
+    """
+    def post(self,**args):
+        pass
 class Folders(Actor):
-        def isvalid(self,item):
-            print self.conf
-        def process(self,item):
-            print item
+    def isvalid(self,item):
+        print self.conf
+    def process(self,item):
+        print item
 class Kill(Actor):
     
     def isValid(self,item):                
@@ -73,31 +67,68 @@ class Kill(Actor):
 		#
 		# We need to make sure we can get assess the process on this server
 		#
+
 class Start(Actor):
-	def __init__(self):
-		Actor.__init__(self)
-        def isValid(self,name):
-            return name in self.config
+    def __init__(self):
+        Actor.__init__(self)
+        self.ng = None
+    
+    def init(self,config,item):
+        Actor.init(self,config,item)
+        self.config = config['start']
+        self.ng = ng(self.config.keys())
+
+    def isValid(self,name):
+        items = self.ng.search(name) 
+        if len(items) == 0 :
+            return False
+        else:
+            return items[0][1] > 0.1
                     
-	def process(self,name):
-        if name in self.config :
-            item = self.config['actions']['apps']['start'][name]
-            path = item['path']
-            args = item['args'] if 'args' in item else ''
-            cmd = " ".join([path,args])
-            self.execute(cmd)
+    def process(self,row):
+        name    = row['label']
+        items   = self.ng.search(name)[0]
+        app = items[0]
+
+
+        args = self.config[app]
+        cmd = " ".join([app,args])
+        
+        self.execute([app,args])
+"""
+    This class is designed to handle applications i.e start/stopping applications
+    @TODO: Assess if a reboot is required, by looking at the variance/anomaly detection
+"""
 class Apps(Actor):
     def __init__(self):
         Actor.__init__(self)
+        self.crashes = []
+        self.running = []
+    
     def isValid(self,rows):
         status = [row['status'] for row in rows]
         return 'crash' in status
-
+    
+    def classify(self,rows):
+        self.crashes = []
+        self.running = []
+        for row in rows:
+            if row['status'] == 'crash' :
+                self.crashes.append(row)
+            else:
+                self.running.append(row)
+    
     def process(self,rows):
-        rows = [row for row in rows if row['status'] == 'crash'] :
-        handler = Start()
-        for app in rows:
-            handler.process(app['label']) 
+        #rows = [row for row in rows if row['status'] == 'crash'] :
+        self.classify(rows)
+        #handler = Start()
+        #handler.init(self.config)
+        #[handler.process(row_crash) for row_crash in self.crashes ]
+        for row_crash in self.crashes:
+            thread = Start()
+            thread.init(self.config,row_crash)
+            thread.daemon = True
+            thread.start()
             
 """
     The orchestrator class is designed to aggregate actions and communicate back to the caller
@@ -106,65 +137,46 @@ class Apps(Actor):
     The orchestrator is implemented using a simple iterator design-pattern
     @TODO: action specifications should be provided remotely
 """
-class Orchestrator(Actor,Thread):
-        def __init__(self,config=None):
-            Thread.__init__(self)
-            if config is None:
-                f = open(PARAMS['path'])
-                config = json.loads(f.read())
-                f.close()
-            self.config = config
-            Actor.__init__(self)
-            self.actors = {"apps":Apps(),"folders":Folders()}
-            self.is_master_node = False
-            self.items = []
+class Orchestrator(Actor):
+    
+    def __init__(self,config=None):
+        Actor.__init__(self)
+        if config is None:
+            f = open(PARAMS['path'])
+            config = json.loads(f.read())
+            f.close()
+        self.config = config
+        Actor.__init__(self)
+        self.actors = {"apps":Apps(),"folders":Folders()}
+        self.is_master_node = False
+        self.items = []
+        #
+        # If the configuration only has id,key then this is NOT the maestro
+        #
+        host = config['api']
+        qid = config['id']
+        print "Initialized ***** ",self.getIdentifier(), " as ",config['id']
+        
             #
-            # If the configuration only has id,key then this is NOT the maestro
+            # This object will have to request for the configuration
             #
-            host = config['api']
-            qid = config['id']
-            if 'actions' in config :
-                #
-                # We are to assume this is the maestro/main node, and it will make the configuration available to other nodes
-                # NOTE: This is has a predefined master thus is subject to known short comings (zombies)
-                #
-                
-                q = QueueWriter(host=host,uid=config['key'],qid=qid)
-                q.flush(config['id'])
-                q.write(label=qid,row=json.dumps(config['actions']))
-                q.close()
-                self.is_master_node = True
-                        
-                
-            else:
-                qid = config['master']
-                q = QueueReader(host=host,uid=config['key'],qid=qid)                    
-                r = q.read()
-                q.close()
-                info = r[qid][0]
-                self.init(info)
-            print "Initialized ***** ",self.getIdentifier(), " as ",config['id']
-            
-                #
-                # This object will have to request for the configuration
-                #
-            #for id in config['actions'] :
-                #conf = config['actions'][id]
-                #item = eval("".join([id,"(",json.dumps(conf),")"]))
-                #self.actors[id.lower()]  = item
+        #for id in config['actions'] :
+            #conf = config['actions'][id]
+            #item = eval("".join([id,"(",json.dumps(conf),")"]))
+            #self.actors[id.lower()]  = item
         """
             This function is designed to provide the orchestrator a configuration             
             @pre
         """
-        def init(self,config):
-                
-            for id in config:
-                
-                setup_info  = config[id]
-                item        = eval("".join([id,"(",json.dumps(setup_info),")"]))
-                self.actors[id.lower()]  = item
+    def init(self,config):
+            
+        for id in config:
+            
+            setup_info  = config[id]
+            item        = eval("".join([id,"(",json.dumps(setup_info),")"]))
+            self.actors[id.lower()]  = item
                 
-	def callback(self,channel,method,header,stream):
+    def callback(self,channel,method,header,stream):
                 
         message = json.loads(stream)
         if 'content' in message :
@@ -174,23 +186,26 @@ class Orchestrator(Actor,Thread):
             to = message['to']
             if isinstance(content,basestring) and content.lower() in ['quit'] or to=='quit':
                 if content.lower() == 'quit' or to == 'quit':
+                    print '**** closing ',self.getIdentifier()
                     channel.close()
             else:
                 id = to.lower()
                 actor = self.actors[id]
-                print [id,actor.isValid(content)]
-                if actor is not None and actor.isValid(content) :                            
+                
+                if actor is not None and actor.isValid(content) :
+                    actor.init(self.config['actions'])                            
                     actor.process(content)
                 else:
                     content = {"status":"invalid","content":content}
                 
                 #self.post(to=sender,content=content)
                 
-	def run(self):
-		info = {}
-		host	= self.config['api']
-		uid	= self.config['key']
-		qid	= self.config['id']
+    def run(self):
+
+        info = {}
+        host	= self.config['api']
+        uid	= self.config['key']
+        qid	= self.config['id']
 		
         qlistener = QueueListener(qid=qid,uid=uid,host=host)		
         qlistener.callback = self.callback
@@ -201,29 +216,10 @@ class Orchestrator(Actor,Thread):
     This class is designed to send a message to a given AMQP enpoint
     The AMQP endpoint is implemented by QueueWriter class
 """
-class Alert(Actor):
-        def process(self,item):
-                
-                pass
-
-config = {
-        "id":"demo-000",
-        "key":"[0v8]-247&7!v3","api":"localhost",
-        "actions":{
-                "Kill":["firefox"],
-                "Alert":[]
-                   }
-        
-        
-    }            
-#thread = Orchestrator(config)
-#thread.start()
-
-thread = Orchestrator()
-thread.start()
-
-#config = {"id":"demo","key":"[0v8]-247&7!v3","api":"localhost"}
-#actor = Kill(config)
-#actor.start()
+# class Alert(Actor):
+#     def process(self,item):
+#         pass
 
-#config = {"id":"demo-100","key":"[0v8]-247&7!v3","api":"localhost"}
+if __name__ == '__main__':
+    thread = Orchestrator()
+    thread.start()