""" This is the implementation of a data collection agent The agent's role is intended to : - collect data associated with folder and processes - The agent will also perform various learning tasks Usage: python --path --delay xxx --procs p1,p2,p3 --folders path1,path2 """ from threading import Thread, RLock from utils.params import PARAMS import os import json import time from datetime import datetime from utils.transport import * import monitor class ICollector(Thread) : def __init__(self) : Thread.__init__(self) self.folders = None self.procs = None self.config = None self.pool = [] self.lock = RLock() self.factory = DataSourceFactory() self.init() self.name = 'data-collector@'+self.id def init(self): # # data store configuration (needs to be in a file) # path = PARAMS['path'] if os.path.exists(path) : f = open(path) self.config = json.loads(f.read()) #if 'store' in self.config : # self.config = self.config['store'] f.close() self.id = self.config['id'] #PARAMS['id'] if 'folders' in self.config : #PARAMS : folders = self.config['folders'] #PARAMS['folders'].split(',') self.register('monitor.FileWatch',folders) if 'procs' in self.config : #PARAMS : procs = self.config['procs'] #PARAMS['procs'].split(',') self.register('monitor.DetailProcess',procs) self.quit = False #self.DELAY = PARAMS['delay']*60 self.DELAY = self.config['delay'] """ This function returns an instance of a data collector class : ProcessDetails, FileWatch, ... provided the class name """ def register(self,className,params) : try: agent = eval(className+"()") agent.init(params) self.pool.append( agent ) except Exception,e: print e def stop(self): self.quit = True def run(self): write_class = self.config['store']['class']['write'] read_args = self.config['store']['args'] DELAY = self.config['delay'] * 60 while self.quit == False: for thread in self.pool : id = "@".join([thread.getName(),self.id]) data = thread.composite() label = thread.getName() row = {} if label == 'folders': row = [ dict({"id":self.id}, **_row) for _row in data] else: label = id row = data self.lock.acquire() store = self.factory.instance(type=write_class,args=read_args) store.flush(size=200) store.write(label=label,row=row) self.lock.release() if 'MONITOR_CONFIG_PATH' in os.environ : break print '\t *** ',str(datetime.today()),' ** ' time.sleep(DELAY) print ' *** Exiting ',self.name # read_class=self.config['class']['read'] # store = self.factory.instance(type=write_class,args=read_args) # store.flush() if __name__ == '__main__': thread = ICollector() # thread.daemon = True thread.start()