Browse Source

data transport framework for rabbitmq, mongodb, couchdb, ...

Steve Nyemba 5 years ago
parent
commit
0b15351b8e

+ 19 - 0
setup.py

@@ -0,0 +1,19 @@
+"""
+This is a build file for the 
+"""
+from setuptools import setup, find_packages
+ 
+setup(
+    name = "data-transport",
+    version = "1.0",
+    author = "The Phi Technology LLC",
+    author_email = "steve@the-phi.com",
+    license = "MIT",
+    packages=['transport'],
+    install_requires = ['pymongo','numpy','cloudant','pika','boto','flask-session','smart_open'],
+
+    use_2to3=True,
+    convert_2to3_doctests=['src/your/module/README.txt'],
+    use_2to3_fixers=['your.fixers'],
+    use_2to3_exclude_fixers=['lib2to3.fixes.fix_import'],
+    )

+ 209 - 0
transport/__init__.py

@@ -0,0 +1,209 @@
+"""
+Data Transport - 1.0
+Steve L. Nyemba, The Phi Technology LLC
+
+This module is designed to serve as a wrapper to a set of supported data stores :
+    - couchdb
+    - mongodb
+    - Files (character delimited)
+    - Queues (RabbmitMq)
+    - Session (Flask)
+    - s3
+The supported operations are read/write and providing meta data to the calling code
+Requirements :
+	pymongo
+	boto
+	couldant
+The configuration for the data-store is as follows :
+	couchdb:
+		{
+			args:{
+				url:<url>,
+				username:<username>,
+				password:<password>,
+				dbname:<database>,
+				uid:<document id>
+			}
+		}
+	RabbitMQ:
+		{
+			
+		}
+	Mongodb:
+	{
+		args:{
+			host:<url>, #localhost:27017
+			username:<username>,
+			password:<password>,
+			dbname:<database>,
+			uid:<document id>s
+
+		}
+	}
+"""
+__author__ = 'The Phi Technology'
+import numpy as np
+import json
+import importlib 
+from common import Reader, Writer #, factory
+# import disk
+# import queue
+# import couch
+# import mongo
+# import s3
+class factory :
+	@staticmethod
+	def instance(**args):
+		"""
+		This class will create an instance of a transport when providing 
+		:type	name of the type we are trying to create
+		:args	The arguments needed to create the instance
+		"""
+		source = args['type']		
+		params = args['args']
+		anObject = None
+		
+		if source in ['HttpRequestReader','HttpSessionWriter']:
+			#
+			# @TODO: Make sure objects are serializable, be smart about them !!
+			#
+			aClassName = ''.join([source,'(**params)'])
+
+
+		else:
+			
+			stream = json.dumps(params)
+			aClassName = ''.join([source,'(**',stream,')'])
+		try:
+			anObject = eval( aClassName)
+			#setattr(anObject,'name',source)
+		except Exception,e:
+			print ['Error ',e]
+		return anObject
+
+# class Reader:
+# 	def __init__(self):
+# 		self.nrows = 0
+# 		self.xchar = None
+		
+# 	def row_count(self):		
+# 		content = self.read()
+# 		return np.sum([1 for row in content])
+# 	def delimiter(self,sample):
+# 		"""
+# 			This function determines the most common delimiter from a subset of possible delimiters. 
+# 			It uses a statistical approach (distribution) to guage the distribution of columns for a given delimiter
+			
+# 			:sample sample  string/content expecting matrix i.e list of rows
+# 		"""
+		
+# 		m = {',':[],'\t':[],'|':[],'\x3A':[]} 
+# 		delim = m.keys()
+# 		for row in sample:
+# 			for xchar in delim:
+# 				if row.split(xchar) > 1:	
+# 					m[xchar].append(len(row.split(xchar)))
+# 				else:
+# 					m[xchar].append(0)
+				
+				
+					
+# 		#
+# 		# The delimiter with the smallest variance, provided the mean is greater than 1
+# 		# This would be troublesome if there many broken records sampled
+# 		#
+# 		m = {id: np.var(m[id]) for id in m.keys() if m[id] != [] and int(np.mean(m[id]))>1}
+# 		index = m.values().index( min(m.values()))
+# 		xchar = m.keys()[index]
+		
+# 		return xchar
+# 	def col_count(self,sample):
+# 		"""
+# 		This function retirms the number of columns of a given sample
+# 		@pre self.xchar is not None
+# 		"""
+		
+# 		m = {}
+# 		i = 0
+		
+# 		for row in sample:
+# 			row = self.format(row)
+# 			id = str(len(row))
+# 			#id = str(len(row.split(self.xchar))) 
+			
+# 			if id not in m:
+# 				m[id] = 0
+# 			m[id] = m[id] + 1
+		
+# 		index = m.values().index( max(m.values()) )
+# 		ncols = int(m.keys()[index])
+		
+		
+# 		return ncols;
+# 	def format (self,row):
+# 		"""
+# 			This function will clean records of a given row by removing non-ascii characters
+# 			@pre self.xchar is not None
+# 		"""
+		
+# 		if isinstance(row,list) == False:
+# 			#
+# 			# We've observed sometimes fields contain delimiter as a legitimate character, we need to be able to account for this and not tamper with the field values (unless necessary)
+# 			cols = self.split(row)
+# 			#cols = row.split(self.xchar)
+# 		else:
+# 			cols = row ;
+# 		return [ re.sub('[^\x00-\x7F,\n,\r,\v,\b,]',' ',col.strip()).strip().replace('"','') for col in cols]
+		
+# 	def split (self,row):
+# 		"""
+# 			This function performs a split of a record and tries to attempt to preserve the integrity of the data within i.e accounting for the double quotes.
+# 			@pre : self.xchar is not None
+# 		""" 
+
+# 		pattern = "".join(["(?:^|",self.xchar,")(\"(?:[^\"]+|\"\")*\"|[^",self.xchar,"]*)"])
+# 		return re.findall(pattern,row.replace('\n',''))
+
+
+# class Writer:
+	
+# 	def format(self,row,xchar):
+# 		if xchar is not None and isinstance(row,list):
+# 			return xchar.join(row)+'\n'
+# 		elif xchar is None and isinstance(row,dict):
+# 			row = json.dumps(row)
+# 		return row
+# 	"""
+# 		It is important to be able to archive data so as to insure that growth is controlled
+# 		Nothing in nature grows indefinitely neither should data being handled.
+# 	"""
+# 	def archive(self):
+# 		pass
+# 	def flush(self):
+# 		pass
+
+# class factory :
+# 	@staticmethod
+# 	def instance(**args):
+		
+# 		source = args['type']		
+# 		params = args['args']
+# 		anObject = None
+		
+# 		if source in ['HttpRequestReader','HttpSessionWriter']:
+# 			#
+# 			# @TODO: Make sure objects are serializable, be smart about them !!
+# 			#
+# 			aClassName = ''.join([source,'(**params)'])
+
+
+# 		else:
+			
+# 			stream = json.dumps(params)
+# 			aClassName = ''.join([source,'(**',stream,')'])
+# 		try:
+# 			anObject = eval( aClassName)
+# 			#setattr(anObject,'name',source)
+# 		except Exception,e:
+# 			print ['Error ',e]
+# 		return anObject

BIN
transport/__init__.pyc


+ 154 - 0
transport/common.py

@@ -0,0 +1,154 @@
+"""
+Data Transport - 1.0
+Steve L. Nyemba, The Phi Technology LLC
+
+This module is designed to serve as a wrapper to a set of supported data stores :
+    - couchdb
+    - mongodb
+    - Files (character delimited)
+    - Queues (RabbmitMq)
+    - Session (Flask)
+    - s3
+The supported operations are read/write and providing meta data to the calling code
+Requirements :
+	pymongo
+	boto
+	couldant
+
+"""
+__author__ = 'The Phi Technology'
+import numpy as np
+import json
+import importlib 
+# import couch
+# import mongo
+class Reader:
+	def __init__(self):
+		self.nrows = 0
+		self.xchar = None
+		
+	def row_count(self):		
+		content = self.read()
+		return np.sum([1 for row in content])
+	def delimiter(self,sample):
+		"""
+			This function determines the most common delimiter from a subset of possible delimiters. 
+			It uses a statistical approach (distribution) to guage the distribution of columns for a given delimiter
+			
+			:sample sample  string/content expecting matrix i.e list of rows
+		"""
+		
+		m = {',':[],'\t':[],'|':[],'\x3A':[]} 
+		delim = m.keys()
+		for row in sample:
+			for xchar in delim:
+				if row.split(xchar) > 1:	
+					m[xchar].append(len(row.split(xchar)))
+				else:
+					m[xchar].append(0)
+				
+				
+					
+		#
+		# The delimiter with the smallest variance, provided the mean is greater than 1
+		# This would be troublesome if there many broken records sampled
+		#
+		m = {id: np.var(m[id]) for id in m.keys() if m[id] != [] and int(np.mean(m[id]))>1}
+		index = m.values().index( min(m.values()))
+		xchar = m.keys()[index]
+		
+		return xchar
+	def col_count(self,sample):
+		"""
+		This function retirms the number of columns of a given sample
+		@pre self.xchar is not None
+		"""
+		
+		m = {}
+		i = 0
+		
+		for row in sample:
+			row = self.format(row)
+			id = str(len(row))
+			#id = str(len(row.split(self.xchar))) 
+			
+			if id not in m:
+				m[id] = 0
+			m[id] = m[id] + 1
+		
+		index = m.values().index( max(m.values()) )
+		ncols = int(m.keys()[index])
+		
+		
+		return ncols;
+	def format (self,row):
+		"""
+			This function will clean records of a given row by removing non-ascii characters
+			@pre self.xchar is not None
+		"""
+		
+		if isinstance(row,list) == False:
+			#
+			# We've observed sometimes fields contain delimiter as a legitimate character, we need to be able to account for this and not tamper with the field values (unless necessary)
+			cols = self.split(row)
+			#cols = row.split(self.xchar)
+		else:
+			cols = row ;
+		return [ re.sub('[^\x00-\x7F,\n,\r,\v,\b,]',' ',col.strip()).strip().replace('"','') for col in cols]
+		
+	def split (self,row):
+		"""
+			This function performs a split of a record and tries to attempt to preserve the integrity of the data within i.e accounting for the double quotes.
+			@pre : self.xchar is not None
+		""" 
+
+		pattern = "".join(["(?:^|",self.xchar,")(\"(?:[^\"]+|\"\")*\"|[^",self.xchar,"]*)"])
+		return re.findall(pattern,row.replace('\n',''))
+
+
+class Writer:
+	
+	def format(self,row,xchar):
+		if xchar is not None and isinstance(row,list):
+			return xchar.join(row)+'\n'
+		elif xchar is None and isinstance(row,dict):
+			row = json.dumps(row)
+		return row
+	"""
+		It is important to be able to archive data so as to insure that growth is controlled
+		Nothing in nature grows indefinitely neither should data being handled.
+	"""
+	def archive(self):
+		pass
+	def flush(self):
+		pass
+
+# class factory :
+# 	@staticmethod
+# 	def instance(**args):
+# 		"""
+# 		This class will create an instance of a transport when providing 
+# 		:type	name of the type we are trying to create
+# 		:args	The arguments needed to create the instance
+# 		"""
+# 		source = args['type']		
+# 		params = args['args']
+# 		anObject = None
+		
+# 		if source in ['HttpRequestReader','HttpSessionWriter']:
+# 			#
+# 			# @TODO: Make sure objects are serializable, be smart about them !!
+# 			#
+# 			aClassName = ''.join([source,'(**params)'])
+
+
+# 		else:
+			
+# 			stream = json.dumps(params)
+# 			aClassName = ''.join([source,'(**',stream,')'])
+# 		try:
+# 			anObject = eval( aClassName)
+# 			#setattr(anObject,'name',source)
+# 		except Exception,e:
+# 			print ['Error ',e]
+# 		return anObject

BIN
transport/common.pyc


+ 199 - 0
transport/couch.py

@@ -0,0 +1,199 @@
+"""
+Data-Transport
+Steve L. Nyemba, The Phi Technology
+
+This file is a wrapper around couchdb using IBM Cloudant SDK that has an interface to couchdb
+
+"""
+import cloudant
+import json
+from common import Reader,Writer
+class Couch:
+	"""
+		@param	url		host & port reference
+		@param	uid		user id involved
+
+		@param	dbname		database name (target)
+	"""
+	def __init__(self,**args):
+		url 		= args['url']
+		self.uid 	= args['uid']
+		dbname		= args['dbname']
+		if 'username' not in args and 'password' not in args :
+			self.server 	= cloudant.CouchDB(url=url)
+		else:
+			self.server = cloudant.CouchDB(args['username'],args['password'],url=url)
+		self.server.connect()
+
+		if dbname in self.server.all_dbs() :
+			self.dbase	= self.server.get(dbname,dbname,True)
+			#
+			# @TODO Check if the database exists ...
+			#
+			doc = cloudant.document.Document(self.dbase,self.uid) #self.dbase.get(self.uid)
+			if not doc.exists():
+				doc = self.dbase.create_document({"_id":self.uid})
+				doc.save()
+		else:
+			self.dbase = None
+	"""
+		Insuring the preconditions are met for processing
+	"""
+	def isready(self):
+		p = self.server.metadata() != {}
+		if p == False or not self.dbase:
+			return False
+		#
+		# At this point we are sure that the server is connected
+		# We are also sure that the database actually exists
+		#
+		doc = cloudant.document.Document(self.dbase,self.uid)
+		# q = self.dbase.all_docs(key=self.uid)['rows'] 
+		# if not q :
+		if not doc.exists():
+			return False
+		return True
+	
+	def view(self,**args):
+		"""
+			We are executing a view
+			:id	design document _design/xxxx (provide full name with _design prefix)
+			:view_name	name of the view i.e 
+			:key	key to be used to filter the content
+		"""
+		document = cloudant.design_document.DesignDocument(self.dbase,args['id'])
+		document.fetch()
+		params = {'group_level':1,'group':True}
+		if 'key' in  args :
+			params ['key'] = args['key']
+		elif 'keys' in args :
+			params['keys'] = args['keys']
+		return document.get_view(args['view_name'])(**params)['rows']
+		
+		
+
+		
+class CouchReader(Couch,Reader):
+	"""
+		This function will read an attachment from couchdb and return it to calling code. The attachment must have been placed before hand (otherwise oops)
+		@T: Account for security & access control
+	"""
+	def __init__(self,**args):
+		"""
+			@param	filename	filename (attachment)
+		"""
+		#
+		# setting the basic parameters for 
+		Couch.__init__(self,**args)
+		if 'filename' in args :
+			self.filename 	= args['filename']
+		else:
+			self.filename = None
+
+	# def isready(self):
+	# 	#
+	# 	# Is the basic information about the database valid
+	# 	#
+	# 	p = Couchdb.isready(self)
+		
+	# 	if p == False:
+	# 		return False
+	# 	#
+	# 	# The database name is set and correct at this point
+	# 	# We insure the document of the given user has the requested attachment.
+	# 	# 
+		
+	# 	doc = self.dbase.get(self.uid)
+		
+	# 	if '_attachments' in doc:
+	# 		r = self.filename in doc['_attachments'].keys()
+			
+	# 	else:
+	# 		r = False
+		
+	# 	return r	
+	def stream(self):
+		#
+		# @TODO Need to get this working ...
+		#
+		document = cloudant.document.Document(self.dbase,self.uid)
+		# content = self.dbase.fetch_attachment(self.uid,self.filename).split('\n') ;
+		content = self.get_attachment(self.filename)
+		for row in content:
+			yield row
+		
+	def read(self,size=-1):
+		if self.filename is not None:
+			self.stream()
+		else:
+			return self.basic_read()
+	def basic_read(self):
+		document = cloudant.document.Document(self.dbase,self.uid)
+		
+		# document = self.dbase.get(self.uid)
+		if document.exists() :			
+			document.fetch()
+			document = dict(document)
+			del document['_rev']
+		else:
+			document = {}
+		return document
+
+class CouchWriter(Couch,Writer):		
+	"""
+		This class will write on a couchdb document provided a scope
+		The scope is the attribute that will be on the couchdb document
+	"""
+	def __init__(self,**args):
+		"""
+			@param	uri		host & port reference
+			@param	uid		user id involved
+			@param	filename	filename (attachment)
+			@param	dbname		database name (target)
+		"""
+
+		Couch.__init__(self,**args)
+
+	def write(self,**params):
+		"""
+			write a given attribute to a document database
+			@param	label	scope of the row repair|broken|fixed|stats
+			@param	row	row to be written
+		"""
+		
+		# document = self.dbase.get(self.uid)
+		document = cloudant.document.Document(self.dbase,self.uid) #.get(self.uid)
+		if document.exists() is False :
+			document = self.dbase.create_document({"_id":self.uid})
+		label = params['label']
+		row	= params['row']
+		if label not in document :
+			document[label] = []
+		document[label].append(row)
+		document.save()
+		# self.dbase.bulk_docs([document])
+		# self.dbase.save_doc(document)
+			
+	def archive(self,params=None):
+		"""
+		This function will archive the document onto itself. 		
+		"""
+		# document = self.dbase.all_docs(self.uid,include_docs=True)
+		document = cloudant.document.Document(self.dbase,self.filename)
+		document.fetch()
+		content = {}
+		# _doc = {}
+		for id in document:
+			if  id not in ['_id','_rev','_attachments'] :
+				content[id] = document[id]
+				del document[id]
+				
+		content = json.dumps(content)	
+		# document= _doc
+		now = str(datetime.today())
+		
+		name = '-'.join([document['_id'] , now,'.json'])			
+		# self.dbase.bulk_docs([document])
+		# self.dbase.put_attachment(document,content,name,'application/json')
+		document.put_attachment(self.dbase,name,'application/json',content)
+		document.save()

BIN
transport/couch.pyc


BIN
transport/couchdb.pyc


+ 82 - 0
transport/disk.py

@@ -0,0 +1,82 @@
+import os
+from .__init__ import Reader,Writer
+import json
+
+class DiskReader(Reader) :
+	"""
+	This class is designed to read data from disk (location on hard drive)
+	@pre : isready() == True
+	"""
+
+	def __init__(self,**params):
+		"""
+			@param	path	absolute path of the file to be read
+		"""
+
+		Reader.__init__(self)
+		self.path = params['path'] ;
+
+	def isready(self):
+		return os.path.exists(self.path) 
+	def read(self,size=-1):
+		"""
+			This function reads the rows from a designated location on disk
+			@param	size	number of rows to be read, -1 suggests all rows
+		"""
+
+		f = open(self.path,'rU') 
+		i = 1
+		for row in f:
+			
+			i += 1
+			if size == i:
+				break
+			yield row
+		f.close()
+class DiskWriter(Writer):
+	"""
+		This function writes output to disk in a designated location
+	"""
+
+	def __init__(self,**params):
+		if 'path' in params:
+			self.path = params['path']
+		else:
+			self.path = None
+		if 'name' in params:
+			self.name = params['name'];
+		else:
+			self.name = None
+		if os.path.exists(self.path) == False:
+			os.mkdir(self.path)
+
+	def isready(self):
+		"""
+			This function determines if the class is ready for execution or not
+			i.e it determines if the preconditions of met prior execution
+		"""
+		
+		p =  self.path is not None and os.path.exists(self.path)
+		q = self.name is not None 
+		return p and q
+	def write(self,**params):
+		"""
+			This function writes a record to a designated file
+			@param	label	<passed|broken|fixed|stats>
+			@param	row	row to be written
+		"""
+
+		label 	= params['label']
+		row 	= params['row']
+		xchar = None
+		if 'xchar' is not None:
+			xchar 	= params['xchar']
+		path = ''.join([self.path,os.sep,label])
+		if os.path.exists(path) == False:
+			os.mkdir(path) ;
+		path = ''.join([path,os.sep,self.name]) 
+		f = open(path,'a')
+		row = self.format(row,xchar);
+		f.write(row)
+		f.close()
+		

BIN
transport/disk.pyc


+ 66 - 0
transport/mongo.py

@@ -0,0 +1,66 @@
+from pymongo import MongoClient
+# from transport import Reader,Writer
+from common import Reader, Writer
+import json
+class Mongo :
+    """
+    Basic mongodb functions are captured here
+    """
+    def __init__(self,**args):
+        """
+            :dbname     database name/identifier
+            :host   host and port of the database
+            :username   username for authentication
+            :password   password for current user
+        """
+        host = args['host']
+        
+        if 'user' in args and 'password' in args:        
+            self.client = MongoClient(host,
+                      username=args['username'] ,
+                      password=args['password'] ,
+                      authMechanism='SCRAM-SHA-256')
+        else:
+            self.client = MongoClient()                    
+        
+        self.uid    = args['uid']  #-- document identifier
+        self.dbname = args['dbname']
+        self.db = self.client[self.dbname]
+        
+    def isready(self):
+        p = self.dbname in self.client.list_database_names() 
+        q = self.uid in self.client[self.dbname].list_collection_names()
+        return p and q
+
+class MongoReader(Mongo,Reader):
+    """
+    This class will read from a mongodb data store and return the content of a document (not a collection)
+    """
+    def __init__(self,**args):
+        Mongo.__init__(self,**args)
+    def read(self,size=-1):
+        collection = self.db[self.uid]
+        return collection.find({})
+    def view(self,**args):
+        """
+        This function is designed to execute a view (map/reduce) operation
+        """
+        pass
+class MongoWriter(Mongo,Writer):
+    """
+    This class is designed to write to a mongodb collection within a database
+    """
+    def __init__(self,**args):
+        Mongo.__init__(self,**args)
+    def write(self,**args):
+        # document  = self.db[self.uid].find()
+        collection = self.db[self.uid]
+        collection.update_one()
+        self.db[self.uid].insert_one(args['row'])
+    def set(self,document):
+        collection = self.db[self.uid]
+        if collection.count_document() > 0 :
+            collection.delete({_id:self.uid})
+        
+        collecton.update_one({"_id":self.uid},document,True)
+

BIN
transport/mongo.pyc


+ 200 - 0
transport/queue.py

@@ -0,0 +1,200 @@
+import pika
+from datetime import datetime
+import re
+import json
+import os
+from common import Reader, Writer
+import json
+
+class MessageQueue:
+	"""
+		This class hierarchy is designed to handle interactions with a queue server using pika framework (our tests are based on rabbitmq)
+		:host	
+		:uid	identifier of the exchange
+		:qid	identifier of the queue
+	"""
+	def __init__(self,**params):
+		self.host= params['host']
+		self.uid = params['uid']
+		self.qid = params['qid']
+	
+	def isready(self):
+		#self.init()
+		resp =  self.connection is not None and self.connection.is_open
+		self.close()
+		return resp
+	def close(self):
+            if self.connection.is_closed == False :
+		self.channel.close()
+		self.connection.close()
+
+class QueueWriter(MessageQueue,Writer):
+	"""
+		This class is designed to publish content to an AMQP (Rabbitmq)
+		The class will rely on pika to implement this functionality
+
+		We will publish information to a given queue for a given exchange
+	"""
+	def __init__(self,**params):
+		#self.host= params['host']
+		#self.uid = params['uid']
+		#self.qid = params['queue']
+		MessageQueue.__init__(self,**params);
+		
+		
+	def init(self,label=None):
+		properties = pika.ConnectionParameters(host=self.host)
+		self.connection = pika.BlockingConnection(properties)
+		self.channel	= self.connection.channel()
+		self.info = self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True)
+		if label is None:
+			self.qhandler = self.channel.queue_declare(queue=self.qid,durable=True)	
+		else:
+			self.qhandler = self.channel.queue_declare(queue=label,durable=True)
+		
+		self.channel.queue_bind(exchange=self.uid,queue=self.qhandler.method.queue) 
+		
+
+
+	"""
+		This function writes a stream of data to the a given queue
+		@param object	object to be written (will be converted to JSON)
+		@TODO: make this less chatty
+	"""
+	def write(self,**params):
+		xchar = None
+		if  'xchar' in params:
+			xchar = params['xchar']
+		object = self.format(params['row'],xchar)
+		
+		label	= params['label']
+		self.init(label)
+		_mode = 2
+		if isinstance(object,str):
+			stream = object
+			_type = 'text/plain'
+		else:
+			stream = json.dumps(object)
+			if 'type' in params :
+				_type = params['type']
+			else:
+				_type = 'application/json'
+
+		self.channel.basic_publish(
+			exchange=self.uid,
+			routing_key=label,
+			body=stream,
+			properties=pika.BasicProperties(content_type=_type,delivery_mode=_mode)
+		);
+		self.close()
+
+	def flush(self,label):
+		self.init(label)
+		_mode = 1  #-- Non persistent
+		self.channel.queue_delete( queue=label);
+		self.close()
+		
+class QueueReader(MessageQueue,Reader):
+	"""
+	This class will read from a queue provided an exchange, queue and host
+	@TODO: Account for security and virtualhosts
+	"""
+
+	def __init__(self,**params):
+		"""
+			@param	host	host
+			@param	uid	exchange identifier
+			@param	qid	queue identifier
+		"""
+
+		#self.host= params['host']
+		#self.uid = params['uid']
+		#self.qid = params['qid']
+		MessageQueue.__init__(self,**params);
+		if 'durable' in params :
+			self.durable = True
+		else:
+			self.durable = False
+		self.size = -1
+		self.data = {}
+	def init(self,qid):
+		
+		properties = pika.ConnectionParameters(host=self.host)
+		self.connection = pika.BlockingConnection(properties)
+		self.channel	= self.connection.channel()
+		self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True)
+
+		self.info = self.channel.queue_declare(queue=qid,durable=True)
+	
+
+	def callback(self,channel,method,header,stream):
+		"""
+			This is the callback function designed to process the data stream from the queue
+
+		"""
+				
+		r = []
+		if re.match("^\{|\[",stream) is not None:
+			r = json.loads(stream)
+		else:
+			
+			r = stream
+		
+		qid = self.info.method.queue
+		if qid not in self.data :
+			self.data[qid] = []
+		
+		self.data[qid].append(r)
+		#
+		# We stop reading when the all the messages of the queue are staked
+		#
+		if self.size == len(self.data[qid]) or len(self.data[qid]) == self.info.method.message_count:		
+			self.close()
+
+	def read(self,size=-1):
+		"""
+		This function will read, the first message from a queue
+		@TODO: 
+			Implement channel.basic_get in order to retrieve a single message at a time
+			Have the number of messages retrieved be specified by size (parameter)
+		"""
+		r = {}
+		self.size = size
+		#
+		# We enabled the reader to be able to read from several queues (sequentially for now)
+		# The qid parameter will be an array of queues the reader will be reading from
+		#
+		if isinstance(self.qid,basestring) :
+					self.qid = [self.qid]
+		for qid in self.qid:
+			self.init(qid)
+			# r[qid] = []
+			
+			if self.info.method.message_count > 0:
+				
+				self.channel.basic_consume(self.callback,queue=qid,no_ack=False);
+				self.channel.start_consuming()
+			else:
+				
+				pass
+				#self.close()
+			# r[qid].append( self.data)
+
+		return self.data
+class QueueListener(QueueReader):
+	def init(self,qid):
+		properties = pika.ConnectionParameters(host=self.host)
+		self.connection = pika.BlockingConnection(properties)
+		self.channel	= self.connection.channel()
+		self.channel.exchange_declare(exchange=self.uid,type='direct',durable=True )
+
+		self.info = self.channel.queue_declare(passive=True,exclusive=True,queue=qid)
+		
+		self.channel.queue_bind(exchange=self.uid,queue=self.info.method.queue,routing_key=qid)
+		#self.callback = callback
+	def read(self):
+    	
+		self.init(self.qid)
+		self.channel.basic_consume(self.callback,queue=self.qid,no_ack=True);
+		self.channel.start_consuming()
+ 

BIN
transport/queue.pyc


+ 83 - 0
transport/s3.py

@@ -0,0 +1,83 @@
+from datetime import datetime
+import boto
+import botocore
+from smart_open import smart_open
+from common import Reader, Writer
+import json
+from common import Reader, Writer
+
+class s3 :
+	"""
+		@TODO: Implement a search function for a file given a bucket??
+	"""
+	def __init__(self,args) :
+		"""
+			This function will extract a file or set of files from s3 bucket provided
+			@param access_key
+			@param secret_key
+			@param path		location of the file
+			@param filter		filename or filtering elements
+		"""
+		try:
+			self.s3 = boto.connect_s3(args['access_key'],args['secret_key'])
+			self.bucket = self.s3.get_bucket(args['bucket'].strip(),validate=False) if 'bucket' in args else None
+			# self.path = args['path']
+			self.filter = args['filter'] if 'filter' in args else None
+			self.filename = args['file'] if 'file' in args else None
+
+		except Exception as e :
+			self.s3 = None
+			self.bucket = None
+			print (e)
+
+	def buckets(self):
+		# def buckets(self):
+		pass
+		# """
+		# This function is a wrapper around the bucket list of buckets for s3
+		# """
+		# return self.s3.get_all_buckets()
+		
+        	
+class s3Reader(s3,Reader) :
+	"""	
+		Because s3 contains buckets and files, reading becomes a tricky proposition :
+		- list files		if file is None
+		- stream content	if file is Not None
+		@TODO: support read from all buckets, think about it
+	"""
+	def __init__(self,args) :
+			s3.__init__(self,args)
+	def files(self):
+		r = []
+		try:
+			return [item.name for item in self.bucket if item.size > 0]
+		except Exception as e:
+			pass
+		return r
+	def stream(self,limit=-1):
+		"""
+			At this point we should stream a file from a given bucket
+		"""
+		key = self.bucket.get_key(self.filename.strip())
+		if key is None :
+			yield None
+		else:
+			count = 0
+			with smart_open(key) as remote_file:
+				for line in remote_file:
+					if count == limit and limit > 0 :
+						break
+				yield line
+				count += 1
+	def read(self,limit=-1) :
+		if self.filename is None :
+			# 
+		# returning the list of files because no one file was specified.
+			return self.files()
+		else:
+			return self.stream(10)
+
+class s3Writer(s3,Writer) :
+        def __init__(self,args) :
+        	s3.__init__(self,args)

BIN
transport/s3.pyc


+ 66 - 0
transport/session.py

@@ -0,0 +1,66 @@
+from flask import request, session
+from datetime import datetime
+import re
+from common import Reader, Writer
+import json
+
+class HttpRequestReader(Reader):
+    """
+    This class is designed to read data from an Http request file handler provided to us by flask
+    The file will be heald in memory and processed accordingly
+    NOTE: This is inefficient and can crash a micro-instance (becareful)
+    """
+
+	def __init__(self,**params):
+		self.file_length = 0
+		try:
+			
+			#self.file = params['file']	
+			#self.file.seek(0, os.SEEK_END)
+			#self.file_length = self.file.tell()
+			
+			#print 'size of file ',self.file_length
+			self.content = params['file'].readlines()
+			self.file_length = len(self.content)
+		except Exception, e:
+			print "Error ... ",e
+			pass
+		
+	def isready(self):
+		return self.file_length > 0
+	def read(self,size =-1):
+		i = 1
+		for row in self.content:
+			i += 1
+			if size == i:
+				break
+			yield row
+		
+class HttpSessionWriter(Writer):
+    """
+        This class is designed to write data to a session/cookie
+    """
+	def __init__(self,**params):
+        """
+            @param key	required session key
+        """
+		self.session = params['queue']
+		self.session['sql'] = []
+		self.session['csv'] = []
+		self.tablename = re.sub('..+$','',params['filename'])
+		self.session['uid'] = params['uid']
+		#self.xchar = params['xchar']
+			
+		
+	def format_sql(self,row):
+		values = "','".join([col.replace('"','').replace("'",'') for col in row])
+		return "".join(["INSERT INTO :table VALUES('",values,"');\n"]).replace(':table',self.tablename)		
+	def isready(self):
+		return True
+	def write(self,**params):
+		label = params['label']
+		row = params ['row']
+		
+		if label == 'usable':
+			self.session['csv'].append(self.format(row,','))
+			self.session['sql'].append(self.format_sql(row))