فهرست منبع

issues with data processing @TODO: consider streaming

Steve L. Nyemba 8 سال پیش
والد
کامیت
fd05da326d
6فایلهای تغییر یافته به همراه157 افزوده شده و 65 حذف شده
  1. 33 30
      src/api/index.py
  2. 6 4
      src/api/static/css/default.css
  3. 12 16
      src/api/static/js/dashboard.js
  4. 1 1
      src/api/templates/dashboard.html
  5. 80 12
      src/monitor.py
  6. 25 2
      test/TestServerMonitor.py

+ 33 - 30
src/api/index.py

@@ -16,6 +16,7 @@ import os
 import json
 import re
 import monitor
+import Queue
 PARAMS  = {'context':''}
 if len(sys.argv) > 1:
 	
@@ -37,52 +38,54 @@ app = Flask(__name__)
 f = open(PARAMS['path'])
 CONFIG 	= json.loads(f.read())
 HANDLERS= {}
+
 for key in CONFIG :
-	
+	if key == "monitor":
+		continue
 	className = CONFIG[key]['class']
 	ref	= "".join(["monitor.",className,"()"])
 	ref 	=  eval(ref)
+	#ref.init(CONFIG[key]['config'])
+
+
 	HANDLERS[key] = {"class":ref,"config":CONFIG[key]["config"]}
 
 f.close()
-"""
-	This function determines the status of a given observation as follows
-	considering:
-	x	memory used
-	y	cpu used
-	z	memory allocated
-	
-	x	y	z
-	0	0	0	crash
-	0	0	1	idle
-	1	0	1	idle
-	1	1	1	running
-	
-	This classification is known and we will not write a learner for this. The implementation will account for relationships such as assuming if memory is allocated and cpu is used chances the application is running because there will be memory used otherwise idle
-	
-"""
 
-	
+#
+#
+from threading import Timer,Thread
+ProcessQueue = Queue.LifoQueue()
+mthread = monitor.Monitor(HANDLERS,ProcessQueue,'processes')
+mthread.start()
+#(Timer(10,mthread.run)).start()
+#mthread = Process(target=monitor.Monitor,args=(HANDLERS,ProcessQueue,'processes'))
+#mthread.start()
+
 @app.route('/get/<id>')
-def procs(id):
-	if id in HANDLERS:
-		handler = HANDLERS[id]["class"]
-		
-		conf = HANDLERS[id]["config"]
-		r = {}
-		for key in conf:
-			
-			handler.init(conf[key])
-			r[key] = handler.composite()
+def procs(id):	
+	if id in HANDLERS and len(mthread.logs)>0:
+		r = ProcessQueue.get(block=True,timeout=15)
+		index = len(mthread.logs) -1
+		r = mthread.logs[index]
 		return json.dumps(r)
 	else:
 		return "[]"
 	pass
-
+@app.route('/trends') 
+def trends ():
+	id = request.args.get('id')
+	handler = monitor.mapreducer()
+	r = handler.filter(id,mthread.logs)
+	r = handler.run(r,handler.mapper,None)
+	print [" **** ",len(r)]
+	return json.dumps(r)
 @app.route('/dashboard')
 def dashboard():
 	context = PARAMS['context']
 	return render_template('dashboard.html',context=context)
 if __name__== '__main__':
 	app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX=?RT'
-	app.run(host='0.0.0.0',debug=True,threaded=True)
+	app.run(host='0.0.0.0',debug=True,threaded=True)
+
+	

+ 6 - 4
src/api/static/css/default.css

@@ -19,14 +19,16 @@ body {
 	font-family:sans-serif;
 	
 }
+.no-border{ border:1px solid transparent}
 .border { border:1px solid #CAD5E0}
-.border-bottom{
-	border-bottom:1px solid #CAD5E0;
-}
+.border-bottom{	border-bottom:1px solid #CAD5E0;}
+.border-right { border-right:1px solid #CAD5E0;}
+.border-left { border-left:1px solid #CAD5E0;}
+.border-top { border-top:1px solid #CAD5E0;}
 .grid {
 	font-family:helvetica;
 	font-weight:lighter;
-	width:47%;
+	
 	margin:4px;
 	padding:4px;
 }

+ 12 - 16
src/api/static/js/dashboard.js

@@ -29,21 +29,14 @@ monitor.processes.init = function(x){
 	})
 }
 monitor.processes.render = function(label,data) {
-	var status = {"idle":'<i class="fa fa-ellipsis-h" title="IDLE"></i>',"running":'<i class="fa fa-check" title="RUNNING"></i>',"crash":'<i class="fa fa-times" title="CRASHED"></i>'}
+	
 	data = jx.utils.patterns.visitor(data,function(row){
-		
-		row.status = status[row.status]
-// 		var m = row.memory_usage >0
-// 		var c = row.cpu_usage > 0
-// 		var r = row.memory_available > 0
-// 		if ( r && c && m) {
-// 			row.status = status['running']
-// 		}else if (r && (!m || !c) ){
-// 			row.status = status['idle']
-// 		}else if (!r){
-// 			row.status = status['crash']
-// 		}
-		
+		var status = {"idle":'<i class="fa fa-ellipsis-h" title="IDLE"></i>',"running":'<i class="fa fa-check" title="RUNNING"></i>',"crash":'<i class="fa fa-times" title="CRASHED"></i>'}	
+		if (!row.status.match(/class/)) {
+			row.status_id = row.status
+			row.status = status[row.status]		
+			
+		}
 		return row
 	})
 	jx.dom.set.value('latest_processes','') ;
@@ -51,9 +44,12 @@ monitor.processes.render = function(label,data) {
 	var options = {}
 	
 	options.data = data
-	//options.rowClass = function(item,index){return 'small'}
+	options.rowClass = function (item, index,evt) {
+		
+		return 'small'
+	}
 	options.autoload  = true
-	options.fields = [{name:'label',type:'text',title:"Process",headercss:"small bold"},{name:"cpu_usage",type:"number",title:"CPU", headercss:"small bold"},{name:"memory_usage",type:"text",title:"Mem. Used",type:"number",headercss:"small bold"},{name:"memory_available",type:"number",title:"Mem. Avail",headercss:"small bold"},
+	options.fields = [{name:'label',type:'text',title:"Process",headercss:"small bold",css:"small"},{name:"cpu_usage",type:"number",title:"CPU", headercss:"small bold"},{name:"memory_usage",type:"text",title:"Mem. Used",type:"number",headercss:"small bold"},{name:"memory_available",type:"number",title:"Mem. Avail",headercss:"small bold"},
 	{name:"status",type:"text",title:"Status",headercss:"small bold",align:"center"}
 	]
 	var grid = $('#latest_processes').jsGrid(options) ;

+ 1 - 1
src/api/templates/dashboard.html

@@ -33,7 +33,7 @@
 	<div class="left height-quarter">
 		<div class="grid">
 			<div class="small" style="text-transform:capitalize">Monitoring <span id="latest_processes_label"></span></div>
-			<div id="latest_processes"></div>
+			<div id="latest_processes" class="small"></div>
 		</div>
 	</div>	
 	

+ 80 - 12
src/monitor.py

@@ -12,8 +12,8 @@ from sets import Set
 import re
 import datetime
 import Queue
-
-
+from threading import Thread
+import time
 class Analysis:
 	def __init__(self):
 		self.logs = []
@@ -96,7 +96,8 @@ class Sandbox(Analysis):
 		n = len(Set(required_modules) - Set(sandbox_modules))
 		value = 1 - (n/N)
 		missing = list(Set(required_modules) - Set(sandbox_modules))
-		return {"value":value,"missing":missing}
+		
+		return dict(self.now,**{"value":value,"missing":missing})
 
 """
 	This class performs the analysis of a list of processes and determines
@@ -121,7 +122,7 @@ class ProcessCounter(Analysis):
 		#N = len(r)
 		#n = sum(r)
 		#return n/N
-		return r
+		return dict(self.now,**r)
 """
 	This class returns an application's both memory and cpu usage
 """
@@ -170,7 +171,7 @@ class DetailProcess(Analysis):
 		r= {"memory_usage":row[0],"cpu_usage":row[1],"memory_available":row[2]/1000,"label":row[3]}
 		status = self.status(r)
 		r['status'] = status
-		return r
+		return dict(self.now,**r)
 	def composite(self):
 		#value = self.evaluate(self.name)
 		#row= {"memory_usage":value[0],"cpu_usage":value[1]}
@@ -178,17 +179,84 @@ class DetailProcess(Analysis):
 		#ma = [self.evaluate(name) for name in self.names]
 		ma = []
 		for name in self.names:
+			
 			matrix = self.evaluate(name)
+			
 			ma += [self.format(row) for row in matrix]
 			
 		#return [{"memory_usage":row[0],"cpu_usage":row[1],"memory_available":row[2]/1000,"label":row[3]} for row in ma]
 		
 		return ma
 
-class QListener(Thread)
-	def __init__(self,handlers):
-		self.handlers = handlers
-		self.queue = Queue.LifoQueue()
-	def post(self) :
-		for handler in self.handlers:
-			self.queue.put(handler.)
+class Monitor (Thread):
+	def __init__(self,pConfig,pQueue,id='processes') :
+		Thread.__init__(self)
+		
+		self.config 		= pConfig[id]
+		self.queue	= pQueue;
+		self.logs	= []
+		self.handler = self.config['class']
+		self.mconfig = self.config['config']
+	def run(self):
+		r = {}
+		while True:
+			for label in self.mconfig:
+				
+					
+					self.handler.init(self.mconfig[label])
+					r[label] = self.handler.composite()
+			self.logs.append(r)
+			self.queue.put(r)
+			self.prune()
+			self.queue.task_done()
+			
+			time.sleep(10)
+	def prune(self) :
+		MAX_ENTRIES = 1000
+		if len(self.logs) > MAX_ENTRIES :
+			BEG = len(self.logs) - MAX_SIZE -1 
+			self.logs = self.logs[BEG:]
+
+class mapreducer:
+	def __init__(self):
+		self.store = {}
+	def filter (self,key,dataset):
+		return [row[key] for row in dataset if key in row]
+	def run(self,dataset,mapper,reducer):
+		r = None
+		if mapper is not None:
+			if isinstance(dataset,list) :
+				[mapper(row,self.emit) for row in dataset]
+			
+			
+		if reducer is not None:
+			r = [reducer(self.store[key]) for key in self.store]
+		else:
+			r = self.store
+		return r
+	def mapper(self,row,emit):
+		[emit(item['label'],item) for item in row ]
+	def reducer(self,values):
+		return value			
+
+	def emit(self,key,content):
+		if key not in self.store:
+			self.store[key] = []
+		self.store[key].append(content)
+# 	#
+# 	# We need to generate the appropriate dataset here
+# 	# map/reduce is a well documented technique for generating datasets
+# 	#
+# 	def map(self,key,id,rows):
+		
+# 		#r =  [row[key] for row in rows if key in row]
+# 		for row in rows:
+# 			if key in row :
+# 				for xr in row[key]:
+# 					self.emit(xr['label'],xr)
+		
+		
+
+# def reduce(keys,values):
+# 	print values[0]
+# 	return r

+ 25 - 2
test/TestServerMonitor.py

@@ -1,6 +1,7 @@
 from __future__ import division
 import unittest
 from monitor import Env, DetailProcess, ProcessCounter, Sandbox
+import monitor
 import os
 class TestMonitorServer(unittest.TestCase):
 	
@@ -19,7 +20,6 @@ class TestMonitorServer(unittest.TestCase):
 		p = DetailProcess()
 		p.init(['rabbitmq-server','python','apache2'])
 		r = p.composite()
-		print r
 		self.assertTrue(r)
 	
 	def test_ProcessCount(self):
@@ -34,6 +34,29 @@ class TestMonitorServer(unittest.TestCase):
 		sandbox_path	= os.sep.join([os.environ['PYTHONPATH'],"..",'sandbox'])
 		p	= Sandbox()
 		p.init({"sandbox":sandbox_path,"requirements":requirements_path})
-		print p.composite()
+		p.composite()
+	def test_map(self):
+		p = DetailProcess()
+		p.init(['rabbitmq-server','python','apache2'])
+		r ={"test": p.composite()}
+		logs = [r,{"x-test":p.composite()}]
+		key = "test"
+		id = "memory_usage"
+		
+		def mapper(row,emit):
+			[emit(item['label'],item) for item in row ]
+		def reducer(values):
+			end = len(values)-1
+			beg = 0 if end < 100 else end - 100
+			return values[beg:]
+				
+			
+		#def reducer(values):
+		
+		mrh = monitor.mapreducer()
+		logs = mrh.filter('test',logs)
+		
+		print mrh.run(logs,mapper,None)
+		
 if __name__ == '__main__' :
 	unittest.main()