Browse Source

using couchdb as store

Steve L. Nyemba 8 years ago
parent
commit
f2e36701d8
3 changed files with 47 additions and 31 deletions
  1. 8 4
      requirements.txt
  2. 22 10
      src/api/index.py
  3. 17 17
      src/monitor.py

+ 8 - 4
requirements.txt

@@ -1,9 +1,13 @@
+click==6.6
+couchdbkit==0.6.5
 Flask==0.11.1
 Flask-Session==0.3.0
+http-parser==0.8.3
+itsdangerous==0.24
 Jinja2==2.8
 MarkupSafe==0.23
+numpy==1.11.3
+pika==0.10.0
+restkit==4.2.2
+socketpool==0.5.3
 Werkzeug==0.11.11
-argparse==1.2.1
-click==6.6
-itsdangerous==0.24
-wsgiref==0.1.2

+ 22 - 10
src/api/index.py

@@ -10,13 +10,14 @@
 from flask import Flask, session, request, redirect, Response
 from flask.templating import render_template
 from flask_session import Session
-
+import time
 import sys
 import os
 import json
 import re
 import monitor
 import Queue
+from utils.transport import *
 PARAMS  = {'context':''}
 if len(sys.argv) > 1:
 	
@@ -54,10 +55,14 @@ f.close()
 
 #
 #
-from threading import Timer,Thread
-ProcessQueue = Queue.LifoQueue()
-mthread = monitor.Monitor(HANDLERS,ProcessQueue,'processes')
+from threading import Thread, RLock
+p  = {'uri':'http://dev.the-phi.com:5984','dbname':'monitor','uid':'logs','filename':'logs.JSON'}
+factory = DataSourceFactory()
+gWriter = factory.instance(type='CouchdbWriter',args=p)
+gReader = factory.instance(type='CouchdbReader',args=p)
+mthread = monitor.Monitor(HANDLERS,gWriter,'processes',)
 mthread.start()
+
 #(Timer(10,mthread.run)).start()
 #mthread = Process(target=monitor.Monitor,args=(HANDLERS,ProcessQueue,'processes'))
 #mthread.start()
@@ -76,14 +81,21 @@ def procs(id):
 def trends ():
 	id = request.args.get('id')
 	# key = request.args.get('key')
+	global mthread
+	# mLock.acquire()
+	
+	time.sleep(2)	
+	doc = gReader.read()
+	doc['row']
 	handler = monitor.mapreducer()
-	r = handler.filter(id,mthread.logs)
-	print r
-	if 'kate' in r:
-		for item in r['kate']:
-			print item['hour'],item['minute']
+	r = handler.filter(id,logs)
 	r = handler.run(r,handler.mapper,handler.reducer)
-			
+	# mLock.release()
+	if 'Google Chrome' in r:
+		for item in r['Google Chrome']:
+			print item['hour'],item['minute']
+	
+
 	return json.dumps(r)
 @app.route('/dashboard')
 def dashboard():

+ 17 - 17
src/monitor.py

@@ -201,35 +201,35 @@ class DetailProcess(Analysis):
 		return ma
 
 class Monitor (Thread):
-	def __init__(self,pConfig,pQueue,id='processes') :
+	def __init__(self,pConfig,pWriter,id='processes') :
 		Thread.__init__(self)
 		
-		self.config 		= pConfig[id]
-		self.queue	= pQueue;
+		self.config 	= pConfig[id]
+		self.writer	= pWriter;
 		self.logs	= []
 		self.handler = self.config['class']
 		self.mconfig = self.config['config']
-		self.lock = RLock()
+		
+	def stop(self):
+		self.keep_running = False
 	def run(self):
 		r = {}
-		
-		while True:
+		self.keep_running = True
+		lock = RLock()
+		while self.keep_running:
+			
 			for label in self.mconfig:
-				
-					
-					self.handler.init(self.mconfig[label])
-					r[label] = self.handler.composite()
-					
-			self.queue.put(r)
-			#self.logs.append(r)
+				lock.acquire()
+				self.handler.init(self.mconfig[label])
+				r = self.handler.composite()
+				self.writer.write(label=label,row = r)
+				lock.release()		
 			
-			self.prune()
 			
-			self.queue.task_done()
-			self.logs.append(self.queue.get(block=False))
+			self.prune()
 			HALF_HOUR = 60*1
 			time.sleep(HALF_HOUR)
-		
+		print "Stopped ..."
 	def prune(self) :
 		MAX_ENTRIES = 100
 		if len(self.logs) > MAX_ENTRIES :