
TR - added s3 handling (reader)

Steve L. Nyemba, 8 years ago
commit 3e77175d06
1 changed file with 83 additions and 13 deletions

transport.py  +83 −13

@@ -11,6 +11,9 @@ from couchdbkit import Server
 import re
 from csv import reader
 from datetime import datetime
+import boto
+import botocore
+from smart_open import smart_open
 """
 	@TODO: Write a process by which the class automatically handles reading and creating a preliminary sample and discovers the meta data
 """
@@ -629,12 +632,81 @@ class CouchdbWriter(Couchdb,Writer):
 		name = '-'.join([document['_id'] , now,'.json'])			
 		self.dbase.save_doc(document)
 		self.dbase.put_attachment(document,content,name,'application/json')
+class s3 :
+	"""
+		@TODO: Implement a search function for a file given a bucket??
+	"""
+	def __init__(self,args) :
+		"""
+			This constructor connects to s3 and targets a file or set of files in the bucket provided
+			@param access_key
+			@param secret_key
+			@param path		location of the file
+			@param filter		filename or filtering elements
+		"""
+		try:
+			self.s3 = boto.connect_s3(args['access_key'],args['secret_key'])
+			self.bucket = self.s3.get_bucket(args['bucket']) if 'bucket' in args else None
+			# self.path = args['path']
+			self.filter = args['filter'] if 'filter' in args else None
+			self.filename = args['file'] if 'file' in args else None
+		except Exception as e :
+			# on failure (bad credentials, missing bucket, ...) leave the handler unusable
+			self.s3 = None
+			self.bucket = None
+			print e
+	def buckets(self):
+		"""
+			This function is a wrapper around boto's listing of all the buckets available to this connection
+		"""
+		return self.s3.get_all_buckets()
+
+class s3Reader(s3,Reader) :
+	"""
+		Because s3 contains buckets and files, reading becomes a tricky proposition :
+		- list files		if file is None
+		- stream content	if file is not None
+		@TODO: support read from all buckets, think about it
+	"""
+	def __init__(self,args) :
+		s3.__init__(self,args)
+	def files(self):
+		"""
+			This function lists the names of the non-empty files in the current bucket
+		"""
+		r = []
+		try:
+			return [item.name for item in self.bucket if item.size > 0]
+		except Exception as e:
+			pass
+		return r
+	def stream(self,limit=-1):
+		"""
+			This function streams the named file from the current bucket line by line,
+			yielding at most `limit` lines when limit > 0
+		"""
+		key = self.bucket.get_key(self.filename.strip())
+		if key is None :
+			yield None
+		else:
+			count = 0
+			with smart_open(key) as remote_file:
+				for line in remote_file:
+					if count == limit and limit > 0 :
+						break
+					yield line
+					count += 1
+	def read(self,limit=-1) :
+		if self.filename is None :
+			#
+			# No file was specified, so return the list of files in the bucket
+			return self.files()
+		else:
+			return self.stream(limit)
 """
 	This class acts as a factory to be able to generate an instance of a Reader/Writer
 	Against a Queue,Disk,Cloud,Couchdb 
 	The class doesn't enforce parameter validation; thus any error in the parameters sent will result in a null object
 """
-class DataSourceFactory:
+class Factory:
 	def instance(self,**args):
 		source = args['type']		
 		params = args['args']
@@ -659,6 +731,10 @@ class DataSourceFactory:
 		except Exception,e:
 			print ['Error ',e]
 		return anObject
+class s3Writer(s3,Writer) :
+	def __init__(self,args) :
+		s3.__init__(self,args)
+
 """
 	This class implements a data-source handler intended to be used within the context of data processing; it allows code to read/write from anywhere transparently.
 	The class is a facade to a heterogeneous class hierarchy and thus simplifies how the calling code interacts with the class hierarchy
@@ -671,16 +747,10 @@ class DataSource:
 		return self.Input.read(size)
 	def write(self,**args):
 		self.Output.write(**args)
-#p = {}
-#p['host'] = 'dev.the-phi.com'
-#p['uid'] = 'nyemba@gmail.com'
-#p['qid'] = 'repair'
-#factory = DataSourceFactory()
-#o =  factory.instance(type='QueueReader',args=p)		
-#print o is None
-#q = QueueWriter(host='dev.the-phi.com',uid='nyemba@gmail.com')
-#q.write(object='steve')
-#q.write(object='nyemba')
-#q.write(object='elon')
-
+# conf = json.loads(open('conf.json').read())
+# x = s3Reader( dict(conf,**{'bucket':'com.phi.sample.data','file':'Sample-Spreadsheet-5000-rows.csv'}))
 
+# r = x.read()
+# for item in r :
+# 	print item
+# print buckets[1].get_key('Sample-Spreadsheet-5000-rows.csv')
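
The commented-out lines above hint at how the new reader is meant to be exercised. A minimal usage sketch, assuming transport.py is importable as `transport` and that a conf.json file holds the `access_key` and `secret_key`; the bucket and file names are reused from the commented example and are otherwise hypothetical:

import json
from transport import s3Reader

# credentials are assumed to live in conf.json: {"access_key": "...", "secret_key": "..."}
conf = json.loads(open('conf.json').read())

# point the reader at a bucket and a file (names borrowed from the commented example above)
args = dict(conf,**{'bucket':'com.phi.sample.data','file':'Sample-Spreadsheet-5000-rows.csv'})
reader = s3Reader(args)

# with a file specified, read() streams the file's lines (here capped at 10)
for line in reader.read(limit=10):
	print line

# without a file, read() falls back to listing the bucket's non-empty files
args.pop('file')
print s3Reader(args).read()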