瀏覽代碼

bug fix and reworked threading model

steve 8 年之前
父節點
當前提交
272079214e
共有 2 個文件被更改,包括 85 次插入26 次删除
  1. 24 15
      src/api/index.py
  2. 61 11
      src/utils/workers.py

+ 24 - 15
src/api/index.py

@@ -22,7 +22,8 @@ import re
 import monitor
 import Queue
 from utils.transport import *
-
+from utils.workers import ThreadManager, Factory
+import atexit
 PARAMS  = {'context':''}
 if len(sys.argv) > 1:
 	
@@ -47,34 +48,36 @@ app.config['SECRET_KEY'] = '!h8-[0v8]247-4-360'
 
 f = open(PARAMS['path'])
 CONFIG 	= json.loads(f.read())
-HANDLERS= {}
+#HANDLERS= {}
 
-for key in CONFIG['monitor'] :
+#for key in CONFIG['monitor'] :
 	
-	className = CONFIG['monitor'][key]['class']
-	ref	= "".join(["monitor.",className,"()"])
-	ref 	=  eval(ref)
-	HANDLERS[key] = {"class":ref,"config":CONFIG['monitor'][key]["config"]}
+	#className = CONFIG['monitor'][key]['class']
+	#ref	= "".join(["monitor.",className,"()"])
+	#ref 	=  eval(ref)
+	#HANDLERS[key] = {"class":ref,"config":CONFIG['monitor'][key]["config"]}
 
 f.close()
 
 #
 #
-from threading import Thread, RLock
+#from threading import Thread, RLock
 p = CONFIG['store']['args']
 class_read = CONFIG['store']['class']['read']
 class_write= CONFIG['store']['class']['write']
 factory = DataSourceFactory()
 #gWriter = factory.instance(type='CouchdbWritera',args=p)
 #gReader = factory.instance(type='CouchdbReader',args=p)
-p['qid'] = HANDLERS['processes']['config'].keys()
+#p['qid'] = HANDLERS['processes']['config'].keys()
 gReader = factory.instance(type=class_read,args=p)
-gWriter = factory.instance(type=class_write,args=p)
-mthread = monitor.Monitor(HANDLERS,gWriter,'processes',)
+#gWriter = factory.instance(type=class_write,args=p)
+#mthread = monitor.Monitor(HANDLERS,gWriter,'processes',)
 
+atexit.register(ThreadManager.stop)
 @app.route('/get/<id>')
 def procs(id):	
 	try:
+		gReader = factory.instance(type=class_read,args=p)
 		d =  gReader.read()
 		
 		r = {}
@@ -93,10 +96,16 @@ def procs(id):
 """
 @app.route('/sandbox')
 def sandbox():
-	if 'sandbox' in HANDLERS:
-		handler = HANDLERS['sandbox']['class']
-		conf = HANDLERS['sandbox']['config']
+	global CONFIG
+	print CONFIG['monitor']
+	if 'sandbox' in CONFIG['monitor']:
+		#handler = HANDLERS['sandbox']['class']
+		#conf = HANDLERS['sandbox']['config']
 		r = []
+		p = Factory.instance('sandbox',CONFIG)
+		handler = p['class']
+		conf	= p['config']
+		
 		for id in conf:
 			handler.init(conf[id])
 			r.append (dict(handler.composite(),**{"label":id}))
@@ -171,7 +180,7 @@ def anomalies_get():
 	
 if __name__== '__main__':
 	
-	mthread.start()
+	ThreadManager.start(CONFIG)	
 	app.run(host='0.0.0.0',debug=True,threaded=True)
 
 	

+ 61 - 11
src/utils/workers.py

@@ -1,5 +1,5 @@
 #import multiprocessing
-from threading import Thread, Lock
+from threading import Thread, RLock
 from utils import transport
 from utils.ml import AnomalyDetection
 import time
@@ -56,15 +56,19 @@ class Learner(Thread) :
 	"""
 	def __init__(self,config,lock):
 		Thread.__init__(self)
-		self.name='Zulu-Learner'
-		self.lock = lock
+		self.name		= 'Zulu-Learner'
+		self.lock 		= lock
 		self.reader_class	= config['store']['class']['read']
 		self.write_class	= config['store']['class']['write']
-		self.rw_args	= config['store']['args']
-		self.features = config['learner']['anomalies']['features']
-		self.yo	= config['learner']['anomalies']['label']
-		self.apps = config['learner']['anomalies']['apps']
+		self.rw_args		= config['store']['args']
+		self.features 		= config['learner']['anomalies']['features']
+		self.yo			= config['learner']['anomalies']['label']
+		self.apps 		= config['learner']['anomalies']['apps']
 		self.factory 		= transport.DataSourceFactory()
+		self.quit		= False
+	
+	def stop(self):
+		self.quit = True
 	"""
 		This function will initiate learning every (x-hour)
 		If there is nothing to learn the app will simply go to sleep
@@ -75,7 +79,7 @@ class Learner(Thread) :
 		#
 		# This is the motherload of innefficiency ...
 		#
-		while True:
+		while self.quit == False:
 			r = {}
 			for key in data :
 				logs = data[key]
@@ -86,7 +90,7 @@ class Learner(Thread) :
 					value = handler.learn(logs,'label',app,self.features,self.yo)
 					
 					if value is not None:
-						print value
+						
 						if key not in r:
 							r[key] = {}
 						r[key][app] = value
@@ -109,7 +113,53 @@ class Learner(Thread) :
 
 			TIME_ELLAPSED = 60*120	#-- Every 2 hours
 			time.sleep(TIME_ELLAPSED)
+		print "Exiting ",self.name
 			
 
-
-
+"""
+	This class is a singleton designed to start quit dependent threads
+		* monitor	is designed to act as a data collection agent
+		* learner	is designed to be a learner i.e machine learning model(s)
+	@TODO: 
+		- How to move them to processes that can be read by the os (that would allow us to eat our own dog-food)
+		- Additionally we also need to have a pruning thread, to control the volume of data we have to deal with.This instills the "will to live" in the application
+"""
+class ThreadManager:
+		
+	Pool = {}
+	@staticmethod
+	def start(config):
+		lock = RLock()
+		ThreadManager.Pool['monitor'] = Top(config,lock)
+		ThreadManager.Pool['learner'] = Learner(config,lock)
+		for id in ThreadManager.Pool :
+			thread = ThreadManager.Pool[id]
+			thread.start()
+	@staticmethod
+	def stop():
+		for id in ThreadManager.Pool :
+			thread = Pool[id]
+			thread.stop()
+	@staticmethod
+	def status():
+		r = {}
+		for id in ThreadManager.Pool :
+			thread = ThreadManager.Pool[id]
+			r[id] = thread.isAlive()
+			
+		
+		
+class Factory :
+	"""
+		This function will return an instance of an object in the specified in the configuration file
+	"""
+	@staticmethod
+	def instance(id,config):
+		if id in config['monitor'] :
+			className 	= config['monitor'][id]['class']
+			ref		= "".join(["monitor.",className,"()"])
+			ref 	=  eval(ref)
+			return {"class":ref,"config":config['monitor'][id]["config"]}
+		else:
+			return None
+