Source code for dp_tornado.helper.web.aws.s3

# -*- coding: utf-8 -*-


import io
import boto3

from dp_tornado.engine.helper import Helper as dpHelper


[docs]class S3Helper(dpHelper):
[docs] def upload(self, access_key_id, secret_access_key, region_name, bucket_name, src, dest, uploaded_check=True, ExtraArgs=None): if self.helper.misc.type.check.string(src): if not self.helper.io.path.is_file(src): return False elif self.helper.misc.system.py_version <= 2 and not isinstance(src, file): return False elif self.helper.misc.system.py_version >= 3 and not isinstance(src, io.IOBase): return False s3 = boto3.client( service_name='s3', region_name=region_name, aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key) if self.helper.misc.type.check.string(src): s3.upload_file(src, bucket_name, dest, ExtraArgs=ExtraArgs) elif src: s3.upload_fileobj(src, bucket_name, dest, ExtraArgs=ExtraArgs) if uploaded_check: return self.exists( access_key_id=access_key_id, secret_access_key=secret_access_key, region_name=region_name, bucket_name=bucket_name, key=dest) return True
[docs] def download(self, access_key_id, secret_access_key, region_name, bucket_name, src, dest, **kwargs): s3 = boto3.client( service_name='s3', region_name=region_name, aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key) self.helper.io.path.mkdir(self.helper.io.path.dirname(dest)) s3.download_file(bucket_name, src, dest, ExtraArgs=kwargs) return True if self.helper.io.path.is_file(dest) else False
[docs] def copy(self, access_key_id, secret_access_key, region_name, src_bucket_name, src_key, dest_bucket_name, dest_key, copied_check=True): s3 = boto3.client( service_name='s3', region_name=region_name, aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key) copy_source = { 'Bucket': src_bucket_name, 'Key': src_key } s3.copy(copy_source, dest_bucket_name, dest_key) if copied_check: return self.exists( access_key_id=access_key_id, secret_access_key=secret_access_key, region_name=region_name, bucket_name=dest_bucket_name, key=dest_key) return True
[docs] def browse(self, access_key_id, secret_access_key, region_name, bucket_name, prefix): try: s3 = boto3.client( service_name='s3', region_name=region_name, aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key) result = s3.list_objects(Bucket=bucket_name, Prefix=prefix) if 'Contents' not in result: return [] contents = [] for e in result['Contents']: if 'Key' not in e or 'Size' not in e: continue attrs = { 'Size': e['Size'], 'LastModified': e['LastModified'] if 'LastModified' in e else None, 'StorageClass': e['StorageClass'] if 'StorageClass' in e else None, 'ETag': e['ETag'] if 'ETag' in e else None } contents.append((e['Key'], attrs)) return contents except Exception as e: self.logging.exception(e) return False
[docs] def exists(self, key, access_key_id, secret_access_key, region_name, bucket_name): s3 = boto3.client( service_name='s3', region_name=region_name, aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key) try: return True if s3.head_object(Bucket=bucket_name, Key=key) else False except Exception: return False
[docs] def remove(self, access_key_id, secret_access_key, region_name, bucket_name, key=None, prefix=None, deleted_check=True): assert key or prefix assert not (key and prefix) try: s3 = boto3.client( service_name='s3', region_name=region_name, aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key) # with specified key, single key if key: deleted = s3.delete_object(Bucket=bucket_name, Key=key) # Check response if not deleted or 'ResponseMetadata' not in deleted \ or 'HTTPStatusCode' not in deleted['ResponseMetadata'] \ or deleted['ResponseMetadata']['HTTPStatusCode'] % 200 > 99: return False if deleted_check: if self.exists( access_key_id=access_key_id, secret_access_key=secret_access_key, region_name=region_name, bucket_name=bucket_name, key=key): return False return key, # with prefix, multiple keys elif prefix: keys = self.browse( access_key_id=self.ini.static.aws_id, secret_access_key=self.ini.static.aws_secret, region_name=self.ini.static.aws_region, bucket_name=self.ini.static.aws_bucket, prefix=prefix) if keys is False: return False elif not keys: return None delete = {'Objects': [{'Key': e[0]} for e in keys]} deleted = s3.delete_objects(Bucket=bucket_name, Delete=delete) # Check response if not deleted or 'Deleted' not in deleted \ or 'ResponseMetadata' not in deleted \ or 'HTTPStatusCode' not in deleted['ResponseMetadata'] \ or deleted['ResponseMetadata']['HTTPStatusCode'] % 200 > 99: return False return tuple([e['Key'] for e in deleted['Deleted'] if 'Key' in e]) return False except Exception as e: self.logging.exception(e) return False
[docs] def generate_presigned_post(self, access_key_id, secret_access_key, region_name, bucket_name, key, success_action_redirect=None, content_length_range=None, max_content_length=None, expires_in=6000, acl=None): s3 = boto3.client( service_name='s3', region_name=region_name, aws_access_key_id=access_key_id, aws_secret_access_key=secret_access_key) fields = {} conditions = [] if success_action_redirect: fields['success_action_redirect'] = success_action_redirect conditions.append({'success_action_redirect': success_action_redirect}) else: fields['success_action_status'] = '201' conditions.append({'success_action_status': '201'}) if acl: conditions.append({'acl': acl}) if content_length_range and len(content_length_range) == 2: conditions.append(["content-length-range", content_length_range[0], max_content_length]) elif max_content_length: conditions.append(["content-length-range", 0, max_content_length]) payload = s3.generate_presigned_post( Bucket=bucket_name, Key=key, Fields=fields, Conditions=conditions, ExpiresIn=expires_in) return { 'action': payload['url'], 'fields': [{'name': k, 'value': v} for k, v in payload['fields'].items()] }