Pārlūkot izejas kodu

refactoring learning and enabling learning api

Steve L. Nyemba 8 gadi atpakaļ
vecāks
revīzija
cbed6dbb70
3 mainītis faili ar 52 papildinājumiem un 30 dzēšanām
  1. 38 23
      src/api/index.py
  2. 1 1
      src/utils/ml.py
  3. 13 6
      src/utils/transport.py

+ 38 - 23
src/api/index.py

@@ -43,7 +43,7 @@ p = CONFIG['store']['args']
 class_read = CONFIG['store']['class']['read']
 class_write= CONFIG['store']['class']['write']
 factory = DataSourceFactory()
-gReader = factory.instance(type=class_read,args=p)
+# gReader = factory.instance(type=class_read,args=p)
 
 atexit.register(ThreadManager.stop)
 @app.route('/get/<id>')
@@ -99,8 +99,7 @@ def trends ():
 	class_read = CONFIG['store']['class']['read']
 
 	
-	gReader = factory.instance(type=class_read,args=p)
-	
+	gReader = factory.instance(type=class_read,args=p)	
 	r = gReader.read()
 	if id in r:
 		r = r[id] #--matrix
@@ -131,15 +130,17 @@ def dashboard():
 	This function is designed to trigger learning for anomaly detection
 	@TODO: forward this to a socket i.e non-blocking socket
 """
-@app.route('/learn')
+@app.route('/anomalies/get')
 def learn():
 	global CONFIG
 	p = CONFIG['store']['args']
 	class_read = CONFIG['store']['class']['read']	
 	gReader = factory.instance(type=class_read,args=p)
 	d =  gReader.read()
+	
 	if 'learn' in d :
 		info = d['learn']
+		
 		del d['learn']
 	else :
 		info = []
@@ -147,27 +148,45 @@ def learn():
 	if 'id' in request.args:
 		id = request.args['id']
 		d = d[id]
-		apps =  CONFIG['monitor']['processes']['config'][id]
-		#print (apps)
-		
 		params = {}
 		for item in info:
-			id = item['label']
-			params[id] = item
+			
+			label = item['label']
+			params[label] = item
+		
 		#apps = list(set(ML.Extract(['label'],d)))
-		p = AnomalyDetection()
-		for name in apps :
-			xo = ML.Filter('label',name,d)		
-			_info  = params[name]
-			#info = ML.Filter('label',app,logs)	
-			value = p.predict(xo,_info)
-			print [row[1] for row in value]
-			break
+		r = []
+		if params :
+			#
+			# If we have parameters available 
+			p = AnomalyDetection()
+			apps = params.keys()			
+			for name in apps :
+				if name not in params:
+					continue
+				_info	= params[name]		
+				try:
+					xo 	= ML.Filter('label',name,d)	
+				except Exception,e:
+					xo = []
+					#print name,e
+				if len(xo) == 0:					
+					continue	
+				xo 	= [xo[ len(xo) -1]]
+				
+				value	= p.predict(xo,_info)[0]
+				
+				if len(value):
+					report = dict(_info,**{'predicton':value})
+					r.append(report)
+				
+				
+				
 			#print app,value
 			#if value is not None:
 			#	r.append(value)
-	print r
-	return json.dumps([])
+	
+	return json.dumps(r)
 		
 
 
@@ -175,10 +194,6 @@ def learn():
 def anomalies_status():
 	pass
 
-@app.route('/anomalies/get')
-def anomalies_get():
-	pass
-	
 if __name__== '__main__':
 	
 	#ThreadManager.start(CONFIG)	

+ 1 - 1
src/utils/ml.py

@@ -116,7 +116,7 @@ class AnomalyDetection:
 		@param info	stored information about this	
 	"""
 	def predict(self,xo,info):
-		
+			
 		xo = ML.Extract(info['features'],xo)
 		
 		if not xo :

+ 13 - 6
src/utils/transport.py

@@ -246,12 +246,12 @@ class MessageQueue:
 		self.qid = params['qid']
 	
 	def isready(self):
-		self.init()
+		#self.init()
 		resp =  self.connection is not None and self.connection.is_open
 		self.close()
 		return resp
 	def close(self):
-			
+		print "closing ..."
 		self.channel.close()
 		self.connection.close()
 """
@@ -463,7 +463,10 @@ class CouchdbReader(Couchdb,Reader):
 		#
 		# setting the basic parameters for 
 		Couchdb.__init__(self,**args)
-		self.filename 	= args['filename']
+		if 'filename' in args :
+			self.filename 	= args['filename']
+		else:
+			self.filename = None
 
 	def isready(self):
 		#
@@ -487,8 +490,7 @@ class CouchdbReader(Couchdb,Reader):
 			r = False
 		
 		return r	
-
-	def read(self,size=-1):
+	def stream(self):
 		content = self.dbase.fetch_attachment(self.uid,self.filename).split('\n') ;
 		i = 1
 		for row in content:
@@ -496,7 +498,12 @@ class CouchdbReader(Couchdb,Reader):
 			if size > 0 and i == size:
 				break
 			i = i + 1
-	
+		
+	def read(self,size=-1):
+		if self.filename is not None:
+			self.stream()
+		else:
+			return self.basic_read()
 	def basic_read(self):
 		document = self.dbase.get(self.uid) 
 		del document['_id'], document['_rev']