python botos s3 aws

news2024/11/19 10:05:36

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html

AWS是亚马逊的云服务,其提供了非常丰富的套件,以及支持多种语言的SDK/API。本文针对其S3云储存服务的Python SDK(boto3)的使用进行介绍。

关键词:AWS,S3,Python,boto3,endpoint,client

背景

AWS是一整套亚马逊云服务套件(云存储及其上的基础设施和服务),包括云存储(主要是对象存储)、微服务、数据库等,其中S3对象存储受到众多国内开发者的欢迎。AWS提供了包括console、client、sdk等多种方式进行连接使用,并支持包括python在内的许多语言。为了便捷地在Python程序内使用S3对象存储,我们考虑两种途径:

  • 在子进程中召唤aws client命令行程序;
  • 在python中调用boto3模块的api调用服务

其中boto3途径由于和python语言(和其他语言)有较好的适配,因此更适合开发者使用。此外,处于安全考虑,开发者可能只能获得AWS的有限访问权限,比如endpoint,这使得aws官方教程中的一些范例不可用。比如,访问对象存储至少存在三种方式:Resource、Session、Client,而借助endpoint我们只能访问client,这限制了开发者权限、无法使用高级功能的同时,也提高了数据操作的安全性。

本文将针对如何调用boto3和endpoint来实现aws S3服务的功能进行介绍。

相关信息

在正式介绍之前,有必要对aws及boto3的相关组件和功能进行介绍。

从AWS到S3

从AWS到其中的S3服务的关系链可以简单地描述为:AWS -> VPC -> S3 -> Endppoint -> EC2

意思是AWS到私有云(VPC)到S3存储到EC2服务实例,Endpoint则是S3到EC2的桥梁。如下图所示:

“Amazon Virtual Private Cloud (Amazon VPC),简单理解,就是在云上建个大楼,大楼里面的网络、门禁,安检等都一应俱全,我们根据需要在大楼里选择房间(创建ec2)办公,这个房间自己也有相应的门禁系统”

参考:https://www.bioaws.com/blogs/2020-02-02-vpc-endpoint-s3/

AWS的命令行client

aws提供了一个便捷的命令行程序以供使用,其需要先去官网下载一个zip安装包,然后解压安装即可:

安装aws cli:How to Install AWS CLI on Ubuntu 20.04

curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install

配置awscli

aws configure
# 输入access key和security key:后两项可以忽略(假如只需要使用S3的话)

连接S3存储桶

# view folder
aws [option] --endpoint-url [endpoint_url] s3 [action] s3://[bucket]
# download single file
aws [option] --endpoint-url [endpoint_url] s3 cp s3://[bucket]/[file_path] [local_path]
# download folder
aws [option] --endpoint-url [endpoint_url] s3 sync s3://[bucket]/[folder_path] [local_path]

参考:通过 AWS CLI 使用高级别 (s3) 命令

boto3: python sdk

SimpleQueueService(SQS)
Amazon Simple Queue Service (Amazon SQS) 是一种完全托管的消息队列服务,可以轻松解耦和扩展微服务、分布式系统和无服务器应用程序。 Amazon SQS 在分布式应用程序组件之间移动数据并帮助您解耦这些组件。

import boto3

'''send messages'''
# Get the service resource
sqs = boto3.resource('sqs')
# Get the queue
queue = sqs.get_queue_by_name(QueueName='test')
# Create a new message
response = queue.send_message(MessageBody='world')
# The response is NOT a resource, but gives you a message ID and MD5
print(response.get('MessageId'))
print(response.get('MD5OfMessageBody'))

'''process messages'''
# Process messages by printing out body and optional author name
for message in queue.receive_messages(MessageAttributeNames=['Author']):
    # Get the custom author message attribute if it was set
    author_text = ''
    if message.message_attributes is not None:
        author_name = message.message_attributes.get('Author').get('StringValue')
        if author_name:
            author_text = ' ({0})'.format(author_name)
    # Print out the body and author (if set)
    print('Hello, {0}!{1}'.format(message.body, author_text))
    # Let the queue know that the message is processed
    message.delete()

Resource

资源表示 Amazon Web Services (AWS) 的面向对象的接口。 它们提供了比服务客户端进行的原始低级调用更高级别的抽象。

每个资源实例都有许多属性和方法。 这些在概念上可以分为标识符、属性、操作、引用、子资源和集合。资源本身也可以在概念上分为服务资源(如 sqs、s3、ec2 等)和单个资源(如 sqs.Queue 或 s3.Bucket)。 服务资源没有标识符或属性。 否则,两者共享相同的组件。

# Get resources from the default session
sqs = boto3.resource('sqs')
s3 = boto3.resource('s3')

'''example'''
# S3 Object (bucket_name and key are identifiers)
obj = s3.Object(bucket_name='boto3', key='test.py')
print(obj.bucket_name)print(obj.key)

# S3 Object attributes
obj.last_modifie
dobj.e_tag

# S3 Object actions
obj = s3.Object(bucket_name='boto3', key='test.py')
response = obj.get()
data = response['Body'].read()

# S3 sub-resources
obj = bucket.Object(key='new_file.txt')
print(obj.bucket_name)
print(obj.key)

# S3: Wait for a bucket to exist.
bucket.wait_until_exists()

资源实例不是线程安全的,不应跨线程或进程共享。 这些特殊类包含无法共享的附加元数据。 建议在多线程或多处理中为每个线程或进程创建一个新资源。

Collections

集合为一组资源提供了一个可迭代的接口。

Session

会话管理有关特定配置的状态。 会话通常存储以下内容:

  • 证书
  • AWS 区域
  • 与您的个人资料相关的其他配置
'''Using the default session'''
sqs = boto3.client('sqs')
s3 = boto3.resource('s3')

'''Create your own session'''
my_session = boto3.session.Session()
# Now we can create low-level clients or resource clients from our custom session
sqs = my_session.client('sqs')
s3 = my_session.resource('s3')

与 Resource 对象类似,Session 对象不是线程安全的,不应在线程和进程之间共享。 建议在多线程或多处理中为每个线程或进程创建一个新的 Session 对象。

Client

客户端为 AWS 提供了一个低级接口,其方法与服务 API 的映射接近 1:1。 所有服务操作均由客户端支持。

import boto3
sqs = boto3.client('sqs')

# It is also possible to access the low-level client from an existing resource:
# Create the resource
sqs_resource = boto3.resource('sqs')
# Get the client from the resource
sqs = sqs_resource.meta.client

# send messages
response = sqs.send_message(QueueUrl='...', MessageBody='...')
# handling messages
response = sqs.list_queues()
for url in response.get('QueueUrls', []):
    print(url)
# waiters
sqs.waiter_names

多处理:虽然客户端是线程安全的,但由于它们的网络实现,它们不能跨进程共享。 这样做可能会导致调用服务时响应顺序不正确。

共享元数据:客户端通过一些属性(即元、异常和服务员名称)向最终用户公开元数据。 这些是可以安全阅读的,但任何突变都不应该被认为是线程安全的

自定义 Botocore 事件:Botocore(构建 Boto3 库)允许高级用户提供他们自己的自定义事件挂钩,这些挂钩可以与 boto3 的客户端交互。 大多数用户不需要使用这些接口,但是那些不需要仔细审查的用户不应再考虑他们的客户端线程安全。

参考:Botocore Events - botocore 1.27.25 documentation

import boto3.session
from concurrent.futures import ThreadPoolExecutor

def do_s3_task(client, task_definition):
    # Put your thread-safe code here

def my_workflow():
    # Create a session and use it to make our client
    session = boto3.session.Session()
    s3_client = session.client('s3')

    # Define some work to be done, this can be anything
    my_tasks = [ ... ]

    # Dispatch work tasks with our s3_client
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = [executor.submit(do_s3_task, s3_client, task) for task in my_tasks]

Endpoint (AWS PrivateLink for S3)

在将 S3 客户端配置为使用接口 VPC 终端节点时,请务必注意,只有终端节点中指定的资源类型才能使用该客户端进行寻址( only the resource type specified in the endpoint can be addressed)。 访问存储桶和访问点需要实例化两个客户端,每个资源类型一个。

import boto3
s3_client = boto3.client(
    service_name='s3',
    endpoint_url='https://bucket.vpce-abc123-abcdefgh.s3.us-east-1.vpce.amazonaws.com')

Paginators

一些 AWS 操作返回的结果不完整,需要后续请求才能获得整个结果集。 发送后续请求以在前一个请求中断的地方继续的过程称为分页。

Error handling

Boto3 提供了许多功能来帮助导航您在与 AWS 服务交互时可能遇到的错误和异常。

1. 确定要捕获的异常

  • Botocore 异常
  • AWS 服务异常:AWS 服务异常被底层的 botocore 异常 ClientError 捕获。有关您正在使用的服务的错误响应的完整列表,请参阅各个服务的 AWS 文档。

2. 使用低级客户端时捕获异常

3. 解析错误响应并从 AWS 服务中捕获异常

4. 从错误响应中辨别有用信息

try:
    client.some_api_call(SomeParam='some_param')

except botocore.exceptions.ClientError as error:
    # Put your error handling logic here
    raise error

except botocore.exceptions.ParamValidationError as error:
    raise ValueError('The parameters you provided are incorrect: {}'.format(error))

'''Error message structure
{
    'Error': {
        'Code': 'SomeServiceException',
        'Message': 'Details/context around the exception or error'
    },
    'ResponseMetadata': {
        'RequestId': '1234567890ABCDEF',
        'HostId': 'host ID data will appear here as a hash',
        'HTTPStatusCode': 400,
        'HTTPHeaders': {'header metadata key/values will appear here'},
        'RetryAttempts': 0
    }
}

except botocore.exceptions.ClientError as err:
    if err.response['Error']['Code'] == 'InternalError': # Generic error
        print('Error Message: {}'.format(err.response['Error']['Message']))
'''

使用boto3操作S3

准备工作

考虑以下问题:

1. 你有什么访问权限? 在使用low-level client处理存储桶时,必须使用具有访问 ID/密钥的端点(Endpoint with access id / key)。 如果您想在 AWS 中尝试其他高级服务,请检查您的授权。 所拥有的 AWS 权限决定使用什么链接方式

2. 您处理的文件的大小。 请参考file-size limitation和File transfer configuration。 操作文件的大小

  • 本地-> AWS:多部分
  • AWS -> 本地:分页器

3.错误处理策略。 异常处理

  • 单文件进程中断:同名再次上传并重写
  • 文件夹进程中断:检查数据库,上传和重写
  • 重复处理:检查数据库,上传和重写
  • 线程安全:创建单个客户端来处理所有文件

4. 快速api参考。 API查询

  • S3 — Boto3 Docs 1.24.24 文档
  • 代码样例:Amazon S3 examples
  • 会话参考 ‒ Boto3 Docs 1.24.24 文档

查询/上传/下载/删除 操作step-by-step

本节介绍了如何利用endpoint连接存储并操作。

  1. Configure aws
aws configure
# -> input aws_access_key_id & aws_secret_access_key

然后你会在 ~/.aws/credentials 找到你的配置文件。也可以忽略这一步,在python程序中设置。

2. Create an s3 client & explore buckects(查询)

s3_client = boto3.client(service_name='s3', endpoint_url=aws_s3_endpoint_url)
response = s3_client.list_buckets()
buckets = [bucket['Name'] for bucket in response['Buckets']]
print(buckets)

3. Upload files(上传,覆盖写)

s3默认采用覆盖写模式,如果希望避免这一问题,可以认为设置version属性进行控制。

response = s3_client.upload_file(local_file_path, bucket_name, target_path_in_bucket)

4. Upload large files(上传大文件)

在上传、下载或复制文件或 S3 对象时,适用于 Python 的 AWS 开发工具包会自动管理重试以及multipart 和非multipart 传输。 通过使用非常适合大多数场景的合理默认设置来执行管理操作。 为了处理特殊情况,可以配置默认设置以满足要求。

# using simple upload
self.client.upload_file(local_file_path, bucket_name, target_path_in_bucket)

# using multi-part upload to extend size limitation
GB = 1024 ** 3
config = TransferConfig(multipart_threshold=5*GB)
self.client.upload_file(local_file_path, bucket_name, target_path_in_bucket, Config=config)

速度对比:

Test fileDefault uploadSpecified multi-part upload
2GB221MB/s150~212MB/s
13GB232MB/s224~240MB/s

总体而言,自行配置的速度不一定比默认自适应的速度高,而高也高不到哪里去。

5. Download files(下载)

# download single file
self.client.download_file(bucket_name, target_path_in_bucket, local_file_path)

# download single file as object
with open('FILE_NAME', 'wb') as f: # binary mode only
    s3.download_fileobj('BUCKET_NAME', 'OBJECT_NAME', f)

# download folder
list_objects_v2() -> download_file()

6. Delete files in buckets(删除)

self.client.delete_object(Bucket=bucket_name, Key=target_path_in_bucket)

7. Using Calback as ProgressBar(监控进度条)

使用进度条来监控操作状态。

示例来自 Uploading files ‒ Boto3 Docs 1.24.25 documentation 和 How can I increase my AWS s3 upload speed when using boto3?

方法一:官方示例,速度程序运行降速80%!

import threading

class ProgressPercentage(object):

    def __init__(self, filename):
        self._filename = filename
        self._size = float(os.path.getsize(filename))
        self._seen_so_far = 0
        self._lock = threading.Lock()

    def __call__(self, bytes_amount):
        # To simplify, assume this is hooked up to a single filename
        with self._lock:
            self._seen_so_far += bytes_amount
            percentage = (self._seen_so_far / self._size) * 100
            sys.stdout.write(
                "\r%s  %s / %s  (%.2f%%)" % (
                    self._filename, self._seen_so_far, self._size,
                    percentage))
            sys.stdout.flush()

s3.upload_file(
    'FILE_NAME', 'BUCKET_NAME', 'OBJECT_NAME',
    Callback=ProgressPercentage('FILE_NAME'))

方法二:民间示例,程序运行降速10%

from tqdm import tqdm
import boto3.s3.transfer as s3transfer

 class Tool():
 
     def __init__():
         pass
         
     def client_upload_files(self, bucket, local_path, aws_path, progress_func):
        transfer_config = s3transfer.TransferConfig(
            use_threads=True,
            max_concurrency=10,
        )
        s3t = s3transfer.create_transfer_manager(
            self.client, transfer_config)
        s3t.upload(
            local_path, bucket, aws_path, 
            subscribers=[
                s3transfer.ProgressCallbackInvoker(progress_func),
            ]
        )
        s3t.shutdown()
 
 with tqdm(desc='upload', ncols=60,
      total=totalsize, unit='B', unit_scale=1) as pbar:
    tool.client_upload_files(
        bucket_name, large_file, large_target_file, pbar.update)

实验(坑)

aws的操作其实是非常繁琐的,因为它基本上很多事情都存在多种实现,而作为初学者难以判断这些实现有哪些坑,以下列举几个基本问题:

  • 为什么我无法使用session/resource/queue/DynamicDB等服务?
    • 因为没有权限或没有购买相关服务
  • 使用endpoint可以做什么事情?
    • endpoint用于创建low-level client,是aws服务最基础的api之一,基本上client和服务是一对一的关系,所以endpoint的权限是非常基本的,这取决于创建endpoint的时候所指定的权限。
  • 在实践中,client应该被call多次还是reuse?
    • 事实上,这取决于aws的收费政策!目前aws仅对服务类型和条件收费,对client的call次数不收费,因此原则上recall和reuse是等价的。然而,从程序规范而言,应该reuse。
  • 上传和下载有哪些关注点?
    • S3对不同层次的资源调用存在速度限制,对于low-level调用存在5GB/单个文件的限制,除非通过multi-part模式处理。然而,在boto3中对此进行了优化和封装,可以自动根据处理文件的大小选择合适的模式进行上传,最多支持5TB。
    • 文件的上传默认采用覆盖写。
    • client处理数据的时候是线程安全的,但无法处理突变的数据,因此不应该同时对S3中的单个数据同时读写。
  • 线程安全:
    • 最佳策略是,认为所有类型的连接都不是线程安全的!以免出现意外的错误。
    • 由于不是线程安全的,因此最好不要使用多线程,至少不要使用互相沟通的多线程,除非你对boto3的属性非常熟悉。
  • 查询数据:
    • 尽管S3采用了桶存储策略,但创建了一张数据表来存储所有的meta信息,因此可以使用SQL语句进行检索。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1594052.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

云仓酒庄综合品酒师培训盛大开幕,数名爱好者共襄盛举

春意盎然的四月,广东顺德保利假日酒店迎来了一场特殊的盛宴——云仓酒庄首届《综合品酒师》培训盛大开幕。来自全国各地的品酒爱好者齐聚一堂,共襄此次品酒文化的盛宴。此次培训不仅是对品酒文化的深度挖掘,更是酒类销售行业专业化、规范化发…

LeetCode 热题 100 题解(二):双指针部分(2)| 滑动窗口部分(1)

题目四:接雨水(No. 43) 题目链接:https://leetcode.cn/problems/trapping-rain-water/description/?envTypestudy-plan-v2&envIdtop-100-liked 难度:困难 给定 n 个非负整数表示每个宽度为 1 的柱子的高度图&am…

程序员见了走不动道!十几款封装好的界面炫酷的登录页组件,太酷了(文末有项目源码)

今天给大家推荐一个漂亮的 React 登录页组件。内含十几款封装好的登录页,界面炫酷、即插即用,用来快速构建登录页的 React 组件,简直不要太酷了! React Login Pages React Login Pages 提供基于基础组件的封装登录页面组件&#…

【通信原理笔记】【三】——3.8 载波同步

文章目录 前言一、正弦信号二、载波同步2.1、平方环法2.2、科斯塔斯环法(castas)2.3 相位模糊 总结 前言 不管是幅度调制还是角度调制,都离不开正弦信号,其中相干解调都要求恢复发送端使用的正弦载波信号,这一节就来深…

Win11 使用 WSL2 安装 linux 子系统 ubuntu,删除 linux 子系统 ubuntu

Win11 使用 WSL2 安装 linux 子系统 ubuntu,删除 linux 子系统 ubuntu 1、用 部署映像服务和管理工具 dism.exe 命令,开启 WSL2 按【WIN R】,打开【运行】,输入:【cmd】,管理员打开【命令行提示符】。 …

无人机/飞控--ArduPilot、PX4学习记录(5)

这几天看dronekit,做无人机失控保护。 PX4官网上的经典案例,我做了很多注解,把代码过了一遍。 无人机具体执行了: 先起飞,飞至正上空10m->向北移动10m->向东移动10m->向南移动10m->向西移动10m->回到初…

【日常记录】【CSS】SASS循环的使用

文章目录 1、引言2、安装3、举例4、参考链接 1、引言 目前在任何项目框架中,都会有css 预处理器,目前一般使用 sass、less 这俩其中之一,它可以简化css的书写 Sass 是一款强化 CSS 的辅助工具,它在 CSS 语法的基础上增加了变量 (v…

内网渗透-Earthworm的简单使用(内网穿透工具)

Earthworm的简单介绍(一) 文章目录 EarthWorm下载地址1. 普通网络 1.1 跳板机存在公网IP 1.1.1 网络环境1.1.2 使用方法1.1.3 流量走向 1.2 跳板机不存在公网IP,可出网 1.2.1 网络环境1.2.2 使用方法1.2.3 流量走向 2. 二级网络 2.1 一级跳…

【随笔】Git 基础篇 -- 远程仓库 git clone(二十五)

💌 所属专栏:【Git】 😀 作  者:我是夜阑的狗🐶 🚀 个人简介:一个正在努力学技术的CV工程师,专注基础和实战分享 ,欢迎咨询! 💖 欢迎大…

刷题之Leetcode707题(超级详细)

707.设计链表 力扣题目链接(opens new window)https://leetcode.cn/problems/design-linked-list/ 题意: 在链表类中实现这些功能: get(index):获取链表中第 index 个节点的值。如果索引无效,则返回-1。addAtHead(val)&#x…

【数据库】加 Redis 就无懈可击? —— 缓存雪崩、击穿、穿透的破解之道

一般来说,目前的系统设计上为了缓解数据库峰值压力,都会增加 Redis 作为第一道屏障,但是其依然存在一些不足。总结起来是三大问题,分别是缓存雪崩、缓存击穿和缓存穿透。本文旨在说清楚三个问题的原因及相应的防范策略。 以 Redis…

计算机基础知识-第9章-存储的本质(2)——硬盘和文件系统基础知识

一、机械硬盘的原理 概括来说,硬盘的工作原理是利用特定的磁粒子的极性来记录数据。磁头在读取数据时,将磁力子的不同极性转换成不同的电脉冲信号,再利用数据转换器将这些原始信号变成电脑可以使用的数据,写的操作正好与此相反。…

react 初学增删改查购物车案例

界面 代码 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta name"viewport" content"widthdevice-width, initial-scale1.0" /><title>react-购物车案例</title><…

gpt在线网页版最全收录

ChatGPT镜像 今天在知乎看到一个问题&#xff1a;“平民不参与内测的话没有账号还有机会使用ChatGPT吗&#xff1f;” 从去年GPT大火到现在&#xff0c;关于GPT的消息铺天盖地&#xff0c;真要有心想要去用&#xff0c;途径很多&#xff0c;别的不说&#xff0c;国内GPT的镜像…

我的新书,在西西弗书店上架了!

大家好&#xff0c;我是程序员小灰。今天告诉大家一个好消息&#xff0c;我的新书在西西弗书店上架了&#xff01; 熟悉小灰的朋友都知道&#xff0c;我以前是京东的一名程序员&#xff0c;现在全职投入到IT领域的自媒体创作。在2019年&#xff0c;我出版了人生中的第一本书《漫…

eclipse 取消生成注释 TODO Auto-generated method stub

eclipse 取消生成注释 // TODO Auto-generated method stub 基本步骤 windows -> preferencesJava -> Code Style -> Code TemplatesCode -> Method body -> 编辑删除 // ${todo} Auto-generated method stub参考材料 Eclipse 中取消生成 TODO Auto-generated…

Unity类银河恶魔城学习记录12-14 p136 Merge Skill Tree with Sword skill源代码

Alex教程每一P的教程原代码加上我自己的理解初步理解写的注释&#xff0c;可供学习Alex教程的人参考 此代码仅为较上一P有所改变的代码 【Unity教程】从0编程制作类银河恶魔城游戏_哔哩哔哩_bilibili CharacterStats.cs using System.Collections; using System.Collections.…

RTC的基本概念以及相关例程

实时时钟(RTC) 北京时间跟伦敦时间错8个小时 BKP简介 BKP本质上是RAM存储器&#xff0c;没有掉电不丢失的能力。 VBAT的作用就是&#xff0c;当VDD断电时&#xff0c;BKP会切换到VBAT供电&#xff0c;这样可以继续维持BKP里面的数据&#xff0c;如果VDD断电&#xff0c;VBAT也…

《猎灵online》游戏完整源码(源码+客户端+服务端+文档+工具),云盘下载

《猎灵》是一款由国内知名开发运营开发的大型3D魔幻网游&#xff0c;《猎灵》研发团队突破诸多瓶颈&#xff0c;首创“全自由无限制PK”&#xff0c;让玩家拒绝无意义等待&#xff0c;自由作战不受任何束缚&#xff0c;真正的实现想战就战&#xff0c;游戏创建了六界神魔乱斗的…

蓝桥杯嵌入式模板(cubemxkeil5)

LED 引脚PC8~PC15&#xff0c;默认高电平&#xff08;灭&#xff09;。 此外还要配置PD2为输出引脚&#xff08;控制LED锁存&#xff09; &#xff0c;默认低电平&#xff08;锁住&#xff09;&#xff01;&#xff01;&#xff01; #include "led.h"void led_disp…