Merge pull request #16 from lnyemba/v2.2.0

V2.2.0 - AWS-S3 Bug fix
Steve L. Nyemba, 10 months ago
parent
commit b30dc0f023
5 files changed, 91 additions and 76 deletions
  1. README.md (+5 -0)
  2. info/__init__.py (+1 -1)
  3. setup.py (+1 -1)
  4. transport/__init__.py (+2 -2)
  5. transport/cloud/s3.py (+82 -72)

+ 5 - 0
README.md

@@ -18,6 +18,11 @@ Within the virtual environment perform the following :
 
     pip install git+https://github.com/lnyemba/data-transport.git
 
+## Features
+
+    - read/write from over a dozen databases
+    - run ETL jobs seamlessly
+    - scales and integrates into shared environments like Apache Zeppelin; JupyterHub; SageMaker; ...
 
 ## What's new
 
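The features listed in the new README section are surfaced through the transport factory whose reader/writer wrappers are patched later in this commit. A minimal usage sketch, assuming an S3 provider entry; the provider identifier 's3' and the pass-through of connection keywords to the backend constructor are assumptions inferred from transport/cloud/s3.py further down, not confirmed by this diff.

    import transport

    # Hypothetical connection settings; the key names mirror the s3 constructor below.
    _args = {
        'provider'   : 's3',          # assumed provider identifier
        'access_key' : 'AKIA...',     # placeholder credentials
        'secret_key' : '...',
        'region'     : 'us-east-2',
        'bucket'     : 'my-bucket',
        'file'       : 'people.csv',
    }

    _reader = transport.get.reader(**_args)     # the factory sets context='read'
    _frame  = _reader.read()                    # csv content comes back as a DataFrame

    _writer = transport.get.writer(**_args)     # the factory sets context='write'
    _writer.write(_frame, file='people-copy.csv')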

+ 1 - 1
info/__init__.py

@@ -1,6 +1,6 @@
 __app_name__  = 'data-transport'
 __author__ = 'The Phi Technology'
-__version__= '2.2.0'
+__version__= '2.2.1'
 __email__  = "info@the-phi.com"
 __license__=f"""
 Copyright 2010 - 2024, Steve L. Nyemba

+ 1 - 1
setup.py

@@ -19,7 +19,7 @@ args    = {
 
     "packages": find_packages(include=['info','transport', 'transport.*'])}
 args["keywords"]=['mongodb','duckdb','couchdb','rabbitmq','file','read','write','s3','sqlite']
-args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql']
+args["install_requires"] = ['pyncclient','duckdb-engine','pymongo','sqlalchemy','pandas','typer','pandas-gbq','numpy','cloudant','pika','nzpy','termcolor','boto3','boto','pyarrow','google-cloud-bigquery','google-cloud-bigquery-storage','flask-session','smart_open','botocore','psycopg2-binary','mysql-connector-python','numpy','pymssql']
 args["url"] =   "https://healthcareio.the-phi.com/git/code/transport.git"
 args['scripts'] = ['bin/transport']
 # if sys.version_info[0] == 2 :

+ 2 - 2
transport/__init__.py

@@ -134,7 +134,7 @@ class get :
     """
     @staticmethod
     def reader (**_args):
-        if not _args or 'provider' not in _args:
+        if not _args or ('provider' not in _args and 'label' not in _args):
             _args['label'] = 'default'
         _args['context'] = 'read'
         return instance(**_args)
@@ -143,7 +143,7 @@ class get :
         """
         This function is a wrapper that will return a writer to a database. It disambiguates the interface
         """
-        if not _args :
+        if not _args or ('provider' not in _args and 'label' not in _args):
             _args['label'] = 'default'
         _args['context'] = 'write'
         return instance(**_args)
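
The two hunks above align get.writer with get.reader: whenever neither 'provider' nor 'label' is supplied, both wrappers now fall back to the 'default' label before delegating to instance(). A minimal sketch of the resulting behavior, assuming a 'default' entry exists in the transport configuration:

    import transport

    # With no provider and no label, both wrappers now inject label='default'.
    _reader = transport.get.reader()             # same as reader(label='default')
    _writer = transport.get.writer()             # same as writer(label='default')

    # Before this change, reader(label='warehouse') was silently overwritten with
    # label='default' because only 'provider' was checked; an explicit label is now
    # honoured by both wrappers ('warehouse' is a hypothetical label).
    _w = transport.get.writer(label='warehouse')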

+ 82 - 72
transport/cloud/s3.py

@@ -3,10 +3,13 @@ Data Transport - 1.0
 Steve L. Nyemba, The Phi Technology LLC
 
 This file is a wrapper around s3 bucket provided by AWS for reading and writing content
+TODO:
+	- Address limitations so that csv stored with content type text/csv is read properly
 """
+
 from datetime import datetime
-import boto
-from boto.s3.connection import S3Connection, OrdinaryCallingFormat
+import boto3
+# from boto.s3.connection import S3Connection, OrdinaryCallingFormat
 import numpy as np
 import botocore
 from smart_open import smart_open
@@ -14,6 +17,7 @@ import sys
 
 import json
 from io import StringIO
+import pandas as pd
 import json
 
 class s3 :
@@ -29,46 +33,37 @@ class s3 :
 			@param filter		filename or filtering elements
 		"""
 		try:
-			self.s3 = S3Connection(args['access_key'],args['secret_key'],calling_format=OrdinaryCallingFormat())			
-			self.bucket = self.s3.get_bucket(args['bucket'].strip(),validate=False) if 'bucket' in args else None
-			# self.path = args['path']
-			self.filter = args['filter'] if 'filter' in args else None
-			self.filename = args['file'] if 'file' in args else None
-			self.bucket_name = args['bucket'] if 'bucket' in args else None
-
+			self._client = boto3.client('s3',aws_access_key_id=args['access_key'],aws_secret_access_key=args['secret_key'],region_name=args['region'])
+			self._bucket_name = args['bucket']	
+			self._file_name = args['file']
+			self._region = args['region']
 		except Exception as e :
-			self.s3 = None
-			self.bucket = None
 			print (e)
+			pass
+	def has(self,**_args):
+		_found = None
+		try:
+			if 'file' in _args and 'bucket' in _args:
+				_found = self.meta(**_args)
+			elif 'bucket' in _args and not 'file' in _args:
+				_found =  self._client.list_objects(Bucket=_args['bucket']) 
+			elif 'file' in _args and not 'bucket' in _args :
+				_found = self.meta(bucket=self._bucket_name,file = _args['file'])
+		except Exception as e:
+			_found = None
+			pass
+		return type(_found) == dict
 	def meta(self,**args):
 		"""
+		This function returns metadata about a file in a given bucket
 		:name name of the bucket
 		"""
-		info = self.list(**args)
-		[item.open() for item in info]
-		return [{"name":item.name,"size":item.size} for item in info]
-	def list(self,**args):
-		"""
-		This function will list the content of a bucket, the bucket must be provided by the name
-		:name	name of the bucket
-		"""
-		return list(self.s3.get_bucket(args['name']).list())
-
-
-	def buckets(self):
-		#
-		# This function will return all buckets, not sure why but it should be used cautiously 
-		# based on why the s3 infrastructure is used
-		#
-		return [item.name for item in self.s3.get_all_buckets()]
-
-		# def buckets(self):
-		pass
-		# """
-		# This function is a wrapper around the bucket list of buckets for s3
-		# """
-		# return self.s3.get_all_buckets()
-		
+		_bucket = self._bucket_name if 'bucket' not in args else args['bucket']
+		_file =  self._file_name if 'file' not in args else args['file']
+		_data = self._client.get_object(Bucket=_bucket,Key=_file)
+		return _data['ResponseMetadata']
+	def close(self):
+		self._client.close()	
         	
 class Reader(s3) :
 	"""	
@@ -77,51 +72,66 @@ class Reader(s3) :
 		- stream content	if file is Not None
 		@TODO: support read from all buckets, think about it
 	"""
-	def __init__(self,**args) :
-			s3.__init__(self,**args)
-	def files(self):
-		r = []
-		try:
-			return [item.name for item in self.bucket if item.size > 0]
-		except Exception as e:
-			pass
-		return r
-	def stream(self,limit=-1):
+	def __init__(self,**_args) :
+			super().__init__(**_args)
+	
+	def _stream(self,**_args):
 		"""
 			At this point we should stream a file from a given bucket
 		"""
-		key = self.bucket.get_key(self.filename.strip())
-		if key is None :
-			yield None
+		_object = self._client.get_object(Bucket=_args['bucket'],Key=_args['file'])
+		_stream = None
+		try:
+			_stream = _object['Body'].read()
+		except Exception as e:
+			pass
+		if not _stream :
+			return None
+		if _object['ContentType'] in ['text/csv'] :
+			return pd.read_csv(StringIO(str(_stream).replace("\\n","\n").replace("\\r","").replace("\'","")))
 		else:
-			count = 0
-			with smart_open(key) as remote_file:
-				for line in remote_file:
-					if count == limit and limit > 0 :
-						break
-				yield line
-				count += 1
+			return _stream
+		
 	def read(self,**args) :
-		if self.filename is None :
-			# 
-		# returning the list of files because no one file was specified.
-			return self.files()
-		else:
-			limit = args['size'] if 'size' in args else -1
-			return self.stream(limit)
+		
+		_name = self._file_name if 'file' not in args else args['file']
+		_bucket = args['bucket'] if 'bucket' in args else self._bucket_name
+		return self._stream(bucket=_bucket,file=_name)
+		
 
 class Writer(s3) :
-
-	def __init__(self,**args) :
-		s3.__init__(self,**args)
-	def mkdir(self,name):
+	"""
+	Writer against an s3 bucket; the target bucket is created on initialization if it does not already exist
+	"""
+	def __init__(self,**_args) :
+		super().__init__(**_args)
+		#
+		# Create the target bucket if it does not already exist
+		if not self.has(bucket=self._bucket_name) :
+			self.make_bucket(self._bucket_name)
+	def make_bucket(self,bucket_name):
 		"""
-		This function will create a folder in a bucket
+		This function will create a bucket; it is best that the bucket is organized as a namespace
 		:name name of the folder
 		"""
-		self.s3.put_object(Bucket=self.bucket_name,key=(name+'/'))
-	def write(self,content):
-		file = StringIO(content.decode("utf8"))
-		self.s3.upload_fileobj(file,self.bucket_name,self.filename)
+		
+		self._client.create_bucket(Bucket=bucket_name,CreateBucketConfiguration={'LocationConstraint': self._region})
+	def write(self,_data,**_args):
+		"""
+		This function will write the data to the s3 bucket; files can be csv or json formatted
+		"""
+		content = 'text/plain'
+		if type(_data) == pd.DataFrame :
+			_stream = _data.to_csv(index=False)
+			content = 'text/csv'
+		elif type(_data) == dict :
+			_stream = json.dumps(_data)
+			content = 'application/json'
+		else:
+			_stream = _data
+		# put_object accepts the serialized string directly, so no StringIO wrapper is needed
+		bucket = self._bucket_name if 'bucket' not in _args else _args['bucket']
+		file_name = self._file_name if 'file' not in _args else _args['file']
+		self._client.put_object(Bucket=bucket, Key=file_name, Body=_stream, ContentType=content)
 		pass
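
For completeness, a direct-use sketch of the rewritten boto3-backed classes, using placeholder credentials; a region such as us-east-2 is assumed because make_bucket forwards it as a LocationConstraint. In normal use these objects would be obtained through the transport factory rather than instantiated directly.

    from transport.cloud.s3 import Reader, Writer
    import pandas as pd

    _conn = dict(access_key='AKIA...', secret_key='...',    # placeholder credentials
                 region='us-east-2', bucket='my-bucket', file='people.csv')

    _writer = Writer(**_conn)      # __init__ creates the bucket when has() finds nothing
    _writer.write(pd.DataFrame([{'name': 'Ada'}, {'name': 'Grace'}]))   # stored as text/csv

    _reader = Reader(**_conn)
    _frame = _reader.read()        # objects stored as text/csv are parsed into a DataFrame
    _reader.close()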