data-collector.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. """
This is the implementation of a data collection agent.
The agent's role is to:
    - collect data associated with folders and processes
    - perform various learning tasks
Usage:
    python data-collector.py --path <config> --delay xxx --procs p1,p2,p3 --folders path1,path2
  8. """
  9. from threading import Thread, RLock
  10. from utils.params import PARAMS
  11. import os
  12. import json
  13. import time
  14. from datetime import datetime
  15. from utils.transport import *
  16. import monitor
  17. class ICollector(Thread) :
  18. def __init__(self) :
  19. Thread.__init__(self)
  20. self.folders = None
  21. self.procs = None
  22. self.config = None
  23. self.pool = []
  24. self.lock = RLock()
  25. self.factory = DataSourceFactory()
  26. self.init()
  27. self.name = 'data-collector@'+self.id
  28. def init(self):
  29. #
  30. # data store configuration (needs to be in a file)
  31. #
  32. path = PARAMS['path']
  33. if os.path.exists(path) :
  34. f = open(path)
  35. self.config = json.loads(f.read())
  36. #if 'store' in self.config :
  37. # self.config = self.config['store']
  38. f.close()
  39. self.id = self.config['id'] #PARAMS['id']
  40. if 'folders' in self.config : #PARAMS :
  41. folders = self.config['folders'] #PARAMS['folders'].split(',')
  42. self.register('monitor.FileWatch',folders)
  43. if 'procs' in self.config : #PARAMS :
  44. procs = self.config['procs'] #PARAMS['procs'].split(',')
  45. self.register('monitor.DetailProcess',procs)
  46. self.quit = False
  47. #self.DELAY = PARAMS['delay']*60
  48. self.DELAY = self.config['delay']
  49. #
  50. # we need to instanciate the actor orchestrator
  51. #
  52. """
  53. This function returns an instance of a data collector class :
  54. ProcessDetails, FileWatch, ... provided the class name
  55. """
  56. def register(self,className,params) :
  57. try:
  58. agent = eval(className+"()")
  59. agent.init(params)
  60. self.pool.append( agent )
  61. except Exception,e:
  62. print e
  63. def stop(self):
  64. self.quit = True
  65. def run(self):
  66. write_class = self.config['store']['class']['write']
  67. read_args = self.config['store']['args']
  68. DELAY = self.config['delay'] * 60
  69. while self.quit == False:
  70. for thread in self.pool :
  71. id = "@".join([thread.getName(),self.id])
  72. data = thread.composite()
  73. label = thread.getName()
  74. row = {}
  75. if label == 'folders':
  76. row = [ dict({"id":self.id}, **_row) for _row in data]
  77. else:
  78. label = id
  79. row = data
  80. #
  81. # At this point we should check for the status and if it prompts an action
  82. # @TODO Use a design pattern for this ... (Aggregation?)
  83. # - submit the row to Event for analysis
  84. # - The event orchestrator will handle things from this point on
  85. #
  86. message = {}
  87. message['to'] = thread.getName()
  88. message['content'] = row
  89. qwriter = QueueWriter(host=self.config['api'],uid=self.config['key'],qid=self.id)
  90. qwriter.write(label=self.id,row = message)
  91. qwriter.close()
  92. self.lock.acquire()
  93. store = self.factory.instance(type=write_class,args=read_args)
  94. store.flush(size=200)
  95. store.write(label=label,row=row)
  96. self.lock.release()
  97. if 'MONITOR_CONFIG_PATH' in os.environ :
  98. break
  99. print '\t *** ',str(datetime.today()),' ** '
  100. time.sleep(DELAY)
  101. print ' *** Exiting ',self.name
  102. # read_class=self.config['class']['read']
  103. # store = self.factory.instance(type=write_class,args=read_args)
  104. # store.flush()
  105. if __name__ == '__main__':
  106. thread = ICollector()
  107. # thread.daemon = True
  108. thread.start()