Переглянути джерело

Bug fix with learner, @TODO: Determine epsilon i.e the right one to get good f-score

steve 8 роки тому
батько
коміт
54744253ca
3 змінених файлів з 55 додано та 21 видалено
  1. 22 13
      src/utils/ml.py
  2. 23 6
      src/utils/workers.py
  3. 10 2
      test/TestServerMonitor.py

+ 22 - 13
src/utils/ml.py

@@ -16,7 +16,7 @@ class ML:
 		# @TODO: Make sure this approach works across all transport classes
 		# We may have a potential issue of how the data is stored ... it may not scale
 		#
-		return [item[0] for item in data if item[0][attr] == value]
+		return [item[0] for item in data if item and attr in item[0] and item[0][attr] == value]
 	@staticmethod
 	def Extract(lattr,data):
 		return [[row[id] for id in lattr] for row in data]
@@ -27,7 +27,7 @@ class ML:
 	@TODO: determine computationally determine epsilon
 """
 class AnomalyDetection:
-	def split(self,data,index=-1,threshold=0.7) :
+	def split(self,data,index=-1,threshold=0.8) :
 		N	= len(data)
 		# if N < LIMIT:
 		# 	return None
@@ -47,10 +47,15 @@ class AnomalyDetection:
 	"""
 	def learn(self,data,key,value,features,label):
 		xo = ML.Filter(key,value,data)
-		if len(xo) < 100 :
+		
+		if not xo :
 			return None
+		
+		#if len(xo) < 100 :
+			#return None
 		# attr = conf['features']
 		# label= conf['label']
+		
 		yo= ML.Extract([label['name']],xo)
 		xo = ML.Extract(features,xo)
 		yo = self.getLabel(yo,label)
@@ -58,12 +63,14 @@ class AnomalyDetection:
 		xo = self.split(xo)
 		yo = self.split(yo)
 
-		p = self.gParameters(xo['train'])
-		
-		px =  self.gPx(p['mean'],p['cov'],xo['test'])
-		
-		perf = self.gPerformance(px,yo['test'])
-		return {"parameters":p,"performance":perf}
+		if xo['train'] :
+			p = self.gParameters(xo['train'])
+			
+			px =  self.gPx(p['mean'],p['cov'],xo['test'])
+			
+			perf = self.gPerformance(px,yo['test'])
+			return {"parameters":p,"performance":perf}
+		return None
 	def getLabel(self,yo,label_conf):
 		return [ int(len(set(item) & set(label_conf["1"]))>0) for item in yo ]
 
@@ -72,7 +79,7 @@ class AnomalyDetection:
 		This function will compute the probability density function given a particular event/set of events
 		@pre xu.shape[0] == sigma[0] == sigma[1]
 	"""
-	def gPx(self,xu,sigma,data,EPSILON=0.05):
+	def gPx(self,xu,sigma,data,EPSILON=0.25):
 		n = len(data[0])
 		
 		r = []
@@ -84,6 +91,7 @@ class AnomalyDetection:
 			d = np.matrix(row - xu)
 			d.shape = (n,1)
 			b = np.exp((-0.5*np.transpose(d)) * (np.linalg.inv(sigma)*d))
+			
 			px = float(b/a)
 			r.append([px,int(px < EPSILON)])
 		return r
@@ -103,8 +111,8 @@ class AnomalyDetection:
 			fp += 1 if (test[i][1] != labels[i] and test[i][1] == 1) else 0
 			fn += 1 if (test[i][1] != labels[i] and test[i][1] == 0) else 0
 			tn += 1 if (test[i][1] == labels[i] and test[i][1] == 0) else 0
-		precision = tp / (tp + fp)
-		recall	= tp / (tp + fn)
+		precision = tp / (tp + fp) if tp + fp > 0 else 1
+		recall	= tp / (tp + fn) if tp  + fp > 0 else 1
 		fscore 	= (2 * precision * recall)/ (precision + recall)
 		return {"precision":precision,"recall":recall,"fscore":fscore}
 
@@ -124,4 +132,5 @@ class AnomalyDetection:
 		#
 		m = np.array([ (m[i,:] - u[i])/r[i] for i in range(0,n)])
 		sigma = np.cov(m)
-		return {"cov":sigma,"mean":u}
+		sigma = [ list(row) for row in sigma]
+		return {"cov":sigma,"mean":list(u)}

+ 23 - 6
src/utils/workers.py

@@ -1,6 +1,7 @@
 #import multiprocessing
 from threading import Thread, Lock
 from utils import transport
+from utils.ml import AnomalyDetection
 import time
 import monitor
 import sys
@@ -78,17 +79,33 @@ class Learner(Thread) :
 			r = {}
 			for key in data :
 				logs = data[key]
-				r[key] = {}
+				
 				for app in self.apps:
+					
 					handler = AnomalyDetection()
-					r[key][app] = lhandler.learn(data,'label',app,self.features,self.yo)
+					value = handler.learn(logs,'label',app,self.features,self.yo)
+					
+					if value is not None:
+						print value
+						if key not in r:
+							r[key] = {}
+						r[key][app] = value
 			#
 			# At this point we've already learnt every thing we need to learn
 			#
-			self.lock.aquire()
-			writer = sef.factory.instance(type.self.write_class,args=self.rw_args)
-			writer.write('learn',r)
-			self.lock.release()
+			
+			if r.keys() :
+				
+				self.lock.acquire()
+				writer = self.factory.instance(type=self.write_class,args=self.rw_args)
+				writer.write(label='learn',row=r)
+				self.lock.release()
+
+			if 'MONITOR_CONFIG_PATH' in os.environ:
+				#
+				# This suggests we are in development mode
+				#
+				break
 
 			TIME_ELLAPSED = 60*120	#-- Every 2 hours
 			time.sleep(TIME_ELLAPSED)

+ 10 - 2
test/TestServerMonitor.py

@@ -5,7 +5,8 @@ import monitor
 import os
 import json
 from utils.workers import Top, Learner
-from multiprocessing import Lock
+#from multiprocessing import Lock
+from threading import Lock
 path = os.environ['MONITOR_CONFIG_PATH']
 f = open(path)
 CONFIG = json.loads( f.read())
@@ -46,7 +47,14 @@ class TestMonitorServer(unittest.TestCase):
 	def test_StartTop(self):		
 		lock = Lock()
 		p = Top(CONFIG,lock)		
+		#p.start()
+		
+		#p.join()
+	def test_StartLearner(self):
+		lock = Lock()
+		p = Learner(CONFIG,lock)
 		p.start()
-		p.join()
+		
+		
 if __name__ == '__main__' :
 	unittest.main()