Browse Source

refactor data-collector, initializes remotely with plan

Steve Nyemba 7 years ago
parent
commit
62f3afb897

+ 0 - 0
src/utils/agents/__init__.py


+ 254 - 0
src/utils/agents/actor.py

@@ -0,0 +1,254 @@
+"""
+	This class is designed to be an actor class i.e it will undertake certain actions given an event detected
+	The platform has 2 main sections (detection & analysis).
+	Action Types (Actors):
+		- Alert : Sends an email or Webhook
+		- Apps 	: Kill, Start
+		- Folder: Archive, Delete (all, age, size)
+        By design we are to understand that a message is structured as follows:
+            {to,from,content} with content either being an arbitrary stream (or JSON)
+	@TODO: 
+		- upgrade to python 3.x
+"""
+import json
+from threading import Thread
+import os
+import shutil
+import subprocess
+import re
+from monitor import ProcessCounter
+from utils.transport import QueueListener, QueueWriter, QueueReader
+from utils.params import PARAMS
+from ngram import NGram as ng
+import smtplib
+from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
+
class Actor():
    """
    Base class for actors: units that take an action when an event is
    detected (send an alert, kill/start an application, clean or archive
    a folder). Subclasses override init()/isValid() and add behavior.
    """
    @staticmethod
    def instance(name, args):
        """
        Factory: instantiate the Actor subclass called `name` and
        initialize it with `args` (a configuration dict/object).
        Returns None when the class cannot be built or initialized.
        """
        o = None
        try:
            # Resolve the class by name from this module's globals instead
            # of eval("Name()") -- same effect for classes defined here,
            # without the ability to execute arbitrary expressions.
            o = globals()[name]()
            o.init(args)
        except Exception as e:
            print(str(e))
        return o

    def __init__(self):
        # Configuration is supplied later via init(); nothing to do here.
        pass

    def getName(self):
        """Return the lowercase class name, used as the actor's identifier."""
        return self.__class__.__name__.lower()

    def init(self, args):
        """Store the configuration; subclasses extend this."""
        self.config = args

    def isValid(self, item):
        """Default validity check; subclasses decide when to act."""
        return False

    def execute(self, cmd):
        """
        Launch `cmd` (a list of program + arguments) without a shell.
        Failures are deliberately swallowed: an actor must not crash the
        monitoring loop because one command could not be launched.
        """
        try:
            out = subprocess.Popen(cmd, stdout=subprocess.PIPE)
            print(out)
        except Exception as e:
            pass

    def post(self, **args):
        """Hook for publishing results; subclasses may override."""
        pass
class Apps(Actor):
    """
    Actor that handles applications: restart them if need be, or kill them.
    Configuration shape: {app-name: <argument string>, ...}
    Fuzzy app-name matching is done with an NGram index built over the
    configured application names.
    """
    def __init__(self):
        Actor.__init__(self)
        self.ng = None  # NGram index, built in init()

    def init(self, config):
        """
        Initialize the actor with the applications and associated arguments.
        @param config {"app_o": "<args>", "app_x": "<args>", ...}
        """
        Actor.init(self, config)
        self.ng = ng(self.config.keys())

    def can_start(self, name):
        """
        Determine whether it is possible to boot an application: True when
        `name` fuzzily matches a configured app (best n-gram score above a
        minimal threshold).
        """
        items = self.ng.search(name) if self.ng is not None else []
        if not items:
            return False
        return items[0][1] > 0.01

    def startup(self, name):
        """
        Start the configured program that best matches `name`.
        @pre can_start(name)
        """
        best = self.ng.search(name)[0]
        app = best[0]
        args = self.config[app]
        # Launched via a non-blocking Popen (no shell), so no "&" suffix
        # is needed -- the original built such a command string but never
        # used it.
        self.execute([app, args])

    def kill(self, name):
        """
        Kill processes matching `name`; not case sensitive, and partial
        names are accepted.
        @NOTE: make sure the reference to the app is not ambiguous.
        """
        args = "".join(['ps -eo pid,command|grep -E -i "', name.lower(),
                        '"|grep -E "^ {0,}[0-9]+" -o|xargs kill -9'])
        subprocess.call([args], shell=True)

    def analyze(self, logs):
        """
        Analyze a few logs and take appropriate action: restart every
        application that is configured and can be started.
        @param logs list of {label: <app name>, ...} records
        """
        for item in logs:
            name = item['label']
            if self.can_start(name):
                self.startup(name)
+
class Mailer(Actor):
    """
    Actor that sends email notifications over SMTP (SSL).
    Configuration: {uid:<account>, host:<host>, port:<port>, password:<password>}
    """
    def __init__(self):
        Actor.__init__(self)

    def init(self, conf):
        """
        Connect and authenticate against the SMTP server.
        On failure self.handler is set to None (send() will then fail).
        """
        self.uid = conf['uid']
        try:
            self.handler = smtplib.SMTP_SSL(conf['host'], conf['port'])
            self.handler.login(self.uid, conf['password'])
            #
            # @TODO: Check the status of the authentication;
            # if not authenticated the preconditions have failed.
            #
        except Exception as e:
            print(str(e))
            self.handler = None

    def send(self, **args):
        """
        Send a message.
        @param subject  subject line
        @param message  body; treated as HTML when it contains markup
        @param to       recipient address
        @pre init() succeeded (self.handler is not None)
        """
        subject = args['subject']
        message = args['message']
        to = args['to']
        # Crude content-type sniffing: angle brackets suggest HTML.
        if '<' in message and '>' in message:
            message = MIMEText(message, 'html')
        else:
            message = MIMEText(message, 'plain')
        message['From'] = self.uid
        message['To'] = to
        message['Subject'] = subject
        return self.handler.sendmail(self.uid, to, message.as_string())

    def close(self):
        self.handler.quit()
+
+            
class Folders(Actor):
    """
    Actor that handles folders, i.e. cleaning/archiving them.
    If the user does NOT have any keys to cloud-view then she will not be
    able to archive.
    Configuration: {threshold: <value>}
    @param threshold  size threshold (e.g. "10 MB"), applied to all folders
    """
    def __init__(self):
        Actor.__init__(self)

    def init(self, args):
        # Threshold is normalized (to KB) via get_size().
        self.threshold = self.get_size(args['threshold'])

    def archive(self, item):
        """
        Archive all files in a given folder into a tarball placed next to
        it, then delete the folder's content.
        @pre : isValid
        """
        folder = item['label']
        # NOTE: the original mixed tab- and space-indentation on the two
        # lines below, which is a TabError on python 3.
        name = folder.split(os.sep)
        name = name[len(name) - 1]
        signature = '-'.join([name, str(item['date']), str(item['count']), 'files'])
        tarball = os.sep.join([folder, '..', signature])
        shutil.make_archive(tarball, 'tar', folder)
        self.clean(item)
        #
        # @TODO: The archive can be uploaded to the cloud or elsewhere
        #   @param id   cloud service identifier {dropbox,box,google-drive,one-drive}
        #   @param key  authorization key for the given service
        #

    def clean(self, item):
        """Delete every file and sub-folder of the given folder."""
        files = os.listdir(item['label'])
        for name in list(files):
            path = os.sep.join([item['label'], name])
            if os.path.isdir(path):
                shutil.rmtree(path)
            else:
                os.remove(path)

    def get_size(self, value):
        """
        Convert a size expressed as "<number> <unit>" (MB/GB/TB) into KB
        and return the value without units; -1 when no known unit found.
        """
        units = {'MB': 1000, 'GB': 1000000, 'TB': 1000000000}  # converting to kb
        value = value.replace(' ', '').upper()
        key = set(units.keys()) & set(re.split('(\d+)', value))
        if len(key) == 0:
            return -1
        key = key.pop()
        # BUGFIX: strip the unit that was actually found -- the original
        # only ever stripped "MB", so "2GB" raised ValueError.
        return float(value.replace(key, '')) * units[key]

    def can_clean(self, item):
        """
        Returns whether the following hold:
            p : the folder exists and belongs to the analyzed set
            q : its size has reached the configured threshold
        """
        p = os.path.exists(item['label']) and item['label'] in self.lfolders
        q = self.get_size(item['size']) >= self.threshold
        return p and q

    def analyze(self, logs):
        """Archive (which also cleans) every folder that qualifies."""
        self.lfolders = [folder['label'] for folder in logs]
        for item in logs:
            if self.can_clean(item):
                self.archive(item)

+ 0 - 0
src/utils/agents/learner.py


+ 256 - 0
src/utils/agents/manager.py

@@ -0,0 +1,256 @@
+"""
+	Features :
+		- data collection
+		- detection, reboot	(service)
+		- respond to commands	(service)
+"""
+#from threading import Thread, RLock
+from __future__ import division
+import os
+import json
+import time
+from datetime import datetime
+from utils.transport import *
+import monitor
+import requests
class Manager():
    """
    Drives the monitoring loop:
        - data collection (agents/collectors)
        - detection, reboot     (service)
        - respond to commands   (service)
    The subscription plan's metadata provides:
        delay : minutes between two collection cycles
        limit : maximum number of entries kept in the store
        scope : apps,folders,learner,sandbox
    """
    def version(self):
        return 1.1

    def __init__(self):
        self.factory = DataSourceFactory()

    def set(self, name, value):
        # BUGFIX: the original called setattr(name, value), which is a
        # TypeError -- the target instance was missing.
        setattr(self, name, value)

    def init(self, **args):
        """
        @param node    node identifier
        @param agents  initialized data collectors
        @param config  configuration; must contain the subscription plan
        @param key     account key
        @param actors  instantiated (not yet initialized) actors
        """
        self.id = args['node']
        self.agents = args['agents']
        self.config = dict(args['config'])
        self.key = args['key']
        self.actors = args['actors']
        self.plan = self.config['plan']
        self.DELAY = int(self.plan['metadata']['delay'])
        self.update()  # -- Initializing status information

    def update(self):
        """
        Inspect the plan for the current account and make sure it
        can/should proceed: the user must be subscribed to the service,
        otherwise DELAY/LIMIT are set to -1 sentinels and the run loop
        will stop.
        """
        meta = self.plan['metadata']
        if meta:
            self.DELAY = 60 * int(meta['delay'])  # minutes -> seconds
            self.LIMIT = int(meta['limit'])
        else:
            self.DELAY = -1
            self.LIMIT = -1
        # Restrict collectors and actors to what the plan subscribes to.
        self.agents = self.filter('agents', meta, self.agents)
        self.actors = self.filter('actors', meta, self.actors)

    def filter_collectors(self, meta):
        """
        Remove collectors that are not specified by the plan.
        Note that the agents (collectors) have already been initialized.
        """
        values = meta['agents'].replace(' ', '').split(',')
        self.agents = [agent for agent in self.agents if agent.getName() in values]

    def filter_actors(self, meta):
        """
        Remove actors that are NOT specified by the subscription plan.
        Note that the actors have already been instantiated and need
        initialization.
        """
        values = meta['actors'].replace(' ', '').split('.')
        self.actors = [actor for actor in self.actors if actor.getName() in values]

    def filter(self, id, meta, objects):
        """Generic form of the two filters above: keep the objects whose
        name appears in the comma-separated list meta[id]."""
        values = meta[id].replace(' ', '').split(',')
        return [item for item in objects if item.getName() in values]

    def __filter(self, meta):
        # Legacy multi-plan filter kept for reference; update() is the
        # entry point actually used.
        scope = []
        lactors = []
        for item in meta:
            scope = scope + item['scope'].split(',')
            if 'actors' in item:
                lactors = lactors + item['actors'].split(',')
        self.agents = [agent for agent in self.agents if agent.getName() in scope]
        if len(lactors) == 0:
            self.actors = []
        self.actors = [actor for actor in self.actors if actor.getIdentifier() in lactors]
        if len(self.actors) > 0:
            #
            # We should configure the actors accordingly and make sure
            # they are operational; the apps configuration is fetched
            # remotely from the store.
            #
            conf = {"apps": None}
            read_class = self.config['store']['class']['read']
            read_args = self.config['store']['args']
            couchdb = self.factory.instance(type=read_class, args=read_args)
            uinfo = couchdb.view('config/apps', key=self.key)
            if 'apps' in uinfo:
                conf['apps'] = uinfo['apps']
            #
            # Threshold will always give a default value
            #
            conf['folder_threshold'] = couchdb.view('config/folders', key=self.key)
            mailer = None
            for actor in self.actors:
                id = actor.getIdentifier()
                if id == "mailer":
                    mailer = actor.Mailer()
                if conf[id] is None:
                    continue
                actor.init(conf[id])
            # Initializing the mailer
            if mailer is not None and mailer in self.config:
                mailer.init(self.config['mailer'])
        return meta

    def setup(self, meta):
        """Fetch remote configuration and initialize the matching actors."""
        read_class = self.config['store']['class']['read']
        read_args = self.config['store']['args']
        couchdb = self.factory.instance(type=read_class, args=read_args)
        args = couchdb.view('config/apps', key=self.key)
        if len(args.keys()) > 0:
            self.apply_setup('apps', args)
        args = couchdb.view('config/folders', key=self.key)
        # BUGFIX: the original tested `'folder_size' not in meta` and then
        # read meta['folder_size'] -- a guaranteed KeyError.
        if 'folder_size' in meta:
            args['threshold'] = meta['folder_size']
        self.apply_setup('folders', args)

    def apply_setup(self, name, args):
        """Initialize the actor called `name` with `args`, when non-empty."""
        for actor in self.actors:
            if args is not None and actor.getName() == name and len(args.keys()) > 0:
                actor.init(args)

    def __setup(self, meta):
        # Legacy setup kept for reference (superseded by setup()).
        conf = {"folders": meta['folder_threshold'], "apps": None}
        read_class = self.config['store']['class']['read']
        read_args = self.config['store']['args']
        couchdb = self.factory.instance(type=read_class, args=read_args)
        uinfo = couchdb.view('config/apps', key=self.key)
        if 'apps' in uinfo:
            conf['apps'] = uinfo['apps']
        for agent in self.agents:
            agent.init(conf['apps'])
        mailer = None
        for actor in self.actors:
            id = actor.getIdentifier()
            if id == "mailer":
                mailer = actor.Mailer()
            if conf[id] is None:
                continue
            actor.init(conf[id])
        # Initializing the mailer
        if mailer is not None and mailer in self.config:
            mailer.init(self.config['mailer'])

    def isvalid(self):
        """Refresh plan information and tell whether the loop may run."""
        self.update()
        return self.DELAY > -1 and self.LIMIT > -1

    def post(self, row):
        """
        Take appropriate action when a particular incident has been
        detected.
        @param row  data pulled/extracted
        """
        message = {}
        message['action'] = 'reboot'
        # BUGFIX: the original referenced an undefined name `label` here
        # (NameError); the node identifier is what the message carries.
        message['node'] = self.id

    def callback(self, channel, method, header, stream):
        """
        Receive a message (from the command queue) and delegate it to the
        appropriate actor.
        """
        print([channel, header])
        message = json.loads(stream)
        data = message['data']

    def run(self):
        """
        Main loop: every DELAY seconds collect from each agent, hand the
        data to the matching actor (when any), and persist it; the
        subscription is periodically re-validated and the loop stops when
        it has lapsed.
        """
        COUNT = 0
        COUNT_STOP = int(24 * 60 / self.DELAY)
        write_class = self.config['store']['class']['write']
        read_args = self.config['store']['args']
        while True:
            COUNT += 1
            if COUNT > COUNT_STOP:
                if self.isvalid():
                    COUNT = 0
                else:
                    break
            for agent in self.agents:
                data = agent.composite()
                label = agent.getName()
                node = '@'.join([label, self.id])
                if label == 'folders':
                    # Folder rows are tagged with the node id.
                    row = [dict({"id": self.id}, **_row) for _row in data]
                else:
                    row = data
                if type(row) == list and len(row) == 0:
                    continue
                # Actors are paired with agents positionally, by name.
                index = self.agents.index(agent)
                if len(self.actors) > index and self.actors[index].getName() == agent.getName():
                    actor = self.actors[index]
                    print(actor.analyze(row))
                store = self.factory.instance(type=write_class, args=read_args)
                store.flush(size=self.LIMIT)
                store.write(label=node, row=row)
            time.sleep(self.DELAY)

+ 709 - 0
src/utils/transport.py

@@ -0,0 +1,709 @@
+"""
+	This file implements data transport stuctures in order to allow data to be moved to and from anywhere
+	We can thus read data from disk and write to the cloud,queue, or couchdb or SQL
+"""
+from flask import request, session
+import os
+import pika
+import json
+import numpy as np
+from couchdbkit import Server
+import re
+from csv import reader
+from datetime import datetime
+"""
+	@TODO: Write a process by which the class automatically handles reading and creating a preliminary sample and discovers the meta data
+"""
class Reader:
    """
    Base class for all readers: provides delimiter/column discovery and
    record formatting utilities shared by the file/queue/db readers.
    @TODO: Write a process by which the class automatically handles
    reading, creates a preliminary sample and discovers the metadata.
    """
    def __init__(self):
        self.nrows = 0
        self.xchar = None  # field delimiter, discovered or set by subclass

    def row_count(self):
        """Count the rows the reader yields (consumes the stream)."""
        content = self.read()
        return np.sum([1 for row in content])

    def delimiter(self, sample):
        """
        Determine the most common delimiter from a subset of possible
        delimiters, statistically: the candidate with the smallest
        column-count variance wins, provided the mean is greater than 1.
        This would be troublesome if many broken records were sampled.
        """
        m = {',': [], '\t': [], '|': [], '\x3A': []}
        for row in sample:
            for xchar in m:
                # BUGFIX: the original compared a list to an int
                # (row.split(xchar) > 1); count the columns instead.
                cols = row.split(xchar)
                m[xchar].append(len(cols) if len(cols) > 1 else 0)
        m = {id: np.var(m[id]) for id in m if m[id] != [] and int(np.mean(m[id])) > 1}
        # Smallest variance wins (py3-safe: no .values().index()).
        return min(m, key=m.get)

    def col_count(self, sample):
        """
        Determine the number of columns of a given sample: the most
        frequent column count across the sampled rows.
        @pre self.xchar is not None
        """
        m = {}
        for row in sample:
            row = self.format(row)
            id = str(len(row))
            if id not in m:
                m[id] = 0
            m[id] = m[id] + 1
        # Most frequent count wins (py3-safe: no .values().index()).
        return int(max(m, key=m.get))

    def format(self, row):
        """
        Clean the fields of a given row by removing non-ascii characters
        and double quotes. Accepts an unsplit string or a list of fields.
        @pre self.xchar is not None
        """
        if isinstance(row, list) == False:
            # Fields sometimes contain the delimiter as a legitimate
            # character; split() accounts for this instead of tampering
            # with the field values via a naive row.split(self.xchar).
            cols = self.split(row)
        else:
            cols = row
        return [re.sub('[^\x00-\x7F,\n,\r,\v,\b,]', ' ', col.strip()).strip().replace('"', '') for col in cols]

    def split(self, row):
        """
        Split a record while preserving the integrity of the data within,
        i.e. accounting for double-quoted fields.
        @pre : self.xchar is not None
        """
        pattern = "".join(["(?:^|", self.xchar, ")(\"(?:[^\"]+|\"\")*\"|[^", self.xchar, "]*)"])
        return re.findall(pattern, row.replace('\n', ''))
+		
class Writer:
    """
    Base class for all writers: renders a row for output and defines the
    archive/flush hooks subclasses may override.
    """
    def format(self, row, xchar):
        """Render a row: delimiter-joined line for lists, JSON for dicts,
        anything else is passed through unchanged."""
        if xchar is not None and isinstance(row, list):
            return xchar.join(row) + '\n'
        if xchar is None and isinstance(row, dict):
            return json.dumps(row)
        return row

    def archive(self):
        """It is important to be able to archive data so as to insure that
        growth is controlled: nothing in nature grows indefinitely, and
        neither should the data being handled."""
        pass

    def flush(self):
        pass
+	
+"""
+  This class is designed to read data from an Http request file handler provided to us by flask
+  The file will be heald in memory and processed accordingly
+  NOTE: This is inefficient and can crash a micro-instance (becareful)
+"""
class HttpRequestReader(Reader):
    """
    Reads data from an HTTP request file handler provided to us by flask.
    The file will be held in memory and processed accordingly.
    NOTE: This is inefficient and can crash a micro-instance (be careful).
    @param file  file-like object exposing readlines()
    """
    def __init__(self, **params):
        self.file_length = 0
        try:
            self.content = params['file'].readlines()
            self.file_length = len(self.content)
        except Exception as e:
            print("Error ... ", e)

    def isready(self):
        return self.file_length > 0

    def read(self, size=-1):
        """
        Yield the buffered rows.
        @param size  number of rows to yield; -1 yields all rows.
        BUGFIX: the original incremented and compared its counter before
        yielding, so read(n) actually produced n-2 rows.
        """
        for i, row in enumerate(self.content):
            if size > -1 and i >= size:
                break
            yield row
+		
+"""
+	This class is designed to write data to a session/cookie
+"""
class HttpSessionWriter(Writer):
    """
    Writes data to a session/cookie: accumulates csv and sql renderings
    of the rows under the provided session object.
    @param queue     required session object (dict-like)
    @param filename  used to derive the sql table name
    @param uid       required session key / user id
    """
    def __init__(self, **params):
        self.session = params['queue']
        self.session['sql'] = []
        self.session['csv'] = []
        # BUGFIX: the extension-stripping pattern was '..+$' -- an
        # unescaped dot matches any character, wiping the whole name.
        # Escape it, as SQLDiskWriter already does.
        self.tablename = re.sub('\..+$', '', params['filename'])
        self.session['uid'] = params['uid']

    def format_sql(self, row):
        """Render a row as an INSERT statement for the derived table."""
        values = "','".join([col.replace('"', '').replace("'", '') for col in row])
        return "".join(["INSERT INTO :table VALUES('", values, "');\n"]).replace(':table', self.tablename)

    def isready(self):
        return True

    def write(self, **params):
        """Record a 'usable' row both as csv and as sql."""
        label = params['label']
        row = params['row']
        if label == 'usable':
            self.session['csv'].append(self.format(row, ','))
            self.session['sql'].append(self.format_sql(row))
+		
+"""
+  This class is designed to read data from disk (location on hard drive)
+  @pre : isready() == True
+"""
class DiskReader(Reader):
    """
    Reads data from disk (a location on the hard drive).
    @pre : isready() == True
    @param path  absolute path of the file to be read
    """
    def __init__(self, **params):
        Reader.__init__(self)
        self.path = params['path']

    def isready(self):
        return os.path.exists(self.path)

    def read(self, size=-1):
        """
        Yield rows from the file at self.path.
        @param size  number of rows to yield, -1 yields all rows
        BUGFIX: the with-statement guarantees the handle is closed even
        when the consumer stops early (the original leaked it on partial
        reads), and read(n) now yields exactly n rows instead of n-2.
        The deprecated 'rU' mode (removed in python 3.11) is replaced
        with plain text mode.
        """
        with open(self.path, 'r') as f:
            for i, row in enumerate(f):
                if size > -1 and i >= size:
                    break
                yield row
+"""
+	This function writes output to disk in a designated location
+"""
class DiskWriter(Writer):
    """
    Writes output to disk in a designated location: one sub-folder per
    label, one file (self.name) the rows are appended to.
    @param path  target folder (created when missing)
    @param name  file name the rows are appended to
    """
    def __init__(self, **params):
        self.path = params['path'] if 'path' in params else None
        self.name = params['name'] if 'name' in params else None
        # BUGFIX: guard the mkdir -- os.path.exists(None) raises when no
        # path was supplied.
        if self.path is not None and os.path.exists(self.path) == False:
            os.mkdir(self.path)

    def isready(self):
        """
        Determine if the class is ready for execution, i.e. whether the
        preconditions are met prior to execution: an existing target
        folder and a file name.
        """
        p = self.path is not None and os.path.exists(self.path)
        q = self.name is not None
        return p and q

    def write(self, **params):
        """
        Append a record to the designated file.
        @param label  <passed|broken|fixed|stats> (becomes a sub-folder)
        @param row    row to be written
        @param xchar  optional delimiter forwarded to format()
        """
        label = params['label']
        row = params['row']
        # BUGFIX: the original tested `'xchar' is not None` (a constant
        # string, always true) and then read params['xchar']
        # unconditionally -> KeyError whenever the caller omitted it.
        xchar = params['xchar'] if 'xchar' in params else None
        path = ''.join([self.path, os.sep, label])
        if os.path.exists(path) == False:
            os.mkdir(path)
        path = ''.join([path, os.sep, self.name])
        # Context manager closes the handle even if format/write raises.
        with open(path, 'a') as f:
            f.write(self.format(row, xchar))
+"""
+	This class hierarchy is designed to handle interactions with a queue server using pika framework (our tests are based on rabbitmq)
+"""
class MessageQueue:
    """
    Root of the queue-server hierarchy; interactions use the pika
    framework (our tests are based on rabbitmq).
    @param host  queue server host
    @param uid   exchange identifier
    @param qid   queue identifier
    """
    def __init__(self, **params):
        self.host = params['host']
        self.uid = params['uid']
        self.qid = params['qid']

    def isready(self):
        """A connection must have been opened by the subclass' init()
        before calling this; the probe closes the connection."""
        resp = self.connection is not None and self.connection.is_open
        self.close()
        return resp

    def close(self):
        # BUGFIX: the original body mixed tab- and space-indentation,
        # which is a TabError on python 3.
        if self.connection.is_closed == False:
            self.channel.close()
            self.connection.close()
+"""
+	This class is designed to publish content to an AMQP (Rabbitmq)
+	The class will rely on pika to implement this functionality
+
+	We will publish information to a given queue for a given exchange
+"""
+
class QueueWriter(MessageQueue, Writer):
    """
    Publishes content to an AMQP server (RabbitMQ) via pika: information
    is written to a given queue for a given exchange.
    """
    def __init__(self, **params):
        MessageQueue.__init__(self, **params)

    def init(self, label=None):
        """Open the connection and declare the exchange and a durable
        queue; `label` overrides the default queue identifier."""
        properties = pika.ConnectionParameters(host=self.host)
        self.connection = pika.BlockingConnection(properties)
        self.channel = self.connection.channel()
        self.info = self.channel.exchange_declare(exchange=self.uid, type='direct', durable=True)
        queue_name = self.qid if label is None else label
        self.qhandler = self.channel.queue_declare(queue=queue_name, durable=True)
        self.channel.queue_bind(exchange=self.uid, queue=self.qhandler.method.queue)

    def write(self, **params):
        """
        Write a stream of data to the given queue.
        @param row    object to be written (converted to JSON when needed)
        @param label  routing key / queue name
        @param xchar  optional delimiter forwarded to format()
        @param type   optional content type for non-string payloads
        @TODO: make this less chatty
        """
        xchar = params['xchar'] if 'xchar' in params else None
        payload = self.format(params['row'], xchar)
        label = params['label']
        self.init(label)
        if isinstance(payload, str):
            stream = payload
            _type = 'text/plain'
        else:
            stream = json.dumps(payload)
            _type = params['type'] if 'type' in params else 'application/json'
        # delivery_mode=2 marks the message as persistent.
        self.channel.basic_publish(
            exchange=self.uid,
            routing_key=label,
            body=stream,
            properties=pika.BasicProperties(content_type=_type, delivery_mode=2)
        )
        self.close()

    def flush(self, label):
        """Drop the queue named `label` entirely."""
        self.init(label)
        self.channel.queue_delete(queue=label)
        self.close()
+		
+"""
+	This class will read from a queue provided an exchange, queue and host
+	@TODO: Account for security and virtualhosts
+"""
class QueueReader(MessageQueue, Reader):
    """
    Reads from a queue provided an exchange, queue and host.
    @TODO: account for security and virtualhosts
    @param host     host
    @param uid      exchange identifier
    @param qid      queue identifier (string, or list of strings)
    @param durable  optional flag (presence only is checked)
    """
    def __init__(self, **params):
        MessageQueue.__init__(self, **params)
        self.durable = 'durable' in params
        self.size = -1   # max messages to stack per queue (-1: no limit)
        self.data = {}   # messages read, keyed by queue id

    def init(self, qid):
        """Open the connection and declare exchange + queue `qid`."""
        properties = pika.ConnectionParameters(host=self.host)
        self.connection = pika.BlockingConnection(properties)
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange=self.uid, type='direct', durable=True)
        self.info = self.channel.queue_declare(queue=qid, durable=True)

    def callback(self, channel, method, header, stream):
        """
        Process one message from the queue: JSON payloads are decoded,
        anything else is stacked verbatim.
        """
        if re.match("^\{|\[", stream) is not None:
            r = json.loads(stream)
        else:
            r = stream
        qid = self.info.method.queue
        if qid not in self.data:
            self.data[qid] = []
        self.data[qid].append(r)
        #
        # We stop reading when all the messages of the queue are stacked
        #
        if self.size == len(self.data[qid]) or len(self.data[qid]) == self.info.method.message_count:
            self.close()

    def read(self, size=-1):
        """
        Read messages from one or several queues (sequentially for now);
        the qid parameter may be an array of queues to read from.
        @param size  number of messages to retrieve per queue, -1 for all
        @TODO: implement channel.basic_get in order to retrieve a single
        message at a time, with the count specified by `size`.
        """
        self.size = size
        # BUGFIX: `basestring` does not exist on python 3; normalize with
        # a list/tuple check instead (same effect for str/unicode qids).
        if not isinstance(self.qid, (list, tuple)):
            self.qid = [self.qid]
        for qid in self.qid:
            self.init(qid)
            if self.info.method.message_count > 0:
                self.channel.basic_consume(self.callback, queue=qid, no_ack=False)
                self.channel.start_consuming()
        return self.data
class QueueListener(QueueReader):
    """
    Listener variant of QueueReader: binds to an existing (passive,
    exclusive) queue and consumes messages indefinitely, dispatching each
    one to self.callback.
    """
    def init(self, qid):
        properties = pika.ConnectionParameters(host=self.host)
        self.connection = pika.BlockingConnection(properties)
        self.channel = self.connection.channel()
        self.channel.exchange_declare(exchange=self.uid, type='direct', durable=True)
        self.info = self.channel.queue_declare(passive=True, exclusive=True, queue=qid)
        self.channel.queue_bind(exchange=self.uid, queue=self.info.method.queue, routing_key=qid)

    def read(self):
        """Block and consume every incoming message (auto-acknowledged)."""
        self.init(self.qid)
        self.channel.basic_consume(self.callback, queue=self.qid, no_ack=True)
        self.channel.start_consuming()
+    		
+"""
+	This class is designed to write output as sql insert statements
+	The class will inherit from DiskWriter with minor adjustments
+	@TODO: Include script to create the table if need be using the upper bound of a learner
+"""
class SQLDiskWriter(DiskWriter):
    """
    Writes output as sql insert statements; inherits the on-disk layout
    from DiskWriter with minor adjustments.
    @TODO: include a script to create the table if need be, using the
    upper bound of a learner.
    """
    def __init__(self, **args):
        DiskWriter.__init__(self, **args)
        # Table name: the file name without its extension, spaces
        # replaced by underscores.
        self.tablename = re.sub('\..+$', '', self.name).replace(' ', '_')

    def write(self, **args):
        """
        @param label  only 'usable' rows are rendered as INSERT statements
        @param row
        @param xchar
        """
        label = args['label']
        row = args['row']
        if label == 'usable':
            values = "','".join([col.replace('"', '').replace("'", '') for col in row])
            args['row'] = "".join(["INSERT INTO :table VALUES('", values, "');\n"]).replace(':table', self.tablename)
        DiskWriter.write(self, **args)
class Couchdb:
    """
    Base couchdb access: connects to the server, resolves the target
    database and makes sure the per-user document exists.
    @param uri     host & port reference
    @param uid     user id involved
    @param dbname  database name (target)
    """
    def __init__(self, **args):
        uri = args['uri']
        self.uid = args['uid']
        dbname = args['dbname']
        self.server = Server(uri=uri)
        self.dbase = self.server.get_db(dbname)
        if self.dbase.doc_exist(self.uid) == False:
            self.dbase.save_doc({"_id": self.uid})

    def isready(self):
        """Insure the preconditions are met for processing: server up,
        database present, user document present."""
        if self.server.info() == {}:
            return False
        if self.dbase.dbname not in self.server.all_dbs():
            return False
        # At this point we are sure the server is connected and that the
        # database actually exists; the user document must exist as well.
        if self.dbase.doc_exist(self.uid) == False:
            return False
        return True

    def view(self, id, **args):
        """Run a couchdb view and return the first row's value, or []
        when the view yields nothing."""
        rows = self.dbase.view(id, **args).all()
        return rows[0]['value'] if len(rows) > 0 else []
+		
+"""
+	This function will read an attachment from couchdb and return it to calling code. The attachment must have been placed before hand (otherwise oops)
+	@T: Account for security & access control
+"""
class CouchdbReader(Couchdb, Reader):
    """
    Reads an attachment from couchdb and returns it to calling code. The
    attachment must have been placed beforehand (otherwise oops).
    @TODO: account for security & access control
    @param filename  optional attachment name; when absent, read() returns
                     the bare user document instead.
    """
    def __init__(self, **args):
        # Set the basic database parameters first.
        Couchdb.__init__(self, **args)
        self.filename = args['filename'] if 'filename' in args else None

    def isready(self):
        """
        Ready when the basic database preconditions hold AND the user's
        document carries the requested attachment.
        """
        if Couchdb.isready(self) == False:
            return False
        # The database name is set and correct at this point; insure the
        # document of the given user has the requested attachment.
        doc = self.dbase.get(self.uid)
        if '_attachments' in doc:
            return self.filename in doc['_attachments'].keys()
        return False

    def stream(self, size=-1):
        """
        Yield the attachment line by line.
        BUGFIX: `size` was referenced but never defined (NameError at
        runtime); it is now a parameter with the same -1 (= all rows)
        convention as read().
        """
        content = self.dbase.fetch_attachment(self.uid, self.filename).split('\n')
        i = 1
        for row in content:
            yield row
            if size > 0 and i == size:
                break
            i = i + 1

    def read(self, size=-1):
        """Return either the attachment stream or the bare document."""
        if self.filename is not None:
            # BUGFIX: the original created the generator and discarded
            # it; it must be returned to the caller.
            return self.stream(size)
        return self.basic_read()

    def basic_read(self):
        """Return the user document stripped of couchdb metadata."""
        document = self.dbase.get(self.uid)
        del document['_id'], document['_rev']
        return document
"""
	This class will write on a couchdb document provided a scope
	The scope is the attribute that will be on the couchdb document
"""
class CouchdbWriter(Couchdb,Writer):
	"""
		@param	uri		host & port reference
		@param	uid		user id involved
		@param	filename	filename (attachment), optional
		@param	dbname		database name (target)
	"""
	def __init__(self,**args):

		Couchdb.__init__(self,**args)
		uri 		= args['uri']
		self.uid 	= args['uid']
		if 'filename' in args:
			self.filename 	= args['filename']
		else:
			self.filename = None
		dbname		= args['dbname']
		self.server 	= Server(uri=uri)
		self.dbase	= self.server.get_db(dbname)
		#
		# If the document doesn't exist then we should create it
		# (bug fix: the comment above announced this step but the code
		#  was missing, so writes against a fresh uid would fail)
		#
		if self.dbase.doc_exist(self.uid) == False:
			self.dbase.save_doc({"_id":self.uid})

	def write(self,**params):
		"""
			write a given attribute to a document database
			@param	label	scope of the row repair|broken|fixed|stats
			@param	row	row to be written (appended under the label)
			@param	data	alternatively, a dict merged under the label
		"""
		document = self.dbase.get(self.uid)
		label = params['label']

		if 'row' in params :
			row	= params['row']
			row_is_list	= isinstance(row,list)
			if label not in document :
				document[label] = row if row_is_list else [row]
			elif isinstance(document[label][0],list) :
				document[label].append(row)
			else:
				document[label] += row
		else :
			if label not in document :
				document[label] = {}
			# NOTE(review): isinstance(x, object) is True for every value,
			# so the merge branch is effectively always taken — kept as-is
			# to preserve behavior.
			if isinstance(params['data'],object) :
				document[label] = dict(document[label],**params['data'])
			else:
				document[label] = params['data']
		self.dbase.save_doc(document)
	def flush(self,**params) :
		"""
			Trim the document's data attributes: keep only the last
			@size entries of each list attribute (when size > 0) and
			reset every other attribute to an empty dict. couchdb's
			bookkeeping fields (_id, _rev, _attachments) are untouched.
		"""
		size = params['size'] if 'size' in params else 0
		document = self.dbase.get(self.uid)
		for key in document:
			# removed dead `has_changed` flag: it was set but never read
			if key in ['_id','_rev','_attachments'] :
				continue
			content = document[key]
			if isinstance(content,list) and size > 0:
				document[key] = content[len(content) - size:]
			else:
				document[key] = {}
		self.dbase.save_doc(document)

	def archive(self,params=None):
		"""
			Move the document's data attributes into a timestamped json
			attachment and reset the document to its bookkeeping fields.
		"""
		document = self.dbase.get(self.uid)
		content = {}
		_doc = {}
		for id in document:
			if id in ['_id','_rev','_attachments'] :
				_doc[id] = document[id]
			else:
				content[id] = document[id]

		content = json.dumps(content)
		document= _doc
		# assumes `datetime` is imported at the top of this module — TODO confirm
		now = str(datetime.today())
		# bug fix: '.json' used to be a join element, which produced a
		# stray trailing dash in the name ("<id>-<date>-.json")
		name = '-'.join([document['_id'], now]) + '.json'
		self.dbase.save_doc(document)
		self.dbase.put_attachment(document,content,name,'application/json')
"""
	This class acts as a factory to be able to generate an instance of a Reader/Writer
	Against a Queue,Disk,Cloud,Couchdb
	The class doesn't enforce parameter validation, thus any error with the parameters sent will result in a null Object
"""
class DataSourceFactory:
	def instance(self,**args):
		"""
			Build an instance of the class named by @type, passing the
			dict @args['args'] as keyword arguments; returns None on any
			failure (unknown class, bad parameters, ...).
			bug fix: the old code serialized the parameters with
			json.dumps() and eval()'d the result, which broke on
			True/False/None (rendered as true/false/null — invalid
			Python) and on any non-JSON-serializable value. Resolving
			the class and calling it directly avoids the round trip and
			also removes the need for the HttpRequestReader/
			HttpSessionWriter special case.
			SECURITY NOTE: the class name is still resolved with eval();
			never feed it untrusted input.
			@param	type	name of the Reader/Writer class to instantiate
			@param	args	keyword arguments for that class' constructor
		"""
		source = args['type']
		params = args['args']
		anObject = None
		try:
			anObject = eval(source)(**params)
			#setattr(anObject,'name',source)
		except Exception as e:
			print(['Error ',e])
		return anObject
"""
	This class implements a data-source handler that is intended to be used within the context of data processing, it allows to read/write anywhere transparently.
	The class is a facade to a heterogeneous class hierarchy and thus simplifies how the calling code interacts with the class hierarchy
"""
class DataSource:
	def __init__(self,sourceType='Disk',outputType='Disk',params=None):
		"""
			@param	sourceType	class name of the reader to build
			@param	outputType	class name of the writer to build
			@param	params		constructor arguments shared by both sides
		"""
		# bug fix: avoid a shared mutable default argument
		params = {} if params is None else params
		# bug fix: instance() is an instance method, so the factory must
		# be instantiated first — the old DataSourceFactory.instance(...)
		# call raised a TypeError before any source could be built
		factory = DataSourceFactory()
		self.Input = factory.instance(type=sourceType,args=params)
		self.Output= factory.instance(type=outputType,args=params)
	def read(self,size=-1):
		# delegate to the underlying reader
		return self.Input.read(size)
	def write(self,**args):
		# delegate to the underlying writer
		self.Output.write(**args)
+#p = {}
+#p['host'] = 'dev.the-phi.com'
+#p['uid'] = 'nyemba@gmail.com'
+#p['qid'] = 'repair'
+#factory = DataSourceFactory()
+#o =  factory.instance(type='QueueReader',args=p)		
+#print o is None
+#q = QueueWriter(host='dev.the-phi.com',uid='nyemba@gmail.com')
+#q.write(object='steve')
+#q.write(object='nyemba')
+#q.write(object='elon')
+
+