Browse Source

Merge branch 's3' of steve/data-transport into master

Steve L. Nyemba 7 years ago
parent
commit
3443ec8da8
2 changed files with 121 additions and 14 deletions
  1. 35 0
      requirements.txt
  2. 86 14
      transport.py

+ 35 - 0
requirements.txt

@@ -0,0 +1,35 @@
+asn1crypto==0.23.0
+boto==2.48.0
+boto3==1.4.7
+botocore==1.7.17
+bz2file==0.98
+certifi==2017.7.27.1
+cffi==1.11.0
+chardet==3.0.4
+click==6.7
+couchdbkit==0.6.5
+cryptography==2.0.3
+docutils==0.14
+enum34==1.1.6
+Flask==0.12.2
+futures==3.1.1
+http-parser==0.8.3
+idna==2.6
+ipaddress==1.0.18
+itsdangerous==0.24
+Jinja2==2.9.6
+jmespath==0.9.3
+MarkupSafe==1.0
+numpy==1.13.1
+pika==0.11.0
+pycparser==2.18
+pyOpenSSL==17.3.0
+python-dateutil==2.6.1
+requests==2.18.4
+restkit==4.2.2
+s3transfer==0.1.11
+six==1.11.0
+smart-open==1.5.3
+socketpool==0.5.3
+urllib3==1.22
+Werkzeug==0.12.2

+ 86 - 14
transport.py

@@ -11,6 +11,9 @@ from couchdbkit import Server
 import re
 from csv import reader
 from datetime import datetime
+import boto
+import botocore
+from smart_open import smart_open
 """
 	@TODO: Write a process by which the class automatically handles reading and creating a preliminary sample and discovers the meta data
 """
@@ -629,12 +632,81 @@ class CouchdbWriter(Couchdb,Writer):
 		name = '-'.join([document['_id'] , now,'.json'])			
 		self.dbase.save_doc(document)
 		self.dbase.put_attachment(document,content,name,'application/json')
+class s3 :
+        """
+		@TODO: Implement a search function for a file given a bucket??
+	"""
+        def __init__(self,args) :
+        	"""
+			This function will extract a file or set of files from s3 bucket provided
+			@param access_key
+			@param secret_key
+			@param path		location of the file
+			@param filter		filename or filtering elements
+		"""
+		try:
+			self.s3 = boto.connect_s3(args['access_key'],args['secret_key'])
+			self.bucket = self.s3.get_bucket(args['bucket'].strip(),validate=False) if 'bucket' in args else None
+			# self.path = args['path']
+			self.filter = args['filter'] if 'filter' in args else None
+			self.filename = args['file'] if 'file' in args else None
+			
+		except Exception as e :
+			self.s3 = None
+			self.bucket = None
+			print e
+	def buckets(self):
+        	"""
+			This function is a wrapper around the bucket list of buckets for s3
+
+		"""
+		return self.s3.get_all_buckets()
+		
+        	
+class s3Reader(s3,Reader) :
+        """	
+		Because s3 contains buckets and files, reading becomes a tricky proposition :
+		- list files		if file is None
+		- stream content	if file is Not None
+		@TODO: support read from all buckets, think about it
+	"""
+	def __init__(self,args) :
+        	s3.__init__(self,args)
+	def files(self):
+		r = []
+        	try:
+			return [item.name for item in self.bucket if item.size > 0]
+		except Exception as e:
+			pass
+		return r
+	def stream(self,limit=-1):
+        	"""
+			At this point we should stream a file from a given bucket
+		"""
+		key = self.bucket.get_key(self.filename.strip())
+		if key is None :
+        		yield None
+		else:
+        		count = 0
+			with smart_open(key) as remote_file:
+				for line in remote_file:
+        				if count == limit and limit > 0 :
+        					break
+					yield line
+					count += 1
+	def read(self,limit=-1) :
+        	if self.filename is None :
+        		# 
+			# returning the list of files because no one file was specified.
+        		return self.files()
+		else:
+        		return self.stream(10)
 """
 	This class acts as a factory to be able to generate an instance of a Reader/Writer
 	Against a Queue,Disk,Cloud,Couchdb 
 	The class doesn't enforce parameter validation, thus any error with the parameters sent will result in a null Object
 """
-class DataSourceFactory:
+class Factory:
 	def instance(self,**args):
 		source = args['type']		
 		params = args['args']
@@ -659,6 +731,10 @@ class DataSourceFactory:
 		except Exception,e:
 			print ['Error ',e]
 		return anObject
+class s3Writer(s3,Writer) :
+        def __init__(self,args) :
+        	s3.__init__(self,args)
+	
 """
 	This class implements a data-source handler that is intended to be used within the context of data processing, it allows to read/write anywhere transparently.
 	The class is a facade to a heterogeneous class hierarchy and thus simplifies how the calling code interacts with the class hierarchy
@@ -671,16 +747,12 @@ class DataSource:
 		return self.Input.read(size)
 	def write(self,**args):
 		self.Output.write(**args)
-#p = {}
-#p['host'] = 'dev.the-phi.com'
-#p['uid'] = 'nyemba@gmail.com'
-#p['qid'] = 'repair'
-#factory = DataSourceFactory()
-#o =  factory.instance(type='QueueReader',args=p)		
-#print o is None
-#q = QueueWriter(host='dev.the-phi.com',uid='nyemba@gmail.com')
-#q.write(object='steve')
-#q.write(object='nyemba')
-#q.write(object='elon')
-
-
+conf = json.loads(open('config.json').read())
+#x = s3Reader( dict(conf,**{'bucket':'com.phi.sample.data','file':'Sample-Spreadsheet-5000-rows.csv'}))
+x = s3Reader(conf)
+print conf
+print x.bucket.get_all_keys()
+# r = x.read()
+# for item in r :
+# 	print item
+#print buckets[1].get_key('Sample-Spreadsheet-5000-rows.csv')