Browse Source

Improved transport with flush, archive functions. Put Regression on hold

Steve L. Nyemba 8 years ago
parent
commit
14bfd8ce3f
4 changed files with 41 additions and 6 deletions
  1. 1 1
      src/utils/agents/data-collector.py
  2. 15 2
      src/utils/agents/learner.py
  3. 6 1
      src/utils/ml.py
  4. 19 2
      src/utils/transport.py

+ 1 - 1
src/utils/agents/data-collector.py

@@ -85,7 +85,7 @@ class ICollector(Thread) :
 					row = data
 				self.lock.acquire()
 				store = self.factory.instance(type=write_class,args=read_args)
-				
+				store.flush(size=200)
 				store.write(label=label,row=row)
 				self.lock.release()
 			if 'MONITOR_CONFIG_PATH' in os.environ :

+ 15 - 2
src/utils/agents/learner.py

@@ -3,6 +3,7 @@
 """
 from __future__ import division
 import numpy as np
+from sklearn import linear_model
 from threading import Thread,RLock
 from utils.transport import *
 from utils.ml import AnomalyDetection,ML
@@ -13,10 +14,17 @@ class BaseLearner(Thread):
 		Thread.__init__(self)
 		path = PARAMS['path']
 		self.name = self.__class__.__name__.lower()
+		self.rclass= None
+		self.wclass= None
+		self.rw_args=None
 		if os.path.exists(path) :
 			f = open(path)
 			self.config = json.loads(f.read())
 			f.close()
+			self.rclass	= self.config['store']['class']['read']
+			self.wclass	= self.config['store']['class']['write']		
+			self.rw_args	= self.config['store']['args']
+
 		else:
 			self.config = None
 		self.lock = lock
@@ -98,7 +106,7 @@ class Anomalies(BaseLearner) :
 """		
 class Regression(BaseLearner):
 	def __init__(self,lock):
-		BaseLearner.__init__(self)
+		BaseLearner.__init__(self,lock)
 		self.folders 	= self.config['folders']
 		self.id 	= self.config['id']
 	def run(self):
@@ -109,7 +117,12 @@ class Regression(BaseLearner):
 			data = ML.Filter('id',self.id,data['folders'])
 			xo  	= ML.Extract(['date'],data)
 			yo	= ML.Extract(['count'],data)
-			numpy.linalg.lstsq(xo, yo, rcond=-1)
+			
+			
+			pass
+			# print np.var(xo,yo)
+
+			
 		
 
 

+ 6 - 1
src/utils/ml.py

@@ -42,7 +42,12 @@ class ML:
 	def Extract(lattr,data):
 		if isinstance(lattr,basestring):
 			lattr = [lattr]
-		return [[row[id] for id in lattr] for row in data]
+		# return  [[row[id] for id in lattr] for row in data]
+		r =  [[row[id] for id in lattr] for row in data]
+		if len(lattr) == 1 :
+			return [x[0] for x in r]
+		else:
+			return r
 	@staticmethod
 	def CleanupName(value) :
 		return value.replace('$','').replace('.+','')

+ 19 - 2
src/utils/transport.py

@@ -562,8 +562,25 @@ class CouchdbWriter(Couchdb,Writer):
 			document[label] = []
 		document[label].append(row)
 		self.dbase.save_doc(document)
-	
-	def flush(self,params=None):
+	def flush(self,params=None,**args) :
+		"""Trim every list stored in this writer's document to its last `size` rows, then save the document.
+		@param params	dict holding 'size'; 'size' may also be passed as a keyword, e.g. flush(size=200)"""
+		size = dict(params or {},**args)['size']
+		document = self.dbase.get(self.uid)
+		for key in document:
+			# '_id', '_rev' and '_attachments' are CouchDB bookkeeping fields: save them back untouched
+			if key in ['_id','_rev','_attachments'] :
+				continue
+			content = document[key]
+			if isinstance(content,list):
+				# clamp at 0: a negative start index would truncate a list that is shorter than size
+				index = max(len(content) - size,0)
+				document[key] = content[index:]
+			else:
+				document[key] = {}	# values that are not lists are reset to an empty dict (unchanged behaviour)
+		self.dbase.save_doc(document)
+			
+	def archive(self,params=None):
 		document = self.dbase.get(self.uid)
 		content = {}
 		_doc = {}