data-collector.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
"""
    Implementation of a data collection agent.
    The agent is intended to:
        - collect data associated with folders and processes
        - perform various learning tasks
    Usage:
        python data-collector.py --path <config> --delay xxx --procs p1,p2,p3 --folders path1,path2
"""
  9. from threading import Thread, RLock
  10. from utils.params import PARAMS
  11. import os
  12. import json
  13. import time
  14. from datetime import datetime
  15. from utils.transport import *
  16. import monitor
  17. class ICollector(Thread) :
  18. def __init__(self) :
  19. Thread.__init__(self)
  20. self.folders = None
  21. self.procs = None
  22. self.config = None
  23. self.pool = []
  24. self.lock = RLock()
  25. self.factory = DataSourceFactory()
  26. self.init()
  27. self.name = 'data-collector@'+self.id
  28. def init(self):
  29. #
  30. # data store configuration (needs to be in a file)
  31. #
  32. path = PARAMS['path']
  33. if os.path.exists(path) :
  34. f = open(path)
  35. self.config = json.loads(f.read())
  36. #if 'store' in self.config :
  37. # self.config = self.config['store']
  38. f.close()
  39. self.id = self.config['id'] #PARAMS['id']
  40. if 'folders' in self.config : #PARAMS :
  41. folders = self.config['folders'] #PARAMS['folders'].split(',')
  42. self.register('monitor.FileWatch',folders)
  43. if 'procs' in self.config : #PARAMS :
  44. procs = self.config['procs'] #PARAMS['procs'].split(',')
  45. self.register('monitor.DetailProcess',procs)
  46. self.quit = False
  47. #self.DELAY = PARAMS['delay']*60
  48. self.DELAY = self.config['delay']
  49. """
  50. This function returns an instance of a data collector class :
  51. ProcessDetails, FileWatch, ... provided the class name
  52. """
  53. def register(self,className,params) :
  54. try:
  55. agent = eval(className+"()")
  56. agent.init(params)
  57. self.pool.append( agent )
  58. except Exception,e:
  59. print e
  60. def stop(self):
  61. self.quit = True
  62. def run(self):
  63. write_class = self.config['store']['class']['write']
  64. read_args = self.config['store']['args']
  65. DELAY = self.config['delay'] * 60
  66. while self.quit == False:
  67. for thread in self.pool :
  68. id = "@".join([thread.getName(),self.id])
  69. data = thread.composite()
  70. label = thread.getName()
  71. row = {}
  72. if label == 'folders':
  73. row = [ dict({"id":self.id}, **_row) for _row in data]
  74. else:
  75. label = id
  76. row = data
  77. self.lock.acquire()
  78. store = self.factory.instance(type=write_class,args=read_args)
  79. store.flush(size=200)
  80. store.write(label=label,row=row)
  81. self.lock.release()
  82. if 'MONITOR_CONFIG_PATH' in os.environ :
  83. break
  84. print '\t *** ',str(datetime.today()),' ** '
  85. time.sleep(DELAY)
  86. print ' *** Exiting ',self.name
  87. # read_class=self.config['class']['read']
  88. # store = self.factory.instance(type=write_class,args=read_args)
  89. # store.flush()
  90. if __name__ == '__main__':
  91. thread = ICollector()
  92. # thread.daemon = True
  93. thread.start()