座右铭:怎么简单怎么来,以实现功能为主。
欢迎大家关注公众号与我交流
环境安装
pip install -U minio loguru
示例代码
import os
from minio import Minio
from loguru import logger
from datetime import timedelta
class Client:
    """Small convenience wrapper around a ``minio.Minio`` client.

    Args:
        endpoint: Server address in ``ip:port`` form.
        access_key: Your user name.
        secret_key: Your password.
        secure: ``True`` to connect over HTTPS, ``False`` for plain HTTP.
    """

    def __init__(self, endpoint, access_key, secret_key, secure=False):
        self.client = Minio(
            endpoint=endpoint,
            access_key=access_key,
            secret_key=secret_key,
            secure=secure,
        )

    def list_buckets(self):
        """Return the result of ``Minio.list_buckets`` for this connection."""
        return self.client.list_buckets()

    def make_bucket(self, bucket_name):
        """Create ``bucket_name`` unless the server reports it already exists.

        Returns:
            The result of ``Minio.make_bucket`` when the bucket is created;
            otherwise the result of the ``logger.warning`` call that reports
            the bucket already exists.
        """
        if self.client.bucket_exists(bucket_name):
            return logger.warning(f"Bucket {bucket_name} already exists")
        return self.client.make_bucket(bucket_name=bucket_name)

    def upload_file(self, bucket_name, object_name, file_path):
        """Upload the local file at ``file_path`` as ``object_name``.

        Returns:
            The result of ``Minio.fput_object``.
        """
        return self.client.fput_object(
            bucket_name=bucket_name,
            object_name=object_name,
            file_path=file_path,
        )

    def share_file(self, bucket_name, object_name, expires=None):
        """Return a presigned GET URL for ``object_name``.

        Args:
            bucket_name: Bucket that holds the object.
            object_name: Key of the object to share.
            expires: Lifetime of the link. A number is interpreted as days,
                a ``timedelta`` is passed through unchanged, and ``None``
                leaves the choice to the MinIO client's own default.

        Returns:
            The result of ``Minio.presigned_get_object``.
        """
        if expires is None:
            # ``timedelta(days=None)`` raises TypeError, so omit the argument
            # and let the library apply its default expiry instead.
            return self.client.presigned_get_object(bucket_name, object_name)
        if not isinstance(expires, timedelta):
            expires = timedelta(days=expires)
        return self.client.presigned_get_object(
            bucket_name, object_name, expires=expires
        )
if __name__ == "__main__":
    image_path = "/home/abc/imgs"
    # Replace with your own server address, user name, and password.
    user = Client("ip:port", "username", "password")
    logger.info(user.list_buckets())  # List every bucket.
    user.make_bucket("data")  # Create a bucket named "data".
    # Upload every file in the local /home/abc/imgs directory to the
    # "test/" prefix of bucket "data".
    for item in os.listdir(image_path):
        local_path = os.path.join(image_path, item)
        # os.listdir also yields sub-directories; only regular files can be
        # uploaded as a single object, so skip everything else.
        if not os.path.isfile(local_path):
            continue
        logger.info(user.upload_file("data", f"test/{item}", local_path).object_name)
    # Generate an access link that stays valid for 7 days.
    logger.info(user.share_file("data", "test/1702286610145.jpg", 7))