""" This file is intended to perfom certain machine learning tasks based on numpy We are trying to keep it lean that's why no sklearn involved yet @TODO: Create factory method for the learners implemented here Improve preconditions (size of the dataset, labels) """ from __future__ import division import numpy as np class ML: @staticmethod def Filter (attr,value,data) : # # @TODO: Make sure this approach works across all transport classes # We may have a potential issue of how the data is stored ... it may not scale # value = ML.CleanupName(value) #return [item[0] for item in data if item and attr in item[0] and item[0][attr] == value] #return [[item for item in row if item[attr] == value][0] for row in data] # # We are making the filtering more rescillient, i.e if an item doesn't exist we don't have to throw an exception # This is why we expanded the loops ... fully expressive but rescilient # r = [] for row in data : for item in row : if attr in item and item[attr] == value: r.append(item) return r @staticmethod def Extract(lattr,data): if isinstance(lattr,basestring): lattr = [lattr] return [[row[id] for id in lattr] for row in data] @staticmethod def CleanupName(value) : return value.replace('$','').replace('.+','') """ Implements a multivariate anomaly detection @TODO: determine computationally determine epsilon """ class AnomalyDetection: def split(self,data,index=-1,threshold=0.65) : N = len(data) # if N < LIMIT: # return None end = int(N*threshold) train = data[:end] test = data[end:] return {"train":train,"test":test} """ @param key field name by which the data will be filtered @param value field value for the filter @param features features to be used in the analysis @param labels used to assess performance @TODO: Map/Reduce does a good job at filtering """ def learn(self,data,key,value,features,label): if len(data) < 10: return None xo = ML.Filter(key,value,data) if len(xo) < 10 : return None # attr = conf['features'] # label= conf['label'] yo= ML.Extract([label['name']],xo) xo = ML.Extract(features,xo) yo = self.getLabel(yo,label) # # @TODO: Insure this can be finetuned, training size matters for learning. 
class AnomalyDetection:
    """
    Implements multivariate anomaly detection.
    @TODO: determine epsilon computationally
    """

    def split(self, data, index=-1, threshold=0.65):
        N = len(data)
        # if N < LIMIT:
        #     return None
        end = int(N * threshold)
        train = data[:end]
        test = data[end:]
        return {"train": train, "test": test}

    def learn(self, data, key, value, features, label):
        """
        @param key      field name by which the data will be filtered
        @param value    field value for the filter
        @param features features to be used in the analysis
        @param label    used to assess performance
        @TODO: Map/Reduce does a good job at filtering
        """
        if len(data) < 10:
            return None
        xo = ML.Filter(key, value, data)
        if len(xo) < 10:
            return None
        yo = ML.Extract([label['name']], xo)
        xo = ML.Extract(features, xo)
        yo = self.getLabel(yo, label)
        #
        # @TODO: Ensure this can be fine-tuned; training size matters for
        # learning and it's not obvious how to define it upfront.
        #
        xo = self.split(xo)
        yo = self.split(yo)
        p = self.gParameters(xo['train'])
        if p is None:
            return None
        has_cov = np.linalg.det(p['cov'])  # -- making sure the matrix is invertible
        if xo['train'] and has_cov:
            E = 0.001
            ACCEPTABLE_FSCORE = 0.6
            #
            # We need to find an appropriate epsilon for the predictions.
            # An appropriate epsilon is one that yields an f-score in [0.5, 1[.
            #
            perf = None
            for i in range(0, 10):
                Epsilon = E + (2 * E * i)
                #
                # At this point we've got enough data for the parameters,
                # so we try to fine-tune epsilon for better results.
                #
                px = self.gPx(p['mean'], p['cov'], xo['test'], Epsilon)
                __operf__ = self.gPerformance(px, yo['test'])
                __operf__['epsilon'] = Epsilon
                if __operf__['fscore'] == 1:
                    continue
                if perf is None or (perf['fscore'] < __operf__['fscore'] and __operf__['fscore'] > ACCEPTABLE_FSCORE):
                    perf = __operf__
            #
            # At this point we are assuming we came out of the whole thing with
            # an acceptable performance. The understanding is that error drives
            # performance, thus we reject fscore == 1.
            #
            if perf and perf['fscore'] > ACCEPTABLE_FSCORE:
                return {"label": value, "parameters": p, "performance": perf}
            else:
                return None
        return None

    def canLearn(self, p):
        """
        This function determines whether the preconditions for learning are
        met, based on the parameters p passed to it.
        """
        pass

    def getLabel(self, yo, label_conf):
        return [int(len(set(item) & set(label_conf["1"])) > 0) for item in yo]

    def gPx(self, xu, sigma, data, EPSILON=0.01):
        """
        Compute the probability density function for a particular event / set
        of events, i.e. the multivariate gaussian density
            p(x) = exp(-0.5 (x-mu)^T inv(Sigma) (x-mu)) / ((2 pi)^(n/2) |Sigma|^0.5)
        The return value is a list of [px, is_anomaly] pairs.
        @pre len(xu) == sigma.shape[0] == sigma.shape[1]
        """
        n = len(data[0])
        xu = np.array(xu)
        sigma = np.array(sigma)
        a = ((2 * np.pi) ** (n / 2)) * np.linalg.det(sigma) ** 0.5
        inv_sigma = np.linalg.inv(sigma)
        r = []
        for row in data:
            d = np.array(row) - xu
            b = np.exp(-0.5 * np.dot(np.dot(d, inv_sigma), d))
            px = float(b / a)
            r.append([px, int(px < EPSILON)])
        return r

    def predict(self, xo, info):
        """
        Use stored learnt information to predict on raw data; in this case it
        will determine whether we have an anomaly or not.
        @param xo   raw observations (matrix)
        @param info stored information about what was learnt
        """
        xo = ML.Extract(info['features'], xo)
        if not xo:
            return None
        sigma = info['parameters']['cov']
        xu = info['parameters']['mean']
        epsilon = info['performance']['epsilon']
        return self.gPx(xu, sigma, xo, epsilon)

    def gPerformance(self, test, labels):
        """
        Compute performance metrics, i.e. precision, recall and f-score.
        For details visit https://en.wikipedia.org/wiki/Precision_and_recall
        """
        N = len(test)
        tp = 0  # true positives
        fp = 0  # false positives
        fn = 0  # false negatives
        tn = 0  # true negatives
        for i in range(0, N):
            tp += 1 if (test[i][1] == labels[i] and test[i][1] == 1) else 0
            fp += 1 if (test[i][1] != labels[i] and test[i][1] == 1) else 0
            fn += 1 if (test[i][1] != labels[i] and test[i][1] == 0) else 0
            tn += 1 if (test[i][1] == labels[i] and test[i][1] == 0) else 0
        precision = tp / (tp + fp) if tp + fp > 0 else 1
        recall = tp / (tp + fn) if tp + fn > 0 else 1
        fscore = (2 * precision * recall) / (precision + recall) if precision + recall > 0 else 0
        return {"precision": precision, "recall": recall, "fscore": fscore}
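
    # Worked example (a hedged sketch, not from the original source): for
    # predictions [[p0, 1], [p1, 0], [p2, 1], [p3, 0]] and labels [1, 0, 0, 0],
    # gPerformance counts tp=1, fp=1, fn=0, tn=2, so precision = 1/2,
    # recall = 1/1 and fscore = 2*(0.5*1)/(0.5+1) = 2/3.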
    def gParameters(self, train):
        """
        Return gaussian parameters, i.e. means and covariance.
        The information will be used to compute probabilities.
        """
        n = len(train[0])
        m = np.transpose(np.array(train))
        u = np.array([np.mean(m[i, :]) for i in range(0, n)])
        if np.sum(u) == 0:
            return None
        r = np.array([np.sqrt(np.var(m[i, :])) for i in range(0, n)])
        #
        # Before we normalize the data we must ensure there is some level of
        # movement in this application. A lack of movement suggests we may
        # not have enough information to do anything.
        #
        if 0 in r:
            return None
        #
        # -- Normalizing the matrix, then we compute the covariance matrix
        #
        m = np.array([(m[i, :] - u[i]) / r[i] for i in range(0, n)])
        sigma = np.cov(m)
        sigma = [list(row) for row in sigma]
        return {"cov": sigma, "mean": list(u)}


class AnalyzeAnomalies(AnomalyDetection):
    """
    This analysis will include a predicted status, because an anomaly can be either:
        - a downtime, i.e. end of day
        - a spike, and thus a potential imminent crash
    """

    def predict(self, xo, info):
        """
        @param xo   matrix of variables
        @param info information about what was learnt
        """
        x = xo[-1:]  # keep the last observation as a 1-row matrix
        r = AnomalyDetection.predict(self, x, info)
        #
        # In order to determine what the anomaly is (idle or crash) we compute
        # the slope. The slope is computed using the covariance / variance of
        # the features. This part is still a work in progress: alpha follows
        # the stated intent but is not yet used in the returned value.
        #
        N = len(info['features'])
        xy = np.array(ML.Extract(info['features'], xo))
        vxy = np.array([np.var(xy[:, i]) for i in range(0, N)])
        alpha = np.array(info['parameters']['cov']) / vxy
        return r


class Regression:
    parameters = {}

    @staticmethod
    def predict(xo):
        pass

    def __init__(self, config):
        pass
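
# A minimal, self-contained usage sketch (not part of the original module);
# the record layout, field names and values below are hypothetical.
if __name__ == "__main__":
    np.random.seed(0)

    def make(cpu, mem, status):
        # one row of the matrix: a list holding a single dict record
        return [{"app": "web", "status": status, "cpu": cpu, "mem": mem}]

    # 35 normal observations, one borderline point and a few obvious spikes
    data = [make(float(np.random.normal(0.5, 0.05)),
                 float(np.random.normal(0.4, 0.05)), "N") for _ in range(35)]
    data += [make(2.5, 1.5, "N")]
    data += [make(5.0, 4.0, "crash") for _ in range(4)]

    label = {"name": "status", "1": ["crash"]}
    info = AnomalyDetection().learn(data, "app", "web", ["cpu", "mem"], label)
    # learn returns a dict with parameters and performance when some epsilon
    # yields an acceptable (but imperfect) f-score, otherwise None
    print(info)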