OSError [WinError 1455]页面文件太小,无法完成操作“解决方案”_深度学习_yangshejun-GitCode 开源社区 (csdn.net)
python对RabbitMQ的简单使用_python rabbitmq-CSDN博客
【Windows安装RabbitMQ详细教程】_rabbitmq windows-CSDN博客
Windows 10安装Minio 文件服务器-CSDN博客
发送到minio服务器
import collections
import os
import time
import cv2
import numpy as np
from ultralytics import YOLO
from threading import Thread, Lock
import minio
# Throttling / buffering parameters for stream 1.
frame_rate = 30
record_length = 10  # seconds
target_class_value1 = 0  # class id that triggers a snapshot
# Rolling buffer sized to hold frame_rate * record_length results.
results_list1 = collections.deque(maxlen=frame_rate * record_length)

# Local output folders for stream 1 (created if missing).
os.makedirs('stream1/images', exist_ok=True)
os.makedirs('stream1/videos', exist_ok=True)

# MinIO client used to upload snapshots; secure=False is passed explicitly.
minio_client = minio.Minio(
    '10.160.86.76:9000',
    access_key='minioadmin',
    secret_key='minioadmin',
    secure=False,
)

# The YOLO model, the per-model results mapping, and the lock that guards
# writes to that mapping.
shared_model_1 = YOLO("yolov8n.pt")
model_results = {shared_model_1: []}
lock = Lock()
def predict(model, image_path):
    """Run streaming inference and upload throttled snapshots to MinIO.

    Calls ``model.predict(image_path, show=True, stream=True)``, stores the
    returned results object in ``model_results[model]`` under ``lock``, and
    then iterates over it. For ``shared_model_1`` every result is appended to
    ``results_list1``. When ``target_class_value1`` is among the detected
    class ids and more than ``record_length`` seconds have passed since the
    last save (or since this function started), the annotated frame is written
    to ``stream1/images/<unix-timestamp>.jpg`` and uploaded to the
    ``iot-db-img`` bucket under ``<unix-timestamp>.jpg``.

    Args:
        model: The model whose ``predict`` method is called.
        image_path: Source passed straight through to ``model.predict``.
    """
    stream = model.predict(image_path, show=True, stream=True)
    last_upload_at = time.time()
    with lock:
        model_results[model] = stream

    for result in model_results[model]:
        print('model_results[model]', model_results[model])
        if model != shared_model_1:
            continue

        boxes = result.boxes
        results_list1.append(result)
        output_dir = 'stream1'

        # With no detections, fall back to a sentinel list so the membership
        # test below still has something to check against.
        class_ids = (
            boxes.cls.cpu().numpy().tolist()
            if boxes.cls.numel() != 0
            else [99999]
        )

        if target_class_value1 not in class_ids:
            continue
        # Throttle: at most one snapshot per record_length seconds.
        if time.time() - last_upload_at <= record_length:
            continue

        stamp = int(time.time())
        snapshot_path = f"{output_dir}/images/{stamp}.jpg"
        annotated_frame = result.plot(boxes=True)
        cv2.imwrite(snapshot_path, annotated_frame)

        # Upload the file that was just written; put_object is given the
        # open file and its size on disk.
        with open(snapshot_path, 'rb') as file_data:
            snapshot_size = os.stat(snapshot_path).st_size
            minio_client.put_object(
                'iot-db-img', f'{stamp}.jpg', file_data, snapshot_size
            )
        last_upload_at = time.time()
# Run predict() on a worker thread. The second argument is the stream source;
# put the stream address inside the quotes.
detector_thread = Thread(target=predict, args=(shared_model_1, "0"))
detector_thread.start()
import boto3
import pika
import time
# Initialise the S3 client that talks to the MinIO server.
s3 = boto3.client(
    's3',
    endpoint_url='http://10.160.86.76:9000',  # replace with your MinIO server address
    aws_access_key_id='minioadmin',  # replace with your access key
    aws_secret_access_key='minioadmin',  # replace with your secret key
)

# Open the RabbitMQ connection and channel
# (replace the URL with your RabbitMQ server details).
amqp_params = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.BlockingConnection(amqp_params)
channel = connection.channel()

# Declare the queue that image URLs are published to.
channel.queue_declare(queue='image_urls')

# Record the keys already present in the bucket (replace the bucket name with
# your own) so that only objects added afterwards are treated as new.
response = s3.list_objects(Bucket='iot-db-img')
old_keys = set(entry['Key'] for entry in response.get('Contents', []))
# Poll the bucket forever and publish the URL of every newly added object.
# The loop is wrapped in try/finally so the RabbitMQ connection is closed when
# the loop is interrupted (e.g. CTRL+C) or fails; previously the close call
# sat after an endless loop and could never run.
try:
    while True:
        # Fetch the current object listing.
        # NOTE(review): a single list_objects call is not paginated; confirm
        # the bucket stays small enough for one response, or paginate.
        response = s3.list_objects(Bucket='iot-db-img')
        new_keys = {entry['Key'] for entry in response.get('Contents', [])}

        # Keys present now but absent from the previous poll are new images.
        for key in new_keys - old_keys:
            # Replace with your MinIO server and bucket information.
            image_url = f"http://10.160.86.76:9000/iot-db-img/{key}"
            # Send the new image's URL to RabbitMQ.
            channel.basic_publish(exchange='',
                                  routing_key='image_urls',
                                  body=image_url)

        # Remember this listing for the next comparison.
        old_keys = new_keys

        # Pause between polls to avoid hammering the server. connection.sleep
        # is used instead of time.sleep so pika keeps processing broker
        # frames (heartbeats) while this script is idle.
        connection.sleep(1)
finally:
    # Close the RabbitMQ connection if it is still open.
    if connection.is_open:
        connection.close()
接收端测试
import os
import pika
import requests
# Connect to RabbitMQ and open a channel on that connection.
broker_params = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.BlockingConnection(broker_params)
channel = connection.channel()

# Declare the queue this script consumes from.
channel.queue_declare(queue='image_urls')
def callback(ch, method, properties, body):
    """Download the image named in a queue message and save it locally.

    The message body is decoded as the image URL; the file name is the last
    path component of that URL. A 200 response is written to the local
    download folder; any other status code, or a network error, is reported
    on stdout. The message is acknowledged in every case.

    Args:
        ch: Channel the message arrived on; used to send the acknowledgement.
        method: Delivery metadata; supplies ``delivery_tag`` for the ack.
        properties: Message properties (unused).
        body: Raw message bytes containing the image URL.
    """
    image_url = body.decode()
    image_name = os.path.basename(image_url)
    try:
        # A timeout keeps one stalled download from blocking the consumer
        # indefinitely.
        response = requests.get(image_url, timeout=10)
    except requests.RequestException as exc:
        # Report network failures instead of letting them kill the consumer;
        # the message is still acknowledged below, matching the non-200 path.
        print(f"Unable to download image, error: {exc}, url: {image_url}")
    else:
        # Check if the response is ok
        if response.status_code == 200:
            # save the image to the local folder
            with open(os.path.join(r'E:\ultralytics-main\rabbit\rabbit', image_name), 'wb') as f:
                f.write(response.content)
            print(f"Image saved: {image_name}")
        else:
            print(f"Unable to download image, response code: {response.status_code}, url: {image_url}")
    ch.basic_ack(delivery_tag=method.delivery_tag)
# Start the message consumer. CTRL+C (advertised in the banner below) is
# handled so consumption stops cleanly, and the connection is closed on the
# way out instead of being left open.
channel.basic_consume(queue='image_urls', on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
try:
    channel.start_consuming()
except KeyboardInterrupt:
    channel.stop_consuming()
finally:
    if connection.is_open:
        connection.close()