workers.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. #import multiprocessing
  2. from threading import Thread, RLock
  3. from utils import transport
  4. from utils.ml import AnomalyDetection
  5. import time
  6. import monitor
  7. import sys
  8. import os
  9. """
  10. This class is intended to collect data given a configuration
  11. """
  12. class Top(Thread):
  13. def __init__(self,_config,lock):
  14. Thread.__init__(self)
  15. self.lock = lock
  16. self.reader_class = _config['store']['class']['read']
  17. self.write_class = _config['store']['class']['write']
  18. self.rw_args = _config['store']['args']
  19. self.factory = transport.DataSourceFactory()
  20. self.name = 'Zulu-Top'
  21. self.quit = False
  22. className = ''.join(['monitor.',_config['monitor']['processes']['class'],'()'])
  23. self.handler = eval(className)
  24. self.config = _config['monitor']['processes']['config']
  25. def stop(self):
  26. self.quit = True
  27. def run(self):
  28. while self.quit == False:
  29. for label in self.config :
  30. self.lock.acquire()
  31. gwriter = self.factory.instance(type=self.write_class,args=self.rw_args)
  32. apps = self.config[label]
  33. self.handler.init(apps)
  34. r = self.handler.composite()
  35. gwriter.write(label=label,row=r)
  36. time.sleep(5)
  37. self.lock.release()
  38. if 'MONITOR_CONFIG_PATH' in os.environ:
  39. #
  40. # This suggests we are in development mode
  41. #
  42. break
  43. ELLAPSED_TIME = 60*30
  44. time.sleep(ELLAPSED_TIME)
  45. print "Exiting ",self.name
  46. class Learner(Thread) :
  47. """
  48. This function expects paltform config (store,learner)
  49. It will leverage store and learner in order to operate
  50. """
  51. def __init__(self,config,lock):
  52. Thread.__init__(self)
  53. self.name = 'Zulu-Learner'
  54. self.lock = lock
  55. self.reader_class = config['store']['class']['read']
  56. self.write_class = config['store']['class']['write']
  57. self.rw_args = config['store']['args']
  58. self.features = config['learner']['anomalies']['features']
  59. self.yo = config['learner']['anomalies']['label']
  60. self.apps = config['learner']['anomalies']['apps']
  61. self.factory = transport.DataSourceFactory()
  62. self.quit = False
  63. def stop(self):
  64. self.quit = True
  65. """
  66. This function will initiate learning every (x-hour)
  67. If there is nothing to learn the app will simply go to sleep
  68. """
  69. def run(self):
  70. reader = self.factory.instance(type=self.reader_class,args=self.rw_args)
  71. data = reader.read()
  72. #
  73. # This is the motherload of innefficiency ...
  74. #
  75. while self.quit == False:
  76. r = {}
  77. for key in data :
  78. logs = data[key]
  79. for app in self.apps:
  80. handler = AnomalyDetection()
  81. value = handler.learn(logs,'label',app,self.features,self.yo)
  82. if value is not None:
  83. if key not in r:
  84. r[key] = {}
  85. r[key][app] = value
  86. #
  87. # At this point we've already learnt every thing we need to learn
  88. #
  89. if r.keys() :
  90. self.lock.acquire()
  91. writer = self.factory.instance(type=self.write_class,args=self.rw_args)
  92. writer.write(label='learn',row=r)
  93. self.lock.release()
  94. if 'MONITOR_CONFIG_PATH' in os.environ:
  95. #
  96. # This suggests we are in development mode
  97. #
  98. break
  99. TIME_ELLAPSED = 60*120 #-- Every 2 hours
  100. time.sleep(TIME_ELLAPSED)
  101. print "Exiting ",self.name
  102. """
  103. This class is a singleton designed to start and stop dependent threads
  104. * monitor is designed to act as a data collection agent
  105. * learner is designed to be a learner i.e machine learning model(s)
  106. @TODO:
  107. - How to move them to processes that can be read by the os (that would allow us to eat our own dog-food)
  108. - Additionally we also need to have a pruning thread, to control the volume of data we have to deal with.This instills the "will to live" in the application
  109. """
  110. class ThreadManager:
  111. Pool = {}
  112. @staticmethod
  113. def start(config):
  114. lock = RLock()
  115. ThreadManager.Pool['monitor'] = Top(config,lock)
  116. ThreadManager.Pool['learner'] = Learner(config,lock)
  117. for id in ThreadManager.Pool :
  118. thread = ThreadManager.Pool[id]
  119. thread.start()
  120. @staticmethod
  121. def stop():
  122. for id in ThreadManager.Pool :
  123. thread = ThreadManager.Pool[id]
  124. thread.stop()
  125. @staticmethod
  126. def status():
  127. r = {}
  128. for id in ThreadManager.Pool :
  129. thread = ThreadManager.Pool[id]
  130. r[id] = thread.isAlive()
  131. class Factory :
  132. """
  133. This function will return an instance of an object in the specified in the configuration file
  134. """
  135. @staticmethod
  136. def instance(id,config):
  137. if id in config['monitor'] :
  138. className = config['monitor'][id]['class']
  139. ref = "".join(["monitor.",className,"()"])
  140. ref = eval(ref)
  141. return {"class":ref,"config":config['monitor'][id]["config"]}
  142. else:
  143. return None