Browse Source

Bug fixes with folder monitoring

Steve L. Nyemba 8 years ago
parent
commit
7e01d4a47d
4 changed files with 163 additions and 75 deletions
  1. 89 71
      src/monitor.py
  2. 17 0
      src/utils/ml.py
  3. 53 1
      src/utils/workers.py
  4. 4 3
      test/TestServerMonitor.py

+ 89 - 71
src/monitor.py

@@ -14,6 +14,8 @@ import datetime
 import urllib2 as http, base64
 import urllib2 as http, base64
 from threading import Thread, RLock
 from threading import Thread, RLock
 import time
 import time
+import numpy as np
+from utils.ml import ML
 class Analysis:
 class Analysis:
 	def __init__(self):
 	def __init__(self):
 		self.logs = []
 		self.logs = []
@@ -77,9 +79,16 @@ class Sandbox(Analysis):
 	def __init__(self):
 	def __init__(self):
 		Analysis.__init__(self)
 		Analysis.__init__(self)
 	def init(self,conf):
 	def init(self,conf):
-		#Analysis.init(self)
-		self.sandbox_path = conf['sandbox']
-		self.requirements_path = conf['requirements']
+		#Analysis.init(self)		
+		if os.path.exists(conf['sandbox']) :
+			self.sandbox_path = conf['sandbox']
+		else:
+			self.sandbox_path = None
+		if os.path.exists(conf['requirements']) :
+			self.requirements_path = conf['requirements']
+		else:
+			self.requirements_path = None
+
 	def get_requirements (self):
 	def get_requirements (self):
 		f = open(self.requirements_path)
 		f = open(self.requirements_path)
 		return [ name.replace('-',' ').replace('_',' ') for name in f.read().split('\n') if name != '']
 		return [ name.replace('-',' ').replace('_',' ') for name in f.read().split('\n') if name != '']
@@ -100,14 +109,17 @@ class Sandbox(Analysis):
 	"""
 	"""
 	def composite(self):
 	def composite(self):
 		Analysis.init(self)
 		Analysis.init(self)
-		required_modules= self.get_requirements()
-		sandbox_modules	= self.get_sandbox_requirements()
-		N = len(required_modules)
-		n = len(Set(required_modules) - Set(sandbox_modules))
-		value = round(1 - (n/N),2)*100
-		missing = list(Set(required_modules) - Set(sandbox_modules))
-		
-		return dict(self.getNow(),**{"value":value,"missing":missing})
+		if self.sandbox_path and self.requirements_path :
+			required_modules= self.get_requirements()
+			sandbox_modules	= self.get_sandbox_requirements()
+			N = len(required_modules)
+			n = len(Set(required_modules) - Set(sandbox_modules))
+			value = round(1 - (n/N),2)*100
+			missing = list(Set(required_modules) - Set(sandbox_modules))
+			
+			return dict(self.getNow(),**{"value":value,"missing":missing})
+		else:
+			return None
 
 
 """
 """
 	This class performs the analysis of a list of processes and determines
 	This class performs the analysis of a list of processes and determines
@@ -219,100 +231,106 @@ class DetailProcess(Analysis):
 	Additionally the details are summarized in terms of global size and oldest file.
 	Additionally the details are summarized in terms of global size and oldest file.
 """
 """
 class FileWatch(Analysis):
 class FileWatch(Analysis):
-	def __init__(self,folders):
-		self.folders = folders
+	def __init__(self):
 		pass
 		pass
+	def init(self,folders):
+		self.folders = folders;
 	def split(self,row):
 	def split(self,row):
 		
 		
 		x = row.split(' ')
 		x = row.split(' ')
 		r = {}
 		r = {}
 		months = ['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec']
 		months = ['Jan','Feb','Mar','Apr','May','Jun','Jul','Aug','Sep','Oct','Nov','Dec']
 		if x:
 		if x:
-			if 'K' in x[0]:
-				print x
-				size = float(x[0].replace('K','').replace('KB','')) / 1000
-			elif 'M' in x[0] :
-				size = float(x[0].replace('MB','').replace('M',''))
-			elif 'G' in x[0] :
-				size = float(x[0].replace('GB','').replace('G','')) * 1000
-			elif 'T' in x[0] :
-				size = float(x[0].replace('TB','').replace('T','')) * 1000000
-			else :
-				#
-				# Size provided in bytes we are converting into MB
-				size = float(x[0].replace('B','')) / 1000000
+			BYTES_TO_MB = 1000000
+			size = int(x[0])/BYTES_TO_MB
 			month	= months.index(x[1]) + 1
 			month	= months.index(x[1]) + 1
 			day	= int(x[2])
 			day	= int(x[2])
 			age = -1
 			age = -1
 			hour=minute = 0
 			hour=minute = 0
 			if ':' in x[3] :
 			if ':' in x[3] :
 				hour,minute	= x[3].split(':')
 				hour,minute	= x[3].split(':')
-			if re.match('^\d+$',x[4]):	
-				year	= int(x[4])
+				now = datetime.datetime.today()
+				if month == now.month :
+					year	= now.year
+				else:
+					year = now.year - 1
 			else:
 			else:
-				year	= datetime.datetime.today().year
-			print [year,month,day,x[4]]
+				year = int(x[3])
+				hour = 0
+				minute = 0
+				
+			
+			
 			file_date = datetime.datetime(year,month,day,int(hour),int(minute))
 			file_date = datetime.datetime(year,month,day,int(hour),int(minute))
-			size = round(size,2)
+			# size = round(size,2)
 			#file_date = datetime.datetime(year,month,day,hour,minute)
 			#file_date = datetime.datetime(year,month,day,hour,minute)
-			age = (file_date - datetime.datetime.now()).days
+			age = (datetime.datetime.now() - file_date ).days
+			
 			return {"size":size,"age":age}	
 			return {"size":size,"age":age}	
 		return None
 		return None
 
 
 	def evaluate(self,path):
 	def evaluate(self,path):
 		cmd = "find  :path -print0|xargs -0 ls -ls |awk '{print $6,$7,$8,$9,$10}'".replace(":path",path)
 		cmd = "find  :path -print0|xargs -0 ls -ls |awk '{print $6,$7,$8,$9,$10}'".replace(":path",path)
-		print cmd
+		
 		handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
 		handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
 		ostream = handler.communicate()[0].split('\n')
 		ostream = handler.communicate()[0].split('\n')
 		
 		
-		return [self.split(stream) for stream in ostream if stream.strip() != '']
+		#return [self.split(stream) for stream in ostream if stream.strip() != '' and '.DS_Store' not in stream and 'total' not in stream]
+		return [self.split(stream) for stream in ostream if path not in stream and not set(['','total','.DS_Store']) & set(stream.split(' '))]
 	def composite(self):
 	def composite(self):
-		d = [] #-- matrix of details (age,size)
-		s = {} #-- summary of the 
+		d = [] #-- vector of details (age,size)
+		
+		now = datetime.datetime.today()
 		for folder in self.folders:
 		for folder in self.folders:
 			if os.path.exists(folder):
 			if os.path.exists(folder):
-				d += self.evaluate(folder)
-				xo = np.array(ML.Extract(['size','age'],d))
-				s[folder] = [np.sum(xo[:,0]),np.max(xo[:,1])]
+				xo_raw = self.evaluate(folder)
+				xo = np.array(ML.Extract(['size','age'],xo_raw))				
+				xo = {"label":folder,"details":xo_raw,"summary":{"size":round(np.sum(xo[:,0]),2),"age":np.max(xo[:,1]),"count":len(xo[:,1])}}
+				xo['day'] = now.day
+				xo['month'] = now.month
+				xo['year'] = now.year
+				d.append(xo)
 		
 		
-		return {"summary":s,"details":d}
-class Monitor (Thread):
-	def __init__(self,pConfig,pWriter,id='processes') :
-		Thread.__init__(self)
+		return d
+
+
+# class Monitor (Thread):
+# 	def __init__(self,pConfig,pWriter,id='processes') :
+# 		Thread.__init__(self)
 		
 		
-		self.config 	= pConfig[id]
-		self.writer	= pWriter;
-		self.logs	= []
-		self.handler = self.config['class']
-		self.mconfig = self.config['config']
+# 		self.config 	= pConfig[id]
+# 		self.writer	= pWriter;
+# 		self.logs	= []
+# 		self.handler = self.config['class']
+# 		self.mconfig = self.config['config']
 		
 		
 			
 			
 		
 		
-	def stop(self):
-		self.keep_running = False
-	def run(self):
-		r = {}
-		self.keep_running = True
-		lock = RLock()
-		while self.keep_running:
-			lock.acquire()
-			for label in self.mconfig:
+# 	def stop(self):
+# 		self.keep_running = False
+# 	def run(self):
+# 		r = {}
+# 		self.keep_running = True
+# 		lock = RLock()
+# 		while self.keep_running:
+# 			lock.acquire()
+# 			for label in self.mconfig:
 				
 				
-				self.handler.init(self.mconfig[label])
-				r = self.handler.composite()
-				self.writer.write(label=label,row = r)
+# 				self.handler.init(self.mconfig[label])
+# 				r = self.handler.composite()
+# 				self.writer.write(label=label,row = r)
 				
 				
-				time.sleep(2)
-			lock.release()		
+# 				time.sleep(2)
+# 			lock.release()		
 			
 			
-			self.prune()
-			TIME_LAPSE = 60*2
-			time.sleep(TIME_LAPSE)
-		print "Stopped ..."
-	def prune(self) :
+# 			self.prune()
+# 			TIME_LAPSE = 60*2
+# 			time.sleep(TIME_LAPSE)
+# 		print "Stopped ..."
+# 	def prune(self) :
 		
 		
-		MAX_ENTRIES = 100
-		if len(self.logs) > MAX_ENTRIES :
-			BEG = len(self.logs) - MAX_SIZE -1 
-			self.logs = self.logs[BEG:]
+# 		MAX_ENTRIES = 100
+# 		if len(self.logs) > MAX_ENTRIES :
+# 			BEG = len(self.logs) - MAX_SIZE -1 
+# 			self.logs = self.logs[BEG:]
 
 

+ 17 - 0
src/utils/ml.py

@@ -46,6 +46,23 @@ class ML:
 	@staticmethod
 	@staticmethod
 	def CleanupName(value) :
 	def CleanupName(value) :
 		return value.replace('$','').replace('.+','')
 		return value.replace('$','').replace('.+','')
+	@staticmethod
+	def distribution(xo,lock) :
+		lock.acquire()
+		d = []
+		m = {}
+		for xi in xo :
+			value = round(xi,2)
+			id = str(value)
+			if id in m :
+				index = m[id]
+				d[index][1] += 1
+			else:
+				m[id] = len(d)
+				d.append([value,1])
+		lock.release()
+		del m
+		return d
 	
 	
 """
 """
 	Implements a multivariate anomaly detection
 	Implements a multivariate anomaly detection

+ 53 - 1
src/utils/workers.py

@@ -7,6 +7,16 @@ import time
 import monitor
 import monitor
 import sys
 import sys
 import os
 import os
+import datetime
+class BasicWorker(Thread):
+	def __init__(self,config,lock):
+		Thread.__init__(self)
+		self.reader_class	= config['store']['class']['read']
+		self.write_class	= config['store']['class']['write']
+		self.rw_args		= config['store']['args']
+		self.factory 		= DataSourceFactory()
+		self.lock 		= lock
+		
 """
 """
 	This class is intended to collect data given a configuration
 	This class is intended to collect data given a configuration
 
 
@@ -145,7 +155,48 @@ class Learner(Thread) :
 			time.sleep(TIME_ELLAPSED)
 			time.sleep(TIME_ELLAPSED)
 		print "Exiting ",self.name
 		print "Exiting ",self.name
 			
 			
-
+class FileWatchWorker(BasicWorker):
+	def __init__(self,config,lock):
+		BasicWorker.__init__(self,config,lock)
+		self.name = "Zulu-FileWatch"
+		self.config = config ;
+		self.folder_config = config['monitor']['folders']['config']
+		self.quit = False
+	def stop(self):
+		self.quit = True
+	def run(self):
+		TIME_ELAPSED = 60 * 10
+		handler = monitor.FileWatch()
+		while self.quit == False :
+			r = []
+			print ' ** ',self.name,datetime.datetime.today()
+			for id in self.folder_config :
+				folders = self.folder_config [id]
+				handler.init(folders)
+				xo = handler.composite()
+				#
+				# We should perform a distribution analysis of the details in order to have usable data
+				#
+				
+				xo_age = [row['age'] for row in xo[0]['details']]
+				xo_size= [row['size'] for row in xo[0]['details']]
+				xo[0]['details'] = {"age":ML.distribution(xo_age,self.lock),"size":ML.distribution(xo_size,self.lock)}
+				
+				#
+				# Now we can save the file
+				# 
+				self.lock.acquire()
+				writer = self.factory.instance(type=self.write_class,args=self.rw_args)
+				writer.write(label='folders',row=xo)
+				self.lock.release()
+			if 'MONITOR_CONFIG_PATH' in os.environ:
+				#
+				# This suggests we are in development mode
+				#
+				break
+			time.sleep(TIME_ELAPSED)
+		print 'Exiting ',self.name
+	
 """
 """
 	This class is a singleton designed to start and quit dependent threads
 	This class is a singleton designed to start and quit dependent threads
 		* monitor	is designed to act as a data collection agent
 		* monitor	is designed to act as a data collection agent
@@ -162,6 +213,7 @@ class ThreadManager:
 		lock = RLock()
 		lock = RLock()
 		ThreadManager.Pool['monitor'] = Top(config,lock)
 		ThreadManager.Pool['monitor'] = Top(config,lock)
 		ThreadManager.Pool['learner'] = Learner(config,lock)
 		ThreadManager.Pool['learner'] = Learner(config,lock)
+		ThreadManager.Pool['file-watch'] = FileWatchWorker(config,lock)
 		for id in ThreadManager.Pool :
 		for id in ThreadManager.Pool :
 			thread = ThreadManager.Pool[id]
 			thread = ThreadManager.Pool[id]
 			thread.start()
 			thread.start()

+ 4 - 3
test/TestServerMonitor.py

@@ -56,11 +56,12 @@ class TestMonitorServer(unittest.TestCase):
 		lock = Lock()
 		lock = Lock()
 		p = Learner(CONFIG,lock)
 		p = Learner(CONFIG,lock)
 		p.start()
 		p.start()
-	def test_JMX(self):
+	def test_FileWatch(self):
 		conf =CONFIG['monitor']['folder']
 		conf =CONFIG['monitor']['folder']
 		path =os.environ['FILE_PATH']
 		path =os.environ['FILE_PATH']
-		fw  = FileWatch(conf)
-		print fw.evaluate(path)
+		fw  = FileWatch()
+		fw.init([path])
+		print fw.composite()
 		
 		
 if __name__ == '__main__' :
 if __name__ == '__main__' :
 	unittest.main()
 	unittest.main()