
Merge branch 'community' of http://dev.the-phi.com/git/steve/monitor into community

Steve L. Nyemba, 8 years ago
parent
commit
6dc7a4c343
9 changed files with 651 additions and 120 deletions
  1. config.json (+13 -10)
  2. init.sh (+67 -0)
  3. readme.md (+14 -0)
  4. requirements.txt (+9 -6)
  5. src/api/static/img/uml-design (+243 -0)
  6. src/api/static/js/jx (+1 -1)
  7. src/utils/agents/actor.py (+272 -93)
  8. src/utils/agents/data-collector.py (+21 -3)
  9. src/utils/transport.py (+11 -7)

+ 13 - 10
config.json

@@ -1,15 +1,18 @@
 {
-	"virtual-env":{
-		"class":"Sandbox",
-		"config":{
-			"3-launchpad":{"requirements":"/Users/steve/Documents/git/repair-file/required.txt","sandbox":"/Users/steve/Documents/git/sandbox"}
-		}
+	"id":"zulu-hacker",
+	"key":"4q-h8r5-247&!570p=[0v8]x360",
+	"api":"localhost",
+	"delay":0.5,
+	"store":{
+		"class":{"read":"CouchdbReader","write":"CouchdbWriter"},
+		"args":{"uri":"http://localhost:5984","dbname":"monitor","uid":"logs"}
 	},
-	"processes":{
-		"class":"DetailProcess",
-		"config":{
-			"system":["postgresql","couchdb","httpd"]
-			}
+	"procs":["kate","firefox"],
+	"folders":["/home/steve/tmp","/home/steve/git","/home/steve/Downloads"],
+	"actions":{
+		"folders":{"threshold":"10mb","action":"archive"},
+		"apps":{"firefox":"","kate":""	}
 	}
 	
+	
 }
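
For orientation, a minimal sketch of how an agent might consume this new configuration; the store section is resolved the same way data-collector.py does it below. Reading the file is straightforward, while the factory import path is not shown in this commit and is left commented out as an assumption:

    import json

    # Load the configuration shown above
    with open('config.json') as f:
        config = json.loads(f.read())

    write_class = config['store']['class']['write']    # "CouchdbWriter"
    store_args  = config['store']['args']               # uri, dbname, uid
    delay       = config['delay'] * 60                  # the collector multiplies delay by 60

    # Mirrors src/utils/agents/data-collector.py; the module exposing
    # DataSourceFactory is not part of this diff, hence the comment.
    # store = DataSourceFactory().instance(type=write_class, args=store_args)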

+ 67 - 0
init.sh

@@ -0,0 +1,67 @@
+#!/bin/bash
+#
+# This script handles the operations involved in setting up, starting and stopping the python application
+# It will assume the requirements file is at the root (not with the source code)
+#
+
+export PYTHONPATH=$PWD/src
+pip_upgrade='sandbox/bin/pip freeze|sort |diff requirements.txt -|grep \<|grep -E " .+$" -o'
+install(){
+	
+	virtualenv sandbox
+	sandbox/bin/pip install -r requirements.txt
+	`sandbox/bin/pip freeze|sort |diff requirements.txt -|grep \<|grep -E " .+$" -o|sandbox/bin/pip install --upgrade` 
+
+
+}
+upgrade(){
+	git pull
+	count=`sandbox/bin/pip freeze|sort |diff requirements.txt -|grep \<|grep -E " .+$" -o|wc -l`
+	if [ ! "$count" = "0" ]; then
+	 	`sandbox/bin/pip freeze|sort |diff requirements.txt -|grep \<|grep -E " .+$" -o|sandbox/bin/pip install --upgrade`
+	 else
+	 	echo "No Upgrade required for sandbox"
+	 fi
+	
+}
+start(){
+	
+	if [ "$1" = "collector" ]; then
+		sandbox/bin/python src/utils/agents/data-collector.py --path $PWD/config.json
+	else
+
+		sandbox/bin/python src/api/index.py --path $PWD/config.json &
+	fi
+
+}
+stop(){
+	ps -eo pid,command|grep python|grep -E "$PWD"|grep index.py|grep -E "^ {0,}[0-9]+" -o |xargs kill -9
+	ps -eo pid,command|grep python|grep -E "$PWD"|grep data-collector|grep -E "^ {0,}[0-9]+" -o |xargs kill -9
+}
+
+status(){
+	pid=`ps -eo pid,command|grep python|grep -E "$PWD"|grep index.py|grep -E "^ {0,}[0-9]+" -m 1 -o`
+	if [ "$pid" = "" ]; then
+		echo "API IS OFFLINE"
+	else
+		echo "API IS ONLINE $pid"
+	fi
+	pid=`ps -eo pid,command|grep python|grep -E "$PWD"|grep data-collector|grep -E "^ {0,}[0-9]+" -m 1 -o`
+	if [ "$pid" = "" ]; then 
+		echo "DATA-COLLECTOR IS OFFLINE"
+	else
+		echo "DATA-COLLECTOR IS ONLINE $pid"
+	fi
+
+}
+
+if [ "$1" = "start" ]; then
+	if [ "$2" = "collector" ]; then
+		start "collector"
+	else
+		start
+	fi
+else
+	$1
+
+fi
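
The script is dispatched on its first argument, so init.sh install, init.sh start [collector], init.sh stop, init.sh status and init.sh upgrade all map to the functions above. For illustration only, the start path reduces to roughly the following Python sketch (paths assume the sandbox virtualenv created by install):

    import os
    import subprocess

    root = os.getcwd()
    env  = dict(os.environ, PYTHONPATH=os.path.join(root, 'src'))

    def start(component=None):
        # data collector or web API, as in start() above
        script = 'src/utils/agents/data-collector.py' if component == 'collector' else 'src/api/index.py'
        return subprocess.Popen(['sandbox/bin/python', script, '--path', os.path.join(root, 'config.json')], env=env)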

+ 14 - 0
readme.md

@@ -6,6 +6,20 @@ The program answers basic questions:
 	- Is a given program still running
 	- How much resource (memory/cpu) a program is using up
 	- The number of processes found
+	- Folder monitoring ...
+	
+# Architecture
+
+The architecture of the system is distributed, with a central master node. Each node is described by a configuration of the form:
+    {
+        "id":"",
+        "key":"",
+        "apps":[],
+        "sandbox":[{"path":"","requirements":""}],
+        "folders":["path-1"],
+        "store":{},
+        "actions":{}
+    }
 
 The agent will perform three basic functions :
 

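A small sketch of how a node could tell the two roles apart from this schema. It follows the comment in src/utils/agents/actor.py ("If the configuration only has id,key then this is NOT the maestro"); the helper name and the exact set of identity keys are assumptions:

    import json

    def is_master_node(config):
        # a plain agent carries little more than its identity and endpoint;
        # the master also carries apps/sandbox/folders/store/actions
        return len(set(config.keys()) - set(['id', 'key', 'api', 'delay'])) > 0

    with open('config.json') as f:
        print(is_master_node(json.loads(f.read())))
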
+ 9 - 6
requirements.txt

@@ -1,13 +1,16 @@
-aniso8601==1.2.0
-click==6.6
-couchdbkit==0.6.5
 Flask==0.11.1
 Flask-Session==0.3.0
 Flask-SocketIO==2.8.2
-http-parser==0.8.3
-itsdangerous==0.24
 Jinja2==2.8
 MarkupSafe==0.23
+Werkzeug==0.11.11
+aniso8601==1.2.0
+argparse==1.2.1
+click==6.6
+couchdbkit==0.6.5
+http-parser==0.8.3
+itsdangerous==0.24
+ngram==3.3.0
 numpy==1.11.3
 pika==0.10.0
 python-dateutil==2.6.0
@@ -17,4 +20,4 @@ pytz==2016.10
 restkit==4.2.2
 six==1.10.0
 socketpool==0.5.3
-Werkzeug==0.11.11
+wsgiref==0.1.2

File diff suppressed because it is too large
+ 243 - 0
src/api/static/img/uml-design


+ 1 - 1
src/api/static/js/jx

@@ -1 +1 @@
-Subproject commit 36f9f10ff44406cc0418cc5934eb475bc77ebb1e
+Subproject commit 3d00f5a126574f2277cdac25d60008ee35dc8740

+ 272 - 93
src/utils/agents/actor.py

@@ -13,112 +13,291 @@
 import json
 from threading import Thread
 import os
+import shutil
+import re
 import subprocess
 from monitor import ProcessCounter
-from utils.transport import QueueListener, QueueWriter
+from utils.transport import QueueListener, QueueWriter, QueueReader
+from utils.params import PARAMS
+from ngram import NGram as ng
 class Actor(Thread):
-	def __init__(self,config):
-		Thread.__init__(self)
-		self.config = config
-		self.items = []
-		self.__id = config['id']
-        def getIdentifier(self):
-                return self.__id
-            
-	def init(self,litems):
-		self.items = litems
+    def __init__(self):
+        Thread.__init__(self)
+        pass
+    def getIdentifier(self):
+        return self.__class__.__name__.lower()
+    """
+        Initializing the class with configuration. The configuration will be specific to each subclass
+
+    """
+    def init(self,config,item=None):
+        self.config = config
+        self.item = item
 	def process(self,item):
 		pass
-	def execute(self,cmd):
-		stream = None
-		try:
-                        print self.getIdentifier()
-                        print cmd
-			handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
-			stream = handler.communicate()[0]
-		except Exception,e:
-			pass
-		return stream
-	def callback(self,channel,method,header,stream):
-                print [self.getIdentifier(),stream]
-                message = json.loads(stream)
-                content = message['content']
-                sender = message['from']
-                if content.lower() == 'quit' :
-                    channel.close()
-                    print " *** ",self.getIdentifier()
-                elif content.lower() == 'ping':
-                    self.post(to=sender,content="1")
-                else:
-                    self.process(content)
-                    self.post(to=sender,content=content)
-                
-                #message = None
-                #try:
-                    #message = json.loads(stream)
-                #except Exception, e:
-                    #pass
-                #if message is not None:
-                    #if 'id' in message :
-                        #if 'payload' in message:
-                            #self.execute(message['payload']
-        """
-            Sending a message to a queue with parameters to,from,content
-        """
-        def post(self,**args):
-                to      = args['to'] 
-                content = args['content']
-                message = {"from":self.getIdentifier(),"to":to,"content":content}
-                host	= self.config['api']
-                uid	= self.config['key']
-                qid	= to#self.config['id']
-                
-                qwriter = QueueWriter(host=host,uid=uid,qid=qid)
-                qwriter.init(qid)
-                qwriter.write(label=qid,row=content)
-                #qwriter.close()
-                pass
-	def run(self):
-		info = {}
-		host	= self.config['api']
-		uid	= self.config['key']
-		qid	= self.config['id']
-		
-		qlistener = QueueListener(qid=qid,uid=uid,host=host)		
-		qlistener.callback = self.callback
-		qlistener.read()
-		r = [self.process(item) for item in self.items]
+    def isValid(self,item):
+        return False
+
+    def execute(self,cmd):
+        stream = None
+        try:
+            subprocess.call (cmd,shell=False)
+            #stream = handler.communicate()[0]
+        except Exception,e:
+            pass
+        
+    def run(self):
+        if self.item is not None:
+            self.process(self.item)
+    """
+        Sending a message to a queue with parameters to,from,content
+    """
+    def post(self,**args):
+        pass
+"""
+    This is designed to handle folders i.e cleaning/archiving the folders
+    
+"""
+class Folders(Actor):
+    def init(self,config,item):
+        Actor.init(self,config,item)
+        self.lfolders   = config['folders']
+        self.config     = config['actions']['folders']
+        self.threshold  = self.get_size(self.config['threshold'])
+        self.item       = item
+    
+    def archive(self,item):
+        """
+            This function will archive all files in a given folder
+            @pre : isValid
+        """
+        folder = item['label']
+        signature='-'.join([str(item['date']),str(item['count']),'-files'])
+        tarball=os.sep.join([folder,signature])
+        shutil.make_archive(tarball,'tar',folder)
+        self.clean(item)
+        #
+        # @TODO: The archive can be uploaded to the cloud or else where
+        #   - This allows the submission of data to a processing engine if there ever were one
+        #
+        pass
+    
+    def clean(self,item):
+        """
+            This function consists in deleting files from a given folder
+        """
+        rpath = item['label']
+        lists = os.listdir(item['label'])
+        for name in lists :
+            path = os.sep.join([item['label'],name])
+            if os.path.isdir(path) :
+                shutil.rmtree(path)
+            else:
+                os.remove(path)
+        #
+        # 
+
+    def get_size(self,value):
+        units = {'MB':1000,'GB':1000000,'TB':1000000000} # converting to kb
+        key = set(units) & set(re.split('(\d+)',value.upper()))
+        if len(key) == 0:
+            return -1
+        key = key.pop()
+        return float(value.upper().replace(key,'').strip()) * units[key]
+    
+    def isValid(self,item):
+        """
+            This function returns whether both of the following hold :
+            p : the folder exists
+            q : it has reached the size threshold
+        """
+        p = os.path.exists(item['label']) and item['label'] in self.lfolders
+        q = self.get_size(item['size']) >= self.threshold
+        return p and q
+    
+    def process(self,item):
+        if self.isValid(item) :
+            name = self.config['action']
+            stream = "".join(['self.',name,'(',json.dumps(item),')'])
+            eval(stream)
+        
+        
 class Kill(Actor):
-	def __init__(self,config):
-		Actor.__init__(self,config)
+    
+    def isValid(self,item):                
+        return (item is not None) and (item in self.config)
 	def process(self,item):
-		cmd = "".join(["ps -eo pid,command|grep ",item,'|grep -E "^ {0,1}[0-9]+" -o|xargs kill -9'])
-		self.execute(cmd)
+		args = "".join(["-eo pid,command|grep ",item,'|grep -E "^ {0,1}[0-9]+" -o|xargs kill -9'])
+		self.execute(["ps",args])
 		#
 		# We need to make sure we can get assess the process on this server
 		#
+
 class Start(Actor):
-	def __init__(self,config):
-		Actor.__init__(self,config)
-	def process(self,item):
-		path = item['path']
-		args = item['args'] if 'args' in item else ''
-		cmd = " ".join([path,args])
-		self.execute(cmd)
-	
+    def __init__(self):
+        Actor.__init__(self)
+        self.ng = None
+    
+    def init(self,config,item):
+        Actor.init(self,config,item)
+        self.config = config['apps']
+        self.ng = ng(self.config.keys())
+
+    def isValid(self,name):
+        items = self.ng.search(name) 
+        if len(items) == 0 :
+            return False
+        else:
+            return items[0][1] > 0.1
+                    
+    def process(self,row):
+        name    = row['label']
+        items   = self.ng.search(name)[0]
+        app = items[0]
+
+
+        args = self.config[app]
+        cmd = " ".join([app,args])
+        
+        self.execute([app,args])
+"""
+    This class is designed to handle applications i.e. starting/stopping applications
+    @TODO: Assess if a reboot is required, by looking at the variance/anomaly detection
+"""
+class Apps(Actor):
+    def __init__(self):
+        Actor.__init__(self)
+        self.crashes = []
+        self.running = []
+    
+    def isValid(self,rows):
+        status = [row['status'] for row in rows]
+        return 'crash' in status
+    
+    def classify(self,rows):
+        self.crashes = []
+        self.running = []
+        for row in rows:
+            if row['status'] == 'crash' :
+                self.crashes.append(row)
+            else:
+                self.running.append(row)
+    def reboot(self):
+        for row_run in self.running:
+            pass
+    def start(self):
+        for row_crash in self.crashes:
+            thread = Start()
+            thread.init(self.config,row_crash)
+            thread.daemon = True
+            thread.start()
+        
+    def process(self,rows):
+        self.classify(rows)
+        if self.crashes :
+            self.start()
+        if self.running:
+            self.reboot()
+        
+        
+class Event(Thread):    
+    def __init__(self,config):
+        pass
+    def run(self):
+        pass
+"""
+    The orchestrator class is designed to aggregate actions and communicate back to the caller
+    Message passing is structured as follows: {from,to,content}. The content is designed to be understood by the actor
+    
+    The orchestrator is implemented using a simple iterator design-pattern
+    @TODO: action specifications should be provided remotely
+"""
+class Orchestrator(Actor):
+    
+    def __init__(self,config=None):
+        Actor.__init__(self)
+        if config is None:
+            f = open(PARAMS['path'])
+            config = json.loads(f.read())
+            f.close()
+        self.config = config
+        Actor.__init__(self)
+        
+        self.actors = {"apps":Apps(),"folders":Folders()}
+        self.is_master_node = False
+        self.items = []
+        #
+        # If the configuration only has id,key then this is NOT the maestro
+        #
+        host = config['api']
+        qid = config['id']
+        print "Initialized ***** ",self.getIdentifier(), " as ",config['id']
+        
+            #
+            # This object will have to request for the configuration
+            #
+        #for id in config['actions'] :
+            #conf = config['actions'][id]
+            #item = eval("".join([id,"(",json.dumps(conf),")"]))
+            #self.actors[id.lower()]  = item
+        """
+            This function is designed to provide the orchestrator a configuration             
+            @pre
+        """
+    def init(self,config):
+            
+        for id in config:
+            
+            setup_info  = config[id]
+            item        = eval("".join([id,"(",json.dumps(setup_info),")"]))
+            self.actors[id.lower()]  = item
+                
+    def callback(self,channel,method,header,stream):
+                
+        message = json.loads(stream)
+        if 'content' in message :
+            content = message['content']
+            print self.actors
+            to = message['to']
+            if isinstance(content,basestring) and content.lower() in ['quit'] or to=='quit':
+                if content.lower() == 'quit' or to == 'quit':
+                    print '**** closing ',self.getIdentifier()
+                    channel.close()
+            else:
+
+                id = to.lower()
+                actor = self.actors[id]
+                
+                if actor is not None and actor.isValid(content) :
+                    actor.init(self.config['actions'])                            
+                    actor.process(content)
+                else:
+                    content = {"status":"invalid","content":content}
+                
+                #self.post(to=sender,content=content)
+                
+    def run(self):
+
+        info = {}
+        host	= self.config['api']
+        uid	= self.config['key']
+        qid	= self.config['id']
+		
+        qlistener = QueueListener(qid=qid,uid=uid,host=host)		
+        qlistener.callback = self.callback
+        qlistener.read()
+        r = [self.process(item) for item in self.items]
+
 """
     This class is designed to send a message to a given AMQP enpoint
     The AMQP endpoint is implemented by QueueWriter class
 """
-class Alert(Actor):
-        def process(self,item):
-                pass
-
-            
-config = {"id":"demo","key":"[0v8]-247&7!v3","api":"localhost"}
-actor = Kill(config)
-actor.start()
+# class Alert(Actor):
+#     def process(self,item):
+#         pass
 
-config = {"id":"demo-100","key":"[0v8]-247&7!v3","api":"localhost"}
-actor_1 = Kill(config)
-actor_1.start()
+if __name__ == '__main__':
+    thread = Orchestrator()
+    thread.start()
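
The Start actor matches incoming process labels against the configured application names with the ngram package (added to requirements.txt above) and treats a similarity above 0.1 as a hit. A standalone sketch of that matching step, with illustrative application names:

    from ngram import NGram as ng

    apps  = {'firefox': '', 'kate': ''}        # mirrors the actions/apps section of config.json
    index = ng(apps.keys())

    matches = index.search('firefox-esr')      # list of (name, similarity), best first
    if matches and matches[0][1] > 0.1:
        name = matches[0][0]                   # the configured app to (re)start
        print(name + ' ' + apps[name])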

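Queue messages follow the {from,to,content} shape described in the Orchestrator docstring: callback looks up the actor named by the 'to' field, asks it isValid(content) and then process(content). A simplified, self-contained sketch of that dispatch, with a plain function standing in for the Folders actor:

    import json

    def handle_folders(content):
        # stand-in for Folders.isValid/Folders.process
        print('archiving ' + content['label'])

    actors = {'folders': handle_folders}       # the Orchestrator keeps Apps()/Folders() here

    def callback(stream):
        message = json.loads(stream)
        to, content = message['to'], message['content']
        if to == 'quit' or content == 'quit':
            return                             # the real callback closes the AMQP channel
        handler = actors.get(to.lower())
        if handler is not None:
            handler(content)

    callback(json.dumps({"from": "data-collector@zulu-hacker", "to": "folders",
                         "content": {"label": "/home/steve/tmp", "size": "11mb"}}))
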
+ 21 - 3
src/utils/agents/data-collector.py

@@ -27,6 +27,8 @@ class ICollector(Thread) :
 		self.factory	= DataSourceFactory()
 		self.init()
 		self.name = 'data-collector@'+self.id
+		
+		
 	def init(self):
 		
 		
@@ -51,7 +53,9 @@ class ICollector(Thread) :
 		self.quit = False
 		#self.DELAY = PARAMS['delay']*60
 		self.DELAY = self.config['delay']
-		
+		#
+		# we need to instantiate the actor orchestrator
+		#
 	"""
 		This function returns an instance of a data collector class :
 		ProcessDetails, FileWatch, ... provided the class name
@@ -70,6 +74,7 @@ class ICollector(Thread) :
 		write_class 	= self.config['store']['class']['write']
 		read_args	= self.config['store']['args']
 		DELAY	= self.config['delay'] * 60
+		
 		while self.quit == False:
 			
 			for thread in self.pool :
@@ -84,7 +89,20 @@ class ICollector(Thread) :
 				else:
 					label = id
 					row = data
-
+				#
+				# At this point we should check for the status and if it prompts an action
+				# @TODO Use a design pattern for this ... (Aggregation?)
+				#   - submit the row to Event for analysis
+				#   - The event orchestrator will handle things from this point on
+				#
+				message = {}
+				
+				message['to'] = thread.getName()
+				message['content'] = row
+				qwriter = QueueWriter(host=self.config['api'],uid=self.config['key'],qid=self.id)
+				qwriter.write(label=self.id,row = message)
+				qwriter.close()
+				
 				self.lock.acquire()
 				store = self.factory.instance(type=write_class,args=read_args)
 				store.flush(size=200)
@@ -105,4 +123,4 @@ class ICollector(Thread) :
 if __name__ == '__main__':
 	thread = ICollector()
 	# thread.daemon = True
-	thread.start()
+	thread.start()

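With this change the collector not only persists each row, it also forwards it to the orchestrator's queue as a message keyed by the producing thread's name. A sketch of that write using the QueueWriter calls visible in the diff; the host, key and queue id are the values from config.json and are illustrative here:

    from utils.transport import QueueWriter

    message = {'to': 'folders',                # thread.getName() in the collector loop
               'content': {'label': '/home/steve/tmp', 'size': '11mb'}}

    qwriter = QueueWriter(host='localhost', uid='4q-h8r5-247&!570p=[0v8]x360', qid='zulu-hacker')
    qwriter.write(label='zulu-hacker', row=message)
    qwriter.close()
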
+ 11 - 7
src/utils/transport.py

@@ -261,6 +261,7 @@ class MessageQueue:
 		self.close()
 		return resp
 	def close(self):
+            if self.connection.is_closed == False :
 		self.channel.close()
 		self.connection.close()
 """
@@ -351,7 +352,6 @@ class QueueReader(MessageQueue,Reader):
 			self.durable = False
 		self.size = -1
 		self.data = {}
-	
 	def init(self,qid):
 		
 		properties = pika.ConnectionParameters(host=self.host)
@@ -368,6 +368,7 @@ class QueueReader(MessageQueue,Reader):
 
 	"""
 	def callback(self,channel,method,header,stream):
+                
 		r = []
 		if re.match("^\{|\[",stream) is not None:
 			r = json.loads(stream)
@@ -399,9 +400,12 @@ class QueueReader(MessageQueue,Reader):
 		# We enabled the reader to be able to read from several queues (sequentially for now)
 		# The qid parameter will be an array of queues the reader will be reading from
 		#
+		if isinstance(self.qid,basestring) :
+			self.qid = [self.qid]
 		for qid in self.qid:
 			self.init(qid)
 			# r[qid] = []
+			
 			if self.info.method.message_count > 0:
 				
 				self.channel.basic_consume(self.callback,queue=qid,no_ack=False);
@@ -420,8 +424,8 @@ class QueueListener(QueueReader):
 		self.channel	= self.connection.channel()
 		self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True )
 
-		self.info = self.channel.queue_declare(exclusive=True,queue=qid)
-		print self.info.method.queue
+		self.info = self.channel.queue_declare(passive=True,exclusive=True,queue=qid)
+		
 		self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue,routing_key=qid)
 		#self.callback = callback
 	def read(self):
@@ -583,7 +587,7 @@ class CouchdbWriter(Couchdb,Writer):
 		self.dbase.save_doc(document)
 	def flush(self,**params) :
 		
-		size = params['size']
+		size = params['size'] if 'size' in params else 0
 		has_changed = False	
 		document = self.dbase.get(self.uid)
 		for key in document:
@@ -591,7 +595,7 @@ class CouchdbWriter(Couchdb,Writer):
 				content = document[key]
 			else:
 				continue
-			if isinstance(content,list):
+			if isinstance(content,list) and size > 0:
 				index = len(content) - size
 				content = content[index:]
 				document[key] = content
@@ -599,8 +603,8 @@ class CouchdbWriter(Couchdb,Writer):
 			else:
 				document[key] = {}
 				has_changed = True
-		if has_changed:
-			self.dbase.save_doc(document)
+		
+		self.dbase.save_doc(document)
 			
 	def archive(self,params=None):
 		document = self.dbase.get(self.uid)
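
The flush() change makes the size parameter optional: list-valued fields are trimmed to their newest entries only when a positive size is passed, otherwise the field is reset, and the document is now saved unconditionally. A small standalone illustration of the trimming rule (data is made up):

    def trim(content, size):
        # keep only the newest `size` entries, as CouchdbWriter.flush now does
        if isinstance(content, list) and size > 0:
            return content[len(content) - size:]
        return {}

    rows = [{'n': i} for i in range(300)]
    print(len(trim(rows, 200)))                # 200 : only the most recent rows survive a flush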