Browse Source

Enabling preconditions to be effective (variance) @TODO: dimensionality reduction

Steve L. Nyemba 8 năm trước
mục cha
commit
0afbb1c072
3 tập tin đã thay đổi với 55 bổ sung21 xóa
  1. 35 16
      src/utils/ml.py
  2. 4 1
      src/utils/transport.py
  3. 16 4
      src/utils/workers.py

+ 35 - 16
src/utils/ml.py

@@ -16,7 +16,7 @@ class ML:
 		# @TODO: Make sure this approach works across all transport classes
 		# We may have a potential issue of how the data is stored ... it may not scale
 		#
-		
+		value = ML.CleanupName(value)
 		#return [item[0] for item in data if item and attr in item[0] and item[0][attr] == value]
 		return [[item for item in row if item[attr] == value][0] for row in data]
 	@staticmethod
@@ -24,7 +24,9 @@ class ML:
 		if isinstance(lattr,basestring):
 			lattr = [lattr]
 		return [[row[id] for id in lattr] for row in data]
-
+	@staticmethod
+	def CleanupName(value) :
+		return value.replace('$','').replace('.+','')
 	
 """
 	Implements a multivariate anomaly detection
@@ -32,7 +34,7 @@ class ML:
 """
 class AnomalyDetection:
 		
-	def split(self,data,index=-1,threshold=0.9) :
+	def split(self,data,index=-1,threshold=0.65) :
 		N	= len(data)
 		# if N < LIMIT:
 		# 	return None
@@ -52,13 +54,13 @@ class AnomalyDetection:
 	@TODO: Map/Reduce does a good job at filtering
 	"""
 	def learn(self,data,key,value,features,label):
-		xo = ML.Filter(key,value,data)
 		
-		if not xo or len(xo) < 100:
-			return None
 		
-		#if len(xo) < 100 :
-			#return None
+		if len(data) < 10:
+			return None
+		xo = ML.Filter(key,value,data)
+		if len(xo) < 10 :
+			return None
 		# attr = conf['features']
 		# label= conf['label']
 		
@@ -69,9 +71,10 @@ class AnomalyDetection:
 		xo = self.split(xo)
 		yo = self.split(yo)
 		p = self.gParameters(xo['train'])
-		has_cov =  np.linalg.det(p['cov']) #-- making sure the matrix is invertible
+		has_cov =   np.linalg.det(p['cov']) if p else False #-- making sure the matrix is invertible
 		if xo['train'] and has_cov :
 			E = 0.001
+			ACCEPTABLE_FSCORE = 0.6
 			fscore = 0
 			#
 			# We need to find an appropriate epsilon for the predictions
@@ -94,22 +97,31 @@ class AnomalyDetection:
 				
 				
 				__operf__ = self.gPerformance(px,yo['test'])
-				print __operf__
+
 				if __operf__['fscore'] == 1 :
-					break
+					continue
 				if perf is None :
-					perf = __operf__['fscore']
-				elif perf['fscore'] < __perf__['fscore'] and __operf__['fscore']> 0.5 :
 					perf = __operf__
-				
+				elif perf['fscore'] < __operf__['fscore'] and __operf__['fscore'] > ACCEPTABLE_FSCORE :
+					perf = __operf__
 				perf['epsilon'] = Epsilon
+			#
+			# At this point we are assuming we came out of the whole thing with an acceptable performance
+			# The understanding is that error drives performance thus we reject fscore==1
+			#
 			
-			
-			if perf and perf['fscore'] > 0.5 :
+			if perf and perf['fscore'] > ACCEPTABLE_FSCORE :
 				return {"label":value,"parameters":p,"performance":perf}
 			else:
 				return None
 		return None
+	"""
		This function determines if the preconditions for learning are met.
		The parameters that govern those preconditions are passed in via p.
+	"""
+	def canLearn(self,p) :
+		pass
 	def getLabel(self,yo,label_conf):
 		return [ int(len(set(item) & set(label_conf["1"]))>0) for item in yo ]
 
@@ -188,8 +200,15 @@ class AnomalyDetection:
 			return None
 		r = np.array([ np.sqrt(np.var(m[i,:])) for i in range(0,n)])
 		#
 		# Before we normalize the data we must ensure there is some level of movement in this application
 		# A lack of movement suggests we may not have enough information to do anything
+		#
+		if 0 in r :
+			return None
+		#
 		#-- Normalizing the matrix then we will compute covariance matrix
 		#
+		
 		m = np.array([ (m[i,:] - u[i])/r[i] for i in range(0,n)])
 		sigma = np.cov(m)
 		sigma = [ list(row) for row in sigma]

+ 4 - 1
src/utils/transport.py

@@ -521,7 +521,10 @@ class CouchdbWriter(Couchdb,Writer):
 	def __init__(self,**args):
 		uri 		= args['uri']
 		self.uid 	= args['uid']
-		self.filename 	= args['filename']
+		if 'filename' in args:
+			self.filename 	= args['filename']
+		else:
+			self.filename = None
 		dbname		= args['dbname']
 		self.server 	= Server(uri=uri) 
 		self.dbase	= self.server.get_db(dbname)

+ 16 - 4
src/utils/workers.py

@@ -1,6 +1,7 @@
 #import multiprocessing
 from threading import Thread, RLock
-from utils import transport
+#from utils import transport
+from utils.transport import *
 from utils.ml import AnomalyDetection,ML
 import time
 import monitor
@@ -17,7 +18,7 @@ class Top(Thread):
 		self.reader_class	= _config['store']['class']['read']
 		self.write_class	= _config['store']['class']['write']
 		self.rw_args		= _config['store']['args']
-		self.factory 		= transport.DataSourceFactory()
+		self.factory 		= DataSourceFactory()
 		
 		self.name = 'Zulu-Top'
 		self.quit = False
@@ -36,6 +37,7 @@ class Top(Thread):
 				apps = self.config[label]
 				self.handler.init(apps)	
 				r = self.handler.composite()
+				
 				gwriter.write(label=label,row=r)
 				time.sleep(5)
 				self.lock.release()
@@ -44,7 +46,7 @@ class Top(Thread):
 				# This suggests we are in development mode
 				#
 				break
-			ELLAPSED_TIME = 60*30
+			ELLAPSED_TIME = 60*20
 			time.sleep(ELLAPSED_TIME)
 		print "Exiting ",self.name
 				
@@ -64,7 +66,7 @@ class Learner(Thread) :
 		self.features 		= config['learner']['anomalies']['features']
 		self.yo			= config['learner']['anomalies']['label']
 		self.apps 		= config['learner']['anomalies']['apps']
-		self.factory 		= transport.DataSourceFactory()
+		self.factory 		= DataSourceFactory()
 		self.quit		= False
 	
 	def stop(self):
@@ -192,3 +194,13 @@ class Factory :
 		else:
 			return None
 
+
+if __name__ =='__main__' :
+	import utils.params as SYS_ARGS	
+	import json
+	PARAMS = SYS_ARGS.PARAMS
+	f = open(PARAMS['path'])
+	CONFIG 	= json.loads(f.read())
+	f.close()
+	ThreadManager.start(CONFIG)	
+