123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- #import multiprocessing
- from threading import Thread, RLock
- from utils import transport
- from utils.ml import AnomalyDetection
- import time
- import monitor
- import sys
- import os
- """
- This class is intended to collect data given a configuration
- """
- class Top(Thread):
- def __init__(self,_config,lock):
- Thread.__init__(self)
- self.lock = lock
- self.reader_class = _config['store']['class']['read']
- self.write_class = _config['store']['class']['write']
- self.rw_args = _config['store']['args']
- self.factory = transport.DataSourceFactory()
-
- self.name = 'Zulu-Top'
- self.quit = False
-
- className = ''.join(['monitor.',_config['monitor']['processes']['class'],'()'])
- self.handler = eval(className)
- self.config = _config['monitor']['processes']['config']
- def stop(self):
- self.quit = True
- def run(self):
- while self.quit == False:
- for label in self.config :
- self.lock.acquire()
- gwriter = self.factory.instance(type=self.write_class,args=self.rw_args)
- apps = self.config[label]
- self.handler.init(apps)
- r = self.handler.composite()
- gwriter.write(label=label,row=r)
- time.sleep(5)
- self.lock.release()
- if 'MONITOR_CONFIG_PATH' in os.environ:
- #
- # This suggests we are in development mode
- #
- break
- ELLAPSED_TIME = 60*30
- time.sleep(ELLAPSED_TIME)
- print "Exiting ",self.name
-
- class Learner(Thread) :
-
- """
- This function expects paltform config (store,learner)
- It will leverage store and learner in order to operate
- """
- def __init__(self,config,lock):
- Thread.__init__(self)
- self.name = 'Zulu-Learner'
- self.lock = lock
- self.reader_class = config['store']['class']['read']
- self.write_class = config['store']['class']['write']
- self.rw_args = config['store']['args']
- self.features = config['learner']['anomalies']['features']
- self.yo = config['learner']['anomalies']['label']
- self.apps = config['learner']['anomalies']['apps']
- self.factory = transport.DataSourceFactory()
- self.quit = False
-
- def stop(self):
- self.quit = True
- """
- This function will initiate learning every (x-hour)
- If there is nothing to learn the app will simply go to sleep
- """
- def run(self):
- reader = self.factory.instance(type=self.reader_class,args=self.rw_args)
- data = reader.read()
- #
- # This is the motherload of innefficiency ...
- #
- while self.quit == False:
- r = {}
- for key in data :
- logs = data[key]
-
- for app in self.apps:
-
- handler = AnomalyDetection()
- value = handler.learn(logs,'label',app,self.features,self.yo)
-
- if value is not None:
-
- if key not in r:
- r[key] = {}
- r[key][app] = value
- #
- # At this point we've already learnt every thing we need to learn
- #
-
- if r.keys() :
-
- self.lock.acquire()
- writer = self.factory.instance(type=self.write_class,args=self.rw_args)
- writer.write(label='learn',row=r)
- self.lock.release()
- if 'MONITOR_CONFIG_PATH' in os.environ:
- #
- # This suggests we are in development mode
- #
- break
- TIME_ELLAPSED = 60*120 #-- Every 2 hours
- time.sleep(TIME_ELLAPSED)
- print "Exiting ",self.name
-
- """
- This class is a singleton designed to start quit dependent threads
- * monitor is designed to act as a data collection agent
- * learner is designed to be a learner i.e machine learning model(s)
- @TODO:
- - How to move them to processes that can be read by the os (that would allow us to eat our own dog-food)
- - Additionally we also need to have a pruning thread, to control the volume of data we have to deal with.This instills the "will to live" in the application
- """
- class ThreadManager:
-
- Pool = {}
- @staticmethod
- def start(config):
- lock = RLock()
- ThreadManager.Pool['monitor'] = Top(config,lock)
- ThreadManager.Pool['learner'] = Learner(config,lock)
- for id in ThreadManager.Pool :
- thread = ThreadManager.Pool[id]
- thread.start()
- @staticmethod
- def stop():
- for id in ThreadManager.Pool :
- thread = ThreadManager.Pool[id]
- thread.stop()
- @staticmethod
- def status():
- r = {}
- for id in ThreadManager.Pool :
- thread = ThreadManager.Pool[id]
- r[id] = thread.isAlive()
-
-
-
- class Factory :
- """
- This function will return an instance of an object in the specified in the configuration file
- """
- @staticmethod
- def instance(id,config):
- if id in config['monitor'] :
- className = config['monitor'][id]['class']
- ref = "".join(["monitor.",className,"()"])
- ref = eval(ref)
- return {"class":ref,"config":config['monitor'][id]["config"]}
- else:
- return None
-
|