- """
- This is a RESTful interface implemented using Flask micro framework.
- The API is driven by configuration that is organized in terms of the monitoring classes
- The API is both restful and websocket/socketio enabled.
- We designed the classes to be reusable (and powered by labels):
- 'monitoring-type':
- 'class':'<class-name>'
- 'config':<labeled-class-specific-configuration>'
- @TODO:
- - In order to make this Saas we need to have the configuration be session driven
- - Add socketio, so that each section of the dashboard updates independently
- """
- from flask import Flask, session, request, redirect, Response
- from flask.templating import render_template
- from flask_session import Session
- import time
- import sys
- import os
- import json
- import re
- import monitor
- import Queue
- from utils.transport import *
- from utils.workers import ThreadManager, Factory
- from utils.ml import ML,AnomalyDetection,AnalyzeAnomaly
- import utils.params as SYS_ARGS
- import atexit
- app = Flask(__name__)
- app.config['SECRET_KEY'] = '!h8-[0v8]247-4-360'
- #app.secret_key = 'A0Zr98j/3yX R~XHH!jmN]LWX=?RT'
- PARAMS = SYS_ARGS.PARAMS
- f = open(PARAMS['path'])
- CONFIG = json.loads(f.read())
- f.close()
- #
- #
- #from threading import Thread, RLock
- p = CONFIG['store']['args']
- class_read = CONFIG['store']['class']['read']
- class_write= CONFIG['store']['class']['write']
- factory = DataSourceFactory()
- # gReader = factory.instance(type=class_read,args=p)
- @app.route('/1/get/apps')
- def get_apps():
- r = []
- try:
- gReader = factory.instance(type=class_read,args=p)
- r = gReader.view('summary/app_names',key=p['uid'])
- except Exception as e:
- print (e)
- return json.dumps(r)
- @app.route('/1/get/summary/<id>')
- def get_summary(id):
- r = []
- try:
- gReader = factory.instance(type=class_read,args=p)
- #if id == 'apps_resources' :
- # r = gReader.view('summary/app_resources',key=p['uid'])
- #else:
- # r = gReader.view('summary/folder_size',key=p['uid'])
- id = 'summary/'+id.strip()
- print(p)
- print(id)
- r = gReader.view(id,key=p['uid'])
-
- except Exception as e:
- print (e)
- return json.dumps(r)
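- #
- # Hedged usage note: <id> names a view under the 'summary/' design document,
- # e.g. GET /1/get/summary/app_resources or GET /1/get/summary/folder_size
- # (these two view names come from the commented-out code above; other views
- # depend on the store's design documents).
- #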
- @app.route("/1/sys/usage/trend")
- def get_usage_trend():
- """
- This function returns system-level resource usage trends for the 24 most recent observations in the logs
- """
- r = {}
- try:
- gReader = factory.instance(type=class_read,args=p)
- r = gReader.view('summary/resource_usage_trend',key=p['uid'])
- except Exception as e:
- print (e)
- return json.dumps(r)
- @app.route("/1/app/usage/trend")
- def get_usage_detail():
- """
- This function returns per-application resource usage details for the 24 most recent observations in the logs
- """
- r = {}
- try:
- gReader = factory.instance(type=class_read,args=p)
- r = gReader.view('summary/app_resource_usage_details',key=p['uid'])
- except Exception as e:
- print (e)
- return json.dumps(r)
-
- @app.route('/get/<id>')
- def procs(id):
- try:
- gReader = factory.instance(type=class_read,args=p)
- data = gReader.read()
- ahandler = AnalyzeAnomaly()
- learn = {}
- if 'learn' in data :
- for row in data['learn'] :
- label = row['label']
- learn[label] = row
- r = {}
- for label in data :
- if label not in ['learn','folders'] :
- index = len(data[label]) - 1
- row = data[label][index]
- r[label] = row
- #
- # Let us determine if this is a normal operation or not
- # We will update the status of the information ...
- #
- for row in r[label] :
- index = r[label].index(row)
- if row['label'] in learn:
- id = row['label']
- px = ahandler.predict([row],learn[id])
- if px :
- # row['anomaly'] = px[1]==1
- print ""
- print label,' *** ',index
- row = dict(row,**px)
- r[label][index] =row
- #
- # @TODO:
- # Compile a report here that will be sent to the mailing list
- #
- except Exception as e:
- print(e)
- r = []
- return json.dumps(r)
- """
- This function/endpoint assesses the configured virtual environments and returns the results
- @TODO: Should this be stored for future mining? (I don't think so, but I could be wrong.)
- """
- @app.route('/sandbox')
- def sandbox():
- global CONFIG
- if 'sandbox' in CONFIG: #CONFIG['monitor']:
- #handler = HANDLERS['sandbox']['class']
- #conf = HANDLERS['sandbox']['config']
- r = []
- # p = Factory.instance('sandbox',CONFIG)
- handler = monitor.Sandbox()
- conf = CONFIG['sandbox']
- for id in conf:
- try:
- handler.init(conf[id])
- r.append (dict(handler.composite(),**{"label":id}))
- except Exception as e:
- pass
- else:
- r = []
- return json.dumps(r)
- @app.route('/trends')
- def trends ():
- id = request.args.get('id')
- app_name = request.args.get('app').strip()
- p = CONFIG['store']['args']
- class_read = CONFIG['store']['class']['read']
- gReader = factory.instance(type=class_read,args=p)
- r = gReader.read()
- if id in r:
- r = r[id] #--matrix
- series = []
- for row in r:
- series += [item for item in row if str(item['label']) == app_name]
- if len(series) > 12 :
- beg = len(series) - 8
- series = series[beg:]
- return json.dumps(series)
- else:
- return "[]"
- @app.route('/download',methods=['POST'])
- def requirements():
- stream = request.form['missing']
- print(stream)
- stream = "\n".join(json.loads(stream))
- headers = {"content-disposition":"attachment; filename=requirements.txt"}
- return Response(stream,mimetype='text/plain',headers=headers)
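- #
- # Hedged usage sketch: the dashboard is expected to POST a JSON array of missing
- # package names in the 'missing' form field and receives a downloadable
- # requirements.txt in return; the package names below are illustrative only.
- #   curl -X POST --data 'missing=["numpy","pandas"]' http://localhost:8484/download
- # The default port 8484 comes from the __main__ block at the bottom of this file.
- #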
- @app.route('/dashboard')
- def dashboard():
- context = PARAMS['context']
- if 'title' in PARAMS :
- title = PARAMS['title']
- else:
- title = 'Dashboard'
- apps = []
- try:
- gReader = factory.instance(type=class_read,args=p)
- apps = gReader.view('summary/app_names',key=p['uid'])
- except Exception as e:
- print (e)
- return render_template('dashboard.html',context=context,title=title,app_names=apps)
- @app.route('/upgrade')
- def upgrade():
- context = PARAMS['context']
- if 'title' in PARAMS :
- title = PARAMS['title']
- else:
- title = 'Upgrade'
- return render_template('upgrade.html',context=context,title=title)
- @app.route('/user')
- def user():
- context = PARAMS['context']
- if 'title' in PARAMS :
- title = PARAMS['title']
- else:
- title = 'User'
- return render_template('user.html',context=context,title=title)
- """
- This function applies the learned anomaly-detection parameters (the 'learn' section of the store) to the most recent observations and returns the predictions
- @TODO: forward this to a socket, i.e. a non-blocking socket
- """
- @app.route('/anomalies/get')
- def learn():
- global CONFIG
- p = CONFIG['store']['args']
- class_read = CONFIG['store']['class']['read']
- gReader = factory.instance(type=class_read,args=p)
- d = gReader.read()
- if 'learn' in d :
- info = d['learn']
- del d['learn']
- else :
- info = []
- r = []
- if 'id' in request.args:
- id = request.args['id']
- d = d[id]
- params = {}
- for item in info:
- label = item['label']
- params[label] = item
- #apps = list(set(ML.Extract(['label'],d)))
- r = []
- if params :
- #
- # If we have parameters available
- detector = AnomalyDetection()
- apps = params.keys()
- for name in apps :
- if name not in params:
- continue
- _info = params[name]
- try:
- xo = ML.Filter('label',name,d)
- except Exception as e:
- xo = []
- #print name,e
- if len(xo) == 0:
- continue
- xo = [xo[-1]]
- value = detector.predict(xo,_info)[0]
- if len(value):
- report = dict(_info,**{'prediction':value})
- r.append(report)
- #print app,value
- #if value is not None:
- # r.append(value)
- return json.dumps(r)
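- #
- # Hedged usage note: without arguments this endpoint scores every labeled group
- # for which learned parameters exist; passing ?id=<group-label> restricts the
- # data considered to that group (the label value is illustrative), e.g.
- #   GET /anomalies/get
- #   GET /anomalies/get?id=<group-label>
- #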
- """
- This function returns anomalies for a given context or group of processes
- The information returned is around precision/recall and f-score and parameters
- """
- @app.route('/anomalies/status')
- def anomalies_status():
- global CONFIG
- p = CONFIG['store']['args']
- class_read = CONFIG['store']['class']['read']
- gReader = factory.instance(type=class_read,args=p)
- d = gReader.read()
- if 'learn' in d :
- info = d['learn']
- del d['learn']
- else :
- info = []
- print(info)
- r = []
- if 'id' in request.args:
- id = request.args['id']
- r = info
- return json.dumps(r)
- @app.route('/folders')
- def get_folders():
- global CONFIG
- p = CONFIG['store']['args']
- class_read = CONFIG['store']['class']['read']
- gReader = factory.instance(type=class_read,args=p)
- d = gReader.read()
- if 'folders' in d:
- d = d['folders']
- hosts = set([row[0]['id'] for row in d])
- m = {}
- for id in hosts:
- for row in d:
- if id == row[0]['id'] :
- m[id] = row
- d = list(m.values())
- for row in d:
- print(row[0]['id'])
- # index = len(d) - 1
- # d = d[index]
- # m = {}
- # for row in d :
- # key = row.keys()[0]
- # row = row[key]
- # if key not in m:
- # r.append(row)
- # m[key] = len(r) -1
- # else:
- # index = m[key]
- # r[index] = row
- # d = r
- else:
- d = []
- return json.dumps(d)
- if __name__== '__main__':
- # ThreadManager.start(CONFIG)
- if 'port' not in SYS_ARGS.PARAMS :
- SYS_ARGS.PARAMS['port'] = 8484
- PORT = int(SYS_ARGS.PARAMS['port'])
- app.run(host='0.0.0.0',port=PORT,debug=True,threaded=True)