  1. """
  2. Data Transport - 1.0
  3. Steve L. Nyemba, The Phi Technology LLC
  4. This file is a wrapper around s3 bucket provided by AWS for reading and writing content
  5. TODO:
  6. - Address limitations that will properly read csv if it is stored with content type text/csv
  7. """
from io import StringIO
import json

import boto3
import pandas as pd
class s3:
    """
    @TODO: Implement a search function for a file given a bucket??
    """
    def __init__(self, **args):
        """
        This function initializes the s3 client against the bucket provided
        @param access_key   AWS access key id
        @param secret_key   AWS secret access key
        @param region       region the bucket lives in
        @param bucket       name of the bucket
        @param file         optional default file (key) within the bucket
        """
        try:
            self._client = boto3.client(
                's3',
                aws_access_key_id=args['access_key'],
                aws_secret_access_key=args['secret_key'],
                region_name=args['region'])
            self._bucket_name = args['bucket']
            self._file_name = args.get('file', None)
            self._region = args['region']
        except Exception as e:
            print(e)
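    # A minimal construction sketch; the credentials and names below are
    # placeholders, not values from this project:
    #
    #   store = s3(access_key='<ACCESS_KEY>', secret_key='<SECRET_KEY>',
    #              region='us-east-2', bucket='my-bucket', file='data.csv')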
    def has(self, **_args):
        """
        This function checks whether a bucket and/or a file within it exists
        """
        _found = None
        try:
            if 'file' in _args and 'bucket' in _args:
                _found = self.meta(**_args)
            elif 'bucket' in _args and 'file' not in _args:
                _found = self._client.list_objects(Bucket=_args['bucket'])
            elif 'file' in _args and 'bucket' not in _args:
                _found = self.meta(bucket=self._bucket_name, file=_args['file'])
        except Exception as e:
            _found = None
        return isinstance(_found, dict)
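    # has() can be probed either way; a sketch using the placeholder names above:
    #
    #   store.has(bucket='my-bucket')                    # does the bucket exist?
    #   store.has(file='data.csv')                       # file in the default bucket?
    #   store.has(bucket='my-bucket', file='data.csv')   # both given explicitly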
    def meta(self, **args):
        """
        This function will return metadata about a file in a given bucket
        :param bucket   name of the bucket (defaults to the one set at construction)
        :param file     name of the file (key) to inspect
        """
        _bucket = self._bucket_name if 'bucket' not in args else args['bucket']
        _file = self._file_name if 'file' not in args else args['file']
        # head_object returns the metadata without downloading the body
        _data = self._client.head_object(Bucket=_bucket, Key=_file)
        return _data['ResponseMetadata']
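    # meta() hands back boto3's ResponseMetadata dictionary; for instance, with
    # the placeholder names above, _m = store.meta(file='data.csv') followed by
    # _m['HTTPStatusCode'] == 200 indicates the key exists.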
    def close(self):
        self._client.close()
class Reader(s3):
    """
    Because s3 contains buckets and files, reading becomes a tricky proposition:
    - list files if file is None
    - stream content if file is not None
    @TODO: support reading from all buckets, think about it
    """
    def __init__(self, **_args):
        super().__init__(**_args)
    def _stream(self, **_args):
        """
        At this point we should stream a file from a given bucket
        """
        _object = self._client.get_object(Bucket=_args['bucket'], Key=_args['file'])
        _stream = None
        try:
            _stream = _object['Body'].read()
        except Exception as e:
            pass
        if not _stream:
            return None
        if _object['ContentType'] in ['text/csv']:
            # decode the raw bytes so pandas sees real newlines, instead of
            # str()-rendering a bytes literal and patching up the escapes
            return pd.read_csv(StringIO(_stream.decode('utf-8')))
        return _stream
    def read(self, **args):
        _name = self._file_name if 'file' not in args else args['file']
        _bucket = args['bucket'] if 'bucket' in args else self._bucket_name
        return self._stream(bucket=_bucket, file=_name)
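# A reading sketch (same placeholder names as above): csv content comes back as
# a pandas DataFrame, anything else as raw bytes.
#
#   reader = Reader(access_key='<ACCESS_KEY>', secret_key='<SECRET_KEY>',
#                   region='us-east-2', bucket='my-bucket')
#   df = reader.read(file='data.csv')     # DataFrame when ContentType is text/csv
#   raw = reader.read(file='notes.txt')   # bytes otherwise
#   reader.close()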
class Writer(s3):
    """
    Writes csv, json or plain-text content to a given s3 bucket
    """
    def __init__(self, **_args):
        super().__init__(**_args)
        #
        # if the bucket doesn't exist yet, create it on the fly
        if not self.has(bucket=self._bucket_name):
            self.make_bucket(self._bucket_name)
    def make_bucket(self, bucket_name):
        """
        This function will create a bucket; it is best that buckets are organized as namespaces
        :param bucket_name  name of the bucket to create
        """
        self._client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={'LocationConstraint': self._region})
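    # Note: this covers the common case only; AWS rejects a LocationConstraint
    # of 'us-east-1', where CreateBucketConfiguration must be omitted entirely.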
    def write(self, _data, **_args):
        """
        This function will write the data to the s3 bucket; it accepts a DataFrame (csv), a dict (json), or plain text
        """
        content = 'text/plain'
        if isinstance(_data, pd.DataFrame):
            _stream = _data.to_csv(index=False)
            content = 'text/csv'
        elif isinstance(_data, dict):
            _stream = json.dumps(_data)
            content = 'application/json'
        else:
            _stream = _data
        bucket = self._bucket_name if 'bucket' not in _args else _args['bucket']
        file_name = self._file_name if 'file' not in _args else _args['file']
        self._client.put_object(Bucket=bucket, Key=file_name, Body=_stream, ContentType=content)
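#
# A minimal end-to-end sketch; the credentials and names are hypothetical
# placeholders to be replaced before running:
if __name__ == '__main__':
    _auth = dict(access_key='<ACCESS_KEY>', secret_key='<SECRET_KEY>',
                 region='us-east-2', bucket='demo-bucket')
    writer = Writer(**_auth)                     # creates the bucket if missing
    writer.write(pd.DataFrame({'x': [1, 2]}), file='demo.csv')
    writer.close()
    reader = Reader(**_auth)
    print(reader.read(file='demo.csv'))          # comes back as a DataFrame
    reader.close()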