- """
- This file is intended to perfom certain machine learning tasks based on numpy
- We are trying to keep it lean that's why no sklearn involved yet
- @TODO:
- Create factory method for the learners implemented here
- Improve preconditions (size of the dataset, labels)
- """
- from __future__ import division
- import numpy as np
- class ML:
    @staticmethod
    def Filter(attr, value, data):
        #
        # @TODO: Make sure this approach works across all transport classes.
        # We may have a potential issue with how the data is stored ... it may not scale.
        #
        value = ML.CleanupName(value)
        #
        # The loops are fully expanded to make the filtering resilient:
        # if an item doesn't have the attribute, we skip it rather than throw an exception.
        #
        r = []
        for row in data:
            for item in row:
                if attr in item and item[attr] == value:
                    r.append(item)
        return r
    @staticmethod
    def Extract(lattr, data):
        if isinstance(lattr, str):
            lattr = [lattr]
        return [[row[name] for name in lattr] for row in data]
    @staticmethod
    def CleanupName(value):
        return value.replace('$', '').replace('.+', '')
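
    #
    # Usage sketch (hypothetical data): Filter expects a list of rows, where
    # each row is a list of dicts keyed by attribute name.
    #
    #   data = [[{"name": "postgres", "cpu": 0.4}], [{"name": "nginx", "cpu": 0.9}]]
    #   rows = ML.Filter("name", "postgres", data)   # -> [{"name": "postgres", "cpu": 0.4}]
    #   X    = ML.Extract(["cpu"], rows)             # -> [[0.4]]
    #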


"""
Implements multivariate anomaly detection.
@TODO: computationally determine epsilon
"""
class AnomalyDetection:

    def split(self, data, threshold=0.65):
        N = len(data)
        end = int(N * threshold)
        train = data[:end]
        test = data[end:]
        return {"train": train, "test": test}

    """
    @param key      field name by which the data will be filtered
    @param value    field value for the filter
    @param features features to be used in the analysis
    @param label    label configuration used to assess performance
    @TODO: Map/Reduce does a good job at filtering
    """
    def learn(self, data, key, value, features, label):
        if len(data) < 10:
            return None
        xo = ML.Filter(key, value, data)
        if len(xo) < 10:
            return None

        yo = ML.Extract([label['name']], xo)
        xo = ML.Extract(features, xo)
        yo = self.getLabel(yo, label)
        #
        # @TODO: Ensure this can be fine-tuned; training size matters for
        # learning and is not obvious to define upfront.
        #
        xo = self.split(xo)
        yo = self.split(yo)
        p = self.gParameters(xo['train'])
        if p is None:
            return None
        has_cov = np.linalg.det(p['cov'])  # -- making sure the matrix is invertible
        if xo['train'] and has_cov:
            E = 0.001
            ACCEPTABLE_FSCORE = 0.6
            #
            # We need to find an appropriate epsilon for the predictions.
            # An appropriate epsilon is one that yields an f-score in [0.5, 1[.
            #
            perf = None
            for i in range(10):
                Epsilon = E + (2 * E * i)
                px = self.gPx(p['mean'], p['cov'], xo['test'], Epsilon)
                __operf__ = self.gPerformance(px, yo['test'])
                #
                # The understanding is that error drives performance, thus we
                # reject a perfect fscore == 1.
                #
                if __operf__['fscore'] == 1:
                    continue
                if perf is None or perf['fscore'] < __operf__['fscore']:
                    perf = __operf__
                    perf['epsilon'] = Epsilon
            #
            # At this point we assume the search produced an acceptable
            # performance; otherwise there is nothing worth returning.
            #
            if perf and perf['fscore'] > ACCEPTABLE_FSCORE:
                return {"label": value, "parameters": p, "performance": perf}
        return None
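
    #
    # Usage sketch (hypothetical records and label configuration):
    #
    #   ad = AnomalyDetection()
    #   info = ad.learn(
    #       data,                                # list of rows of dicts
    #       "name", "postgres",                  # filter key / value
    #       ["cpu", "memory_usage"],             # features
    #       {"name": "status", "1": ["crash"]},  # label config (see getLabel)
    #   )
    #   if info:
    #       print(info['performance'])  # {'precision': ..., 'recall': ..., 'fscore': ..., 'epsilon': ...}
    #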
- """
- This function determines if the preconditions for learning are met
- For that parameters are passed to the function
- p
- """
- def canLearn(self,p) :
- pass
    def getLabel(self, yo, label_conf):
        #
        # A row is labeled 1 if any of its extracted values appears in label_conf["1"].
        #
        return [int(len(set(item) & set(label_conf["1"])) > 0) for item in yo]
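
    #
    # Example (hypothetical label configuration): with
    # label_conf = {"name": "status", "1": ["crash", "idle"]}, the row
    # ["crash"] maps to 1 and the row ["running"] maps to 0.
    #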
- """
- This function will compute the probability density function given a particular event/set of events
- The return value is [px,yo]
- @pre xu.shape[0] == sigma[0] == sigma[1]
- """
    def gPx(self, xu, sigma, data, EPSILON=0.01):
        n = len(data[0])
        r = []
        #
        # Normalization constant of the multivariate gaussian:
        # (2*pi)^(n/2) * |sigma|^0.5
        #
        a = ((2 * np.pi) ** (n / 2)) * np.linalg.det(sigma) ** 0.5
        inv_sigma = np.linalg.inv(sigma)
        xu = np.array(xu)
        for row in np.array(data):
            d = row - xu
            b = np.exp(-0.5 * d.dot(inv_sigma).dot(d))
            px = float(b / a)
            r.append([px, int(px < EPSILON)])
        return r
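
    #
    # Sketch (hypothetical values): with n = 2, an identity covariance and
    # xu = [0, 0], the density at the mean is 1 / (2 * pi) ~ 0.159, so the
    # default EPSILON = 0.01 flags the point [0, 0] as normal:
    #
    #   AnomalyDetection().gPx([0, 0], [[1, 0], [0, 1]], [[0, 0]])
    #   # -> [[0.15915..., 0]]
    #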
- """
- This function uses stored learnt information to predict on raw data
- In this case it will determin if we have an anomaly or not
- @param xo raw observations (matrix)
- @param info stored information about this
- """
    def predict(self, xo, info):
        xo = ML.Extract(info['features'], xo)
        if not xo:
            return None
        sigma = info['parameters']['cov']
        xu = info['parameters']['mean']
        epsilon = info['performance']['epsilon']
        return self.gPx(xu, sigma, xo, epsilon)
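
    #
    # Usage sketch: this assumes the caller stored the feature list alongside
    # what learn() returned, since learn() itself does not include 'features':
    #
    #   info['features'] = ["cpu", "memory_usage"]  # hypothetical
    #   AnomalyDetection().predict(new_rows, info)  # -> [[px, is_anomaly], ...] or None
    #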
- """
- This function computes performance metrics i.e precision, recall and f-score
- for details visit https://en.wikipedia.org/wiki/Precision_and_recall
- """
    def gPerformance(self, test, labels):
        N = len(test)
        tp = 0  # true positive
        fp = 0  # false positive
        fn = 0  # false negative
        tn = 0  # true negative
        for i in range(N):
            tp += 1 if (test[i][1] == labels[i] and test[i][1] == 1) else 0
            fp += 1 if (test[i][1] != labels[i] and test[i][1] == 1) else 0
            fn += 1 if (test[i][1] != labels[i] and test[i][1] == 0) else 0
            tn += 1 if (test[i][1] == labels[i] and test[i][1] == 0) else 0
        precision = tp / (tp + fp) if tp + fp > 0 else 1
        recall = tp / (tp + fn) if tp + fn > 0 else 1
        fscore = (2 * precision * recall) / (precision + recall) if precision + recall > 0 else 0
        return {"precision": precision, "recall": recall, "fscore": fscore}
- """
- This function returns gaussian parameters i.e means and covariance
- The information will be used to compute probabilities
- """
    def gParameters(self, train):
        n = len(train[0])
        m = np.transpose(np.array(train))

        u = np.array([np.mean(m[i, :]) for i in range(n)])
        if np.sum(u) == 0:
            return None
        r = np.array([np.sqrt(np.var(m[i, :])) for i in range(n)])
        #
        # Before we normalize the data we must ensure there is some level of
        # movement in this application. A zero standard deviation suggests we
        # may not have enough information to do anything.
        #
        if 0 in r:
            return None
        #
        # -- Normalizing the matrix, then we compute the covariance matrix.
        #
        m = np.array([(m[i, :] - u[i]) / r[i] for i in range(n)])
        sigma = np.cov(m)
        sigma = [list(row) for row in sigma]
        return {"cov": sigma, "mean": list(u)}


class AnalyzeAnomalies(AnomalyDetection):
    """
    This analysis will include a predicted status, because an anomaly can either be:
        - a downtime, i.e. end of day
        - a spike, and thus a potential imminent crash
    @param xo   matrix of variables
    @param info information about what was learnt
    """
    def predict(self, xo, info):
        #
        # Classify the most recent observation.
        #
        x = xo[-1]
        r = super().predict([x], info)
        #
        # In order to determine what the anomaly is (idle or crash) we compute
        # a slope as covariance / variance of the features. The division below
        # is a sketch of that intent; the result does not yet affect r.
        #
        N = len(info['features'])
        xy = np.array(ML.Extract(info['features'], xo))
        vxy = np.array([np.var(xy[:, i]) for i in range(N)])
        alpha = np.array(info['parameters']['cov']) / vxy  # column i scaled by 1/var(feature i)
        return r


class Regression:
    parameters = {}

    @staticmethod
    def predict(xo):
        pass

    def __init__(self, config):
        pass