123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- """
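Implementation of a data collection agent.

The agent is intended to:
- collect data associated with folders and processes;
- perform various learning tasks.

Usage:
    python <script> --path <config> --delay xxx --procs p1,p2,p3 --folders path1,path2

Note: in the current implementation only the configuration file given by
--path (PARAMS['path']) is used; the folders, procs and delay values are read
from that JSON file rather than from the other command-line flags.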
- """
- from threading import Thread, RLock
- from utils.params import PARAMS
- import os
- import json
- import time
- from datetime import datetime
- from utils.transport import *
- import monitor
class ICollector(Thread):
    """Background thread that periodically polls registered data collectors.

    On construction it loads the JSON configuration file whose path is
    ``PARAMS['path']`` and registers one collector per configured source:
    ``monitor.FileWatch`` for ``folders`` and ``monitor.DetailProcess`` for
    ``procs``. While running, each cycle asks every collector for its
    ``composite()`` data, posts it through a ``QueueWriter`` and writes it to
    a store obtained from ``DataSourceFactory``, then sleeps ``delay`` minutes.
    """

    def __init__(self):
        Thread.__init__(self)
        self.folders = None
        self.procs = None
        self.config = None
        self.pool = []           # registered collector instances
        self.lock = RLock()      # guards store creation/flush/write in run()
        self.factory = DataSourceFactory()
        self.init()
        # %-formatting so a non-string id from the JSON config does not raise
        self.name = 'data-collector@%s' % (self.id,)

    def init(self):
        """Load the configuration file and register the configured collectors.

        Sets ``self.config`` (parsed JSON), ``self.id``, ``self.quit`` and
        ``self.DELAY`` (the ``delay`` value from the config, in minutes).

        Raises:
            IOError: if the file at ``PARAMS['path']`` does not exist.
        """
        path = PARAMS['path']
        if not os.path.exists(path):
            raise IOError('data-collector configuration file not found: %s' % path)
        with open(path) as f:
            self.config = json.load(f)

        self.id = self.config['id']
        if 'folders' in self.config:
            self.register('monitor.FileWatch', self.config['folders'])
        if 'procs' in self.config:
            self.register('monitor.DetailProcess', self.config['procs'])

        self.quit = False
        self.DELAY = self.config['delay']

    @staticmethod
    def _load_class(qualified_name):
        """Return the class named by a dotted ``package.module.Class`` path."""
        import importlib
        module_name, _, class_name = qualified_name.rpartition('.')
        return getattr(importlib.import_module(module_name), class_name)

    def register(self, className, params):
        """Instantiate collector *className* and add it to the pool.

        ``className`` is a dotted path such as ``'monitor.FileWatch'``. The
        class is constructed with no arguments and then initialized with
        ``agent.init(params)``. Any failure is reported and swallowed, so one
        broken collector does not prevent the others from being registered.
        """
        try:
            agent = self._load_class(className)()
            agent.init(params)
        except Exception as e:
            print('data-collector: unable to register %s: %s' % (className, e))
            return
        self.pool.append(agent)

    def stop(self):
        """Request loop exit; ``run`` checks this once per cycle, after its sleep."""
        self.quit = True

    def _tag_row(self, item):
        """Return a copy of dict *item* with this agent's ``id`` added.

        Keys already present in *item* take precedence over the added ``id``.
        """
        tagged = {'id': self.id}
        tagged.update(item)
        return tagged

    def _publish(self, message):
        """Post *message* to this agent's queue, always closing the writer."""
        qwriter = QueueWriter(host=self.config['api'], uid=self.config['key'], qid=self.id)
        try:
            qwriter.write(label=self.id, row=message)
        finally:
            qwriter.close()

    def run(self):
        """Poll every collector in the pool until :meth:`stop` is called.

        When the ``MONITOR_CONFIG_PATH`` environment variable is set, a single
        cycle is performed and the loop exits without sleeping.
        """
        store_class = self.config['store']['class']['write']
        store_args = self.config['store']['args']
        delay = self.config['delay'] * 60   # config expresses the delay in minutes

        while not self.quit:
            for collector in self.pool:
                name = collector.getName()
                data = collector.composite()
                if name == 'folders':
                    label = name
                    row = [self._tag_row(item) for item in data]
                else:
                    label = '%s@%s' % (name, self.id)
                    row = data
                # @TODO: check the status here and, if it prompts an action,
                # submit the row to an event orchestrator (Aggregation pattern?).
                self._publish({'to': name, 'content': row})

                with self.lock:
                    store = self.factory.instance(type=store_class, args=store_args)
                    store.flush(size=200)
                    store.write(label=label, row=row)

            if 'MONITOR_CONFIG_PATH' in os.environ:
                break
            print('\t *** %s ** ' % datetime.today())
            time.sleep(delay)

        print(' *** Exiting %s' % self.name)
-
if __name__ == '__main__':
    # Run the collector as a regular (non-daemon) thread; the process stays
    # alive until the collector's run loop exits.
    collector = ICollector()
    collector.start()
|