Browse Source

refactoring data-collector

Steve Nyemba 7 years ago
parent
commit
fcbe36030b
12 changed files with 148 additions and 88 deletions
  1. 0 0
      .gitignore
  2. 0 0
      .gitmodules
  3. 2 12
      init.sh
  4. 0 0
      requirements.txt
  5. 69 29
      src/data-collector.py
  6. 76 46
      src/monitor.py
  7. 0 0
      src/utils/__init__.py
  8. 1 1
      src/utils/params.py
  9. 0 0
      test/TestML.py
  10. 0 0
      test/TestServerMonitor.py
  11. 0 0
      test/data.csv
  12. 0 0
      test/demo.py

+ 0 - 0
.gitignore


+ 0 - 0
.gitmodules


+ 2 - 12
init.sh

@@ -13,16 +13,6 @@ install(){
 	`sandbox/bin/pip freeze|sort |diff requirements.txt -|grep \<|grep -E " .+$" -o|sandbox/bin/pip install --upgrade` 
 	`sandbox/bin/pip freeze|sort |diff requirements.txt -|grep \<|grep -E " .+$" -o|sandbox/bin/pip install --upgrade` 
 
 
 
 
-}
-register(){
-	#
-	# $1 uid 	customer's identifier (email)
-	# $2 pid 	customer's plan identifier
-	#
-	curl -X POST https://the-phi.com/store/init/monitor-logs -H "uid:$1" -H"pid: $2"
-
-
-
 }
 }
 upgrade(){
 upgrade(){
 	git pull
 	git pull
@@ -46,9 +36,9 @@ stop(){
 status(){
 status(){
 	pid=`ps -eo pid,command|grep python|grep -E "$PWD"|grep data-collector|grep -E "^ {0,}[0-9]+" -m 1 -o`
 	pid=`ps -eo pid,command|grep python|grep -E "$PWD"|grep data-collector|grep -E "^ {0,}[0-9]+" -m 1 -o`
 	if [ "$pid" = "" ]; then 
 	if [ "$pid" = "" ]; then 
-		echo "DATA-COLLECTOR IS OFFLINE"
+		echo "Data Collector is Offline"
 	else
 	else
-		echo "DATA-COLLECTOR IS ONLINE $pid"
+		echo "Data Collector is  Online $pid"
 	fi
 	fi
 
 
 }
 }

+ 0 - 0
requirements.txt


+ 69 - 29
src/data-collector.py

@@ -18,8 +18,10 @@ import requests
 import pickle
 import pickle
 import json
 import json
 from threading import Thread, RLock
 from threading import Thread, RLock
-
-ENDPOINT="https://the-phi.com/monitor"
+import monitor
+import utils.agents.actor as actor
+from utils.agents.manager import Manager
+ENDPOINT="http://localhost/monitor"
 class Collector(Thread) :
 class Collector(Thread) :
 	def __init__(self):
 	def __init__(self):
 		Thread.__init__(self)
 		Thread.__init__(self)
@@ -30,47 +32,78 @@ class Collector(Thread) :
 			@param id	node identifier
 			@param id	node identifier
 			
 			
 		"""
 		"""
-		for id in ['apps','folders']:
-			if id in SYS_ARGS :
-				SYS_ARGS[id] = SYS_ARGS[id].split(',')
 		#
 		#
 		# Let's open the file with the key (nothing else should be in the file
 		# Let's open the file with the key (nothing else should be in the file
-		f = open(SYS_ARGS['key'])
-		SYS_ARGS['key'] = f.read()
-		f.close()
+		#f = open(SYS_ARGS['key'])
+		#SYS_ARGS['key'] = f.read()
+		#f.close()
 		
 		
 		headers = {"key":SYS_ARGS["key"],"id":SYS_ARGS["id"]} #,"scope":json.dumps(scope)}
 		headers = {"key":SYS_ARGS["key"],"id":SYS_ARGS["id"]} #,"scope":json.dumps(scope)}
-		headers['content-type'] = 'application/json'
+		self.plan = None
+		self.store= None
+		
+		#headers['content-type'] = 'application/json'
 		try:
 		try:
+			self.key = SYS_ARGS['key']
 			Logger.log(subject='Collector',object='api',action='request',value=ENDPOINT)
 			Logger.log(subject='Collector',object='api',action='request',value=ENDPOINT)
 			url 	= "/".join([ENDPOINT,"init/collector"])
 			url 	= "/".join([ENDPOINT,"init/collector"])
-			data = {}
-			for id in SYS_ARGS :
-				if id not in ['id','key'] :
-					data[id] = SYS_ARGS[id]
 			
 			
-			r	= requests.post(url,headers=headers,data=json.dumps(data))			
+			r	= requests.post(url,headers=headers)
+			
 			r 	= json.loads(r.text)
 			r 	= json.loads(r.text)
 			
 			
-			self.monitor = pickle.loads(r[0])
-			self.monitor.lock = RLock()
-			#:w
-			#self.monitor.set('lock',RLock())
-			Logger.log(subject='Collector',object='api',action='load',value='')
-		except Exception,e:
+			if r :
+				#
+				# Persisting plan and data-store ...
+				self.plan = r['plan']
+				self.store = r['store']
+			info = {"store":self.store,"plan":self.plan}
 			
 			
-			Logger.log(subject='Collector',object='api',action='error',value=str(e))
+			if info['plan'] is not None and info['store'] is not None:
+				info['plan'] = self.plan['name']			
+				info['store'] = self.store['args']['dbname']
+				_action = 'init'
+				self.initialize()
+			else:
+				info['plan'] = self.plan is not None
+				info['store']= self.store is not None
+				_action = 'init.error'
+			Logger.log(subject='collector',object='api',action=_action,value=info)
+		except Exception as e:			
+			print(e)
+			Logger.log(subject='collector',object='api',action='init.error',value=str(e))
 			self.monitor = None
 			self.monitor = None
+	def initialize(self):
+		"""
+			This function creates a monitoring object with the associated parameters from the plan
+			plan.metadata = {"agents":...,"folder_size":...,"delay":...,"limit":...,"actors":...}
+		"""
+		_agents = [monitor.DetailProcess(),monitor.FileWatch()]
+		_actors = [actor.Apps(),actor.Folders(),actor.Mailer()]
+		# Initialiing the agents with the parameter values we know of
+		r = []
+		for agent in _agents :
+			if agent.getName() in SYS_ARGS :
+				agent.init(SYS_ARGS[agent.getName()])
+				r.append(agent)
+		_agents = r 
 
 
+		
+		
+		config = {"store":self.store,"plan":self.plan}
+		self.manager = Manager()		
+		self.manager.init(node=SYS_ARGS['id'],agents=_agents,actors=_actors,config=config,key=self.key)
+		
 	def run(self):
 	def run(self):
 		"""
 		"""
 			This funtion runs the authorized features and 
 			This funtion runs the authorized features and 
 		"""
 		"""
 		#self.monitor.start()
 		#self.monitor.start()
 		
 		
-		Logger.log(subject='Collector',object='monitor',action='start',value='')
-		thread = Thread(target=self.monitor.run)
+		# Logger.log(subject='Collector',object='monitor',action='rpc',value=(self.manager is None) )
+		thread = Thread(target=self.manager.run)
 		thread.start()
 		thread.start()
+		# print self.manager
 		# print self.monitor.config['store']
 		# print self.monitor.config['store']
 		# for agent in self.pool :
 		# for agent in self.pool :
 		# 	try:
 		# 	try:
@@ -83,17 +116,24 @@ class Collector(Thread) :
 		# 	except Exception,e:
 		# 	except Exception,e:
 		# 		print e
 		# 		print e
 
 
-if __name__ == '__main__' and 'path' in SYS_ARGS:
+if __name__ == '__main__' :
 	#
 	#
 	#
 	#
+	if 'path' in SYS_ARGS :	
+		path = SYS_ARGS['path']
+		f = open(path)
+		p = json.loads(f.read())
+		f.close()
+		
+	else:
+		for id in ['apps','folders']:
+			if id in SYS_ARGS :
+				SYS_ARGS[id] = SYS_ARGS[id].split(',')
 	
 	
-	path = SYS_ARGS['path']
-	f = open(path)
-	p = json.loads(f.read())
-	f.close()
+		p = dict(SYS_ARGS)
 	Logger.init('data-collector')
 	Logger.init('data-collector')
 	SYS_ARGS = dict(SYS_ARGS,** p)
 	SYS_ARGS = dict(SYS_ARGS,** p)
 	thread = Collector()
 	thread = Collector()
 	thread.start()
 	thread.start()
 else:
 else:
-	print (h)
+	print (h)

+ 76 - 46
src/monitor.py

@@ -16,6 +16,8 @@ from threading import Thread, RLock
 import time
 import time
 import numpy as np
 import numpy as np
 from utils.ml import ML
 from utils.ml import ML
+import sys
+
 class Analysis:
 class Analysis:
 	def __init__(self):
 	def __init__(self):
 		self.logs = []
 		self.logs = []
@@ -30,6 +32,11 @@ class Analysis:
 		return {"month":d.month,"year":d.year, "day":d.day,"hour":d.hour,"minute":d.minute}
 		return {"month":d.month,"year":d.year, "day":d.day,"hour":d.hour,"minute":d.minute}
 	def getName(self):
 	def getName(self):
 		return self.__class__.__name__
 		return self.__class__.__name__
+	def reboot(self,row,conf) :
+		return False
+	def cleanup(self,text):
+		return re.sub('[^a-zA-Z0-9\s:]',' ',str(text)).strip()
+		
 
 
 """
 """
 	This class is designed to analyze environment variables. Environment variables can either be folders, files or simple values
 	This class is designed to analyze environment variables. Environment variables can either be folders, files or simple values
@@ -106,6 +113,8 @@ class Sandbox(Analysis):
 		return [row.replace('-',' ').replace('_',' ') for row in r if row.strip() != '']	
 		return [row.replace('-',' ').replace('_',' ') for row in r if row.strip() != '']	
 	def evaluate(self):
 	def evaluate(self):
 		pass
 		pass
+	def reboot(self,rows,limit=None) :
+		return sum([ len(item['missing']) for item in rows ]) > 0
 	"""
 	"""
 		This function returns the ratio of existing modules relative to the ones expected
 		This function returns the ratio of existing modules relative to the ones expected
 	"""
 	"""
@@ -148,14 +157,17 @@ class ProcessCounter(Analysis):
 		#n = sum(r)
 		#n = sum(r)
 		#return n/N
 		#return n/N
 		return dict(self.getNow(),**r)
 		return dict(self.getNow(),**r)
+	
 """
 """
 	This class returns an application's both memory and cpu usage
 	This class returns an application's both memory and cpu usage
 """
 """
 class DetailProcess(Analysis):
 class DetailProcess(Analysis):
 	def __init__(self):
 	def __init__(self):
 		Analysis.__init__(self)
 		Analysis.__init__(self)
+	
 	def init (self,names):
 	def init (self,names):
 		#Analysis.init(self)
 		#Analysis.init(self)
+		
 		self.names = names;
 		self.names = names;
 	def getName(self):
 	def getName(self):
 		return "apps"
 		return "apps"
@@ -167,42 +179,52 @@ class DetailProcess(Analysis):
 			return list(g.groups())+['1']+[name]
 			return list(g.groups())+['1']+[name]
 		else:
 		else:
 			return ''
 			return ''
-	def evaluate(self,name) :
-		cmd	= "ps -eo pmem,pcpu,vsize,command|grep -E \":app\""
-		handler = subprocess.Popen(cmd.replace(":app",name),shell=True,stdout=subprocess.PIPE)
-		ostream = handler.communicate()[0].split('\n')
-		#xstr = ostream	
-		ostream = [ self.split(name,row) for row in ostream if row != '' and 'grep' not in row]
-		if len(ostream) == 0 or len(ostream[0]) < 4 :
-			ostream = [['0','0','0','0',name]]
-		r = []
-		for row in ostream :
-			#
-			# Though the comm should only return the name as specified,
-			# On OSX it has been observed that the fully qualified path is sometimes returned (go figure)
-			#
-			row =  [float(value) for value in row if value.strip() != '' and name not in value ] +[re.sub('\$|^','',name)]
-			r.append(row)
+	def reboot(self,rows,conf=None) :
+		return np.sum([int(item['label']=='crash') for item in rows]) > 0
+	def parse(self,row,fields):
+		"""
+			The last field should be the command in its integrity
+			@pre len(fields) > len(row)
+		"""
+		r = {}
+		
+		now = self.getNow()
+		r['date'] = now
+		row = [term for term in row.split() if term.strip() != '']
+		for name in fields :
+			index = fields.index(name)
+
+			r[name] = row[index] if row else 0
+			if name not in ['user','cmd','status','pid'] :
+				r[name] = float(r[name])
+		r[name] = row[index: ] if row else []
 		#
 		#
-		# At this point we should aggregate results
-		# The aggregation is intended for applications with several processes (e.g: apache2)
+		# Let's set the status give the data extracted
 		#
 		#
-		if len(r) > 1:
-			m = None
-			for row in r:
-				if m is None:
-					m = row
-				else:
-					m[3] += row[3]
-					m[0] += row[0]
-					m[1] += row[1]
-					m[2] += row[2]
-			m[0] = round((m[0] / m[3]),2)
-			m[1] = round((m[1] / m[3]),2)
-			m[2] = round((m[2] / m[3]),2)
-
-			r = [m]
+		if r['status'] == 0 :
+			r['status'] = 'crash'
+		elif 'Z' in r['status'] :
+			r['status'] = 'zombie'
+		elif r['memory_usage'] > 0 and r['cpu_usage'] > 0:
+			r['status'] = 'running'
+		else:
+			r['status'] = 'idle'
 		return r
 		return r
+	
+	def evaluate(self,name=None) :
+		if name is None :
+			name = ".*"
+		fields = ["user","pid","memory_usage","cpu_usage","memory_available","status","cmd"]
+		cmd = "ps -eo user,pid,pmem,pcpu,vsize,stat,command|grep -Ei \":app\"".replace(":app",name)
+		handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
+		logs	= handler.communicate()[0].split('\n')
+		logs	= [row for row in logs if (row.strip() != '') and ('grep -Ei' in row )== False ]
+		
+		if len(logs) == 0:
+			return [dict(self.parse('',fields),**{'label':name}) ]
+		else :
+			return [dict(self.parse(row,fields),**{'label':name}) for row in logs  if row.strip() != '' and 'grep' not in row and '-Ei' not in row]
+
 	def status(self,row):
 	def status(self,row):
 		x = row['memory_usage']
 		x = row['memory_usage']
 		y = row['cpu_usage']
 		y = row['cpu_usage']
@@ -213,20 +235,17 @@ class DetailProcess(Analysis):
 			return "idle"
 			return "idle"
 		else:
 		else:
 			return "crash"
 			return "crash"
-	def format(self,row):
-		r= {"memory_usage":row[0],"cpu_usage":row[1],"memory_available":row[2]/1000,"proc_count":row[3],"label":row[4]}
-		status = self.status(r)
-		r['status'] = status
-		return r
+	#def format(self,row):
+	#	r= {"memory_usage":row[0],"cpu_usage":row[1],"memory_available":row[2]/1000,"proc_count":row[3],"label":self.cleanup(row[4])}
+	#	status = self.status(r)
+	#	r['status'] = status
+	#	return r
 
 
 	def composite(self):
 	def composite(self):
 		ma = []
 		ma = []
-		now = self.getNow()
 		for name in self.names:
 		for name in self.names:
-			
-			matrix = self.evaluate(name)
-			
-			ma += [ dict(now, **self.format(row)) for row in matrix]
+			row = self.evaluate(name)
+			ma += row
 		
 		
 		return ma
 		return ma
 """
 """
@@ -237,6 +256,7 @@ class FileWatch(Analysis):
 	def __init__(self):
 	def __init__(self):
 		pass
 		pass
 	def init(self,folders):
 	def init(self,folders):
+		print folders
 		self.folders = folders;
 		self.folders = folders;
 	def getName(self):
 	def getName(self):
 		return "folders"
 		return "folders"
@@ -277,12 +297,22 @@ class FileWatch(Analysis):
 
 
 	def evaluate(self,path):
 	def evaluate(self,path):
 		cmd = "find  :path -print0|xargs -0 ls -ls |awk '{print $6,$7,$8,$9,$10}'".replace(":path",path)
 		cmd = "find  :path -print0|xargs -0 ls -ls |awk '{print $6,$7,$8,$9,$10}'".replace(":path",path)
-		
 		handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
 		handler = subprocess.Popen(cmd,shell=True,stdout=subprocess.PIPE)
 		ostream = handler.communicate()[0].split('\n')
 		ostream = handler.communicate()[0].split('\n')
-		
+		ostream = [row for row in ostream if row.strip() != '']		
+		print cmd
+		print ostream[0]
+		print ostream[1]
 		#return [self.split(stream) for stream in ostream if stream.strip() != '' and '.DS_Store' not in stream and 'total' not in stream]
 		#return [self.split(stream) for stream in ostream if stream.strip() != '' and '.DS_Store' not in stream and 'total' not in stream]
-		return [self.split(stream) for stream in ostream if path not in stream and not set(['','total','.DS_Store']) & set(stream.split(' '))]
+		#return [self.split(stream) for stream in ostream if path not in stream and not set(['','total','.DS_Store']) & set(stream.split(' '))]
+		return []
+	def toMB(self,size):
+		m = {'GB':1000,'TB':1000000}
+		v,u = size.split(' ')
+		return round(float(v)* m[u.upper()],2)
+
+	def reboot(self,rows,limit) :
+		return np.sum([ int(self.toMB(item['size']) > self.toMB(limit)) for item in rows]) > 0
 	def composite(self):
 	def composite(self):
 		d = [] #-- vector of details (age,size)
 		d = [] #-- vector of details (age,size)
 		
 		

+ 0 - 0
src/utils/__init__.py


+ 1 - 1
src/utils/params.py

@@ -27,6 +27,6 @@ class Logger :
 		logging.basicConfig(filename=name,level=logging.INFO,format="%(message)s")
 		logging.basicConfig(filename=name,level=logging.INFO,format="%(message)s")
 	@staticmethod
 	@staticmethod
 	def log(**args) :
 	def log(**args) :
-		args['date'] = datetime.now().strftime('%d-%m-%Y %M:%H:%S')
+		args['date'] = datetime.now().strftime('%d-%m-%Y %H:%M:%S')
 		logging.info(json.dumps(args))
 		logging.info(json.dumps(args))
 		
 		

+ 0 - 0
test/TestML.py


+ 0 - 0
test/TestServerMonitor.py


+ 0 - 0
test/data.csv


+ 0 - 0
test/demo.py