前言
在训练模型完成后,想把模型应用起来,比如模型可以部署到项目中,实时接收RTSP视频流进行识别检测,一旦达到自己所设置的置信度阈值(例如大于0.5),系统就会实时把报警信息发送给服务端,由服务端进行实时响应和策略执行,很像是个企业级应用流程 。
比如:我们在yolov8平台训练完“火焰”模型,想要实时检测并把识别结果发送给服务端进行报警,一旦出现火焰,立即发送服务端进行报警。如下图所示:
智能识别系统(python端)
首先,获取RTSP视屏流,可以是单个视频流也可以是视频流数组,所有常量都可以通过配置文件的形式进行读取。在yolov8网络模型代码基础上进行调用模型,然后对实时的RTSP协议视屏流进行实时识别,一旦发现有异常,比如出现火焰,并且超过置信度阈值(0.5),就立马发送报警信息到服务端,以下是python代码,主要实现:
2.1.模型调用
# 加载YOLOv8模型
model = YOLO('model.pt')
2.2.RTSP协议视频流实时读取
def process_video_stream(rtsp_url):
"""
从RTSP流中读取视频帧并处理。
:param rtsp_url: RTSP流的URL
:return: None
"""
cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
if not cap.isOpened():
logging.error(f"无法打开RTSP视频流:{rtsp_url}")
return
2.3.异常检测
def start_detection(rtsp_url):
"""
启动视频流检测线程。
:param rtsp_url: RTSP流的URL
"""
detection_thread = threading.Thread(target=process_video_stream, args=(rtsp_url,))
detection_thread.start()
2.4.实时发送报警信息
def send_alarm(alarm_data):
"""
发送报警信息到服务器。
:param alarm_data: 包含标签名称、数量、置信度和图像信息的字典
:return: None
"""
for endpoint in SERVER_ENDPOINTS:
try:
response = requests.post(endpoint, json=alarm_data)
if response.status_code == 200:
logging.info(f"报警信息成功发送到 {endpoint}")
else:
logging.error(f"报警信息发送到 {endpoint} 失败,状态码:{response.status_code}")
except Exception as e:
logging.error(f"发送报警信息到 {endpoint} 时发生错误:{e}")
2.5.发送报警信息格式
# 构造报警信息
class_names = [model.names[int(cls)] for cls in filtered_classes]
alarm_data = {
"labels": class_names,
"counts": len(filtered_boxes),
"confidences": filtered_confidences.tolist(),
"image": image_to_base64(frame),
"timestamp": timestamp # 添加时间戳字段
}
# 记录日志
logging.info(f"发送报警信息:标签={class_names}, 时间={timestamp}, 类别={filtered_classes}, 置信度={filtered_confidences.tolist()}")
# 发送报警信息
send_alarm(alarm_data)
完整代码参考如下:
import cv2
import torch
import threading
import requests
import time
from datetime import datetime
import json
import logging
import os
from queue import Queue
import base64
from concurrent.futures import ThreadPoolExecutor
from ultralytics import YOLO # 导入YOLO类
# 读取配置文件
def load_config(config_file='config.json'):
"""
从配置文件中加载配置信息。
:param config_file: 配置文件路径
:return: 配置信息字典
"""
try:
with open(config_file, 'r', encoding='utf-8') as f:
config = json.load(f)
return config
except Exception as e:
logging.critical(f"无法加载配置文件:{e}")
raise
# 将图像转换为Base64编码的字符串
def image_to_base64(image):
"""
将图像转换为Base64编码的字符串。
:param image: 图像数组
:return: Base64编码的字符串
"""
try:
_, buffer = cv2.imencode('.jpg', image)
return base64.b64encode(buffer).decode('utf-8')
except Exception as e:
logging.error(f"图像转换失败:{e}")
return None
# 发送报警信息到服务器
def send_alarm(alarm_data, server_endpoints):
"""
发送报警信息到服务器。
:param alarm_data: 包含标签名称、数量、置信度和图像信息的字典
:param server_endpoints: 服务器端点列表
:return: None
"""
headers = {'Content-Type': 'application/json'}
try:
for endpoint in server_endpoints:
response = requests.post(endpoint, json=alarm_data, headers=headers,timeout=10)
if response.status_code == 200:
logging.info(f"报警信息成功发送到 {endpoint}")
else:
logging.error(f"报警信息发送到 {endpoint} 失败,状态码:{response.status_code}")
except requests.exceptions.RequestException as e:
logging.error(f"发送报警信息到 {endpoint} 时发生网络错误:{e}")
except Exception as e:
logging.error(f"发送报警信息到 {endpoint} 时发生错误:{e}")
# 从RTSP流中读取视频帧并处理
def process_video_stream(rtsp_url, model, config, alarm_queue):
"""
从RTSP流中读取视频帧并处理。
:param rtsp_url: RTSP流的URL
:param model: YOLO模型
:param config: 配置信息字典
:param alarm_queue: 报警队列
:return: None
"""
try:
cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
if not cap.isOpened():
logging.error(f"无法打开RTSP视频流:{rtsp_url}")
return
# 初始化重试计数器和重试时间
retry_count = 0
retry_start_time = time.time() # 记录开始重试的时间
# 初始化 FPS 计算器
fps_start_time = time.time()
fps_num_frames = 0
fps_window = []
while True:
# 检查是否超过了重试时间
if time.time() - retry_start_time > config['total_retry_duration'] * 60:
logging.error(f"重试时间超过 {config['total_retry_duration']} 分钟,停止处理。")
break
try:
# 读取视频帧
ret, frame = cap.read()
if not ret:
logging.warning(f"无法获取帧数据:{rtsp_url}")
# 释放资源并重新打开
cap.release()
cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
if not cap.isOpened():
# 重试连接
for _ in range(config['max_retries_per_minute']):
if cap.isOpened():
break
logging.error(f"尝试重新打开RTSP视频流:{rtsp_url}")
cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
retry_count += 1
time.sleep(config['retry_interval']) # 每次重试间隔时间
else:
logging.error(f"无法重新打开RTSP视频流:{rtsp_url}")
time.sleep(60 - (time.time() - fps_start_time) % 60) # 等待到下一分钟
continue
continue
# 使用YOLOv8进行检测
results = model(frame, verbose=False)
# 获取检测结果
result = results[0]
boxes = result.boxes.to(model.device) # 将结果移动到同一设备
confidences = boxes.conf.cpu().numpy()
classes = boxes.cls.cpu().numpy().astype(int)
# 筛选置信度大于阈值的结果
high_conf_indices = confidences > config['confidence_threshold']
filtered_boxes = boxes.xyxy[high_conf_indices].cpu().numpy()
filtered_classes = classes[high_conf_indices]
filtered_confidences = confidences[high_conf_indices]
if len(filtered_boxes) > 0:
# 当前时间戳
current_datetime = datetime.now().strftime('%Y-%m-%d %H:%M:%S') # 格式化日期时间
# 构造报警信息
class_names = [model.names[int(cls)] for cls in filtered_classes] # 类名列表
alarm_data = {
"labels": class_names,
"counts": len(filtered_boxes),
"confidences": filtered_confidences.tolist(),
"image": image_to_base64(frame),
"timestamp": current_datetime # 添加格式化的日期时间字段
}
# 记录日志
logging.info(
f"发送报警信息:标签={class_names}, 时间={current_datetime}, 类别={filtered_classes}, 置信度={filtered_confidences.tolist()}")
# 将报警信息放入队列
alarm_queue.put(alarm_data)
# 计算 FPS
fps_num_frames += 1
if (time.time() - fps_start_time) > config['fps_interval']:
fps = fps_num_frames / (time.time() - fps_start_time)
fps_window.append(fps)
if len(fps_window) > config['fps_window_size']:
fps_window.pop(0)
avg_fps = sum(fps_window) / len(fps_window)
logging.info(f"当前 FPS:{avg_fps:.2f}")
fps_start_time = time.time()
fps_num_frames = 0
except cv2.error as e:
logging.error(f"OpenCV错误:{e}")
retry_count += 1
continue
except requests.exceptions.RequestException as e:
logging.error(f"网络请求错误:{e}")
retry_count += 1
continue
except torch.cuda.OutOfMemoryError as e:
logging.error(f"CUDA内存不足:{e}")
retry_count += 1
continue
except Exception as e:
logging.error(f"处理视频流时发生错误:{e}")
retry_count += 1
continue
cap.release()
except Exception as e:
logging.error(f"处理视频流时发生全局异常:{e}")
# 从队列中读取报警信息并发送到服务器
def send_alarms(alarm_queue, server_endpoints):
"""
从队列中读取报警信息并发送到服务器。
:param alarm_queue: 报警队列
:param server_endpoints: 服务器端点列表
:return: None
"""
try:
while True:
alarm_data = alarm_queue.get()
send_alarm(alarm_data, server_endpoints)
alarm_queue.task_done()
except Exception as e:
logging.error(f"发送报警信息时发生异常:{e}")
# 启动多个视频流的检测
def start_detection(rtsp_urls, config):
"""
启动多个视频流的检测。
:param rtsp_urls: RTSP流的URL列表
:param config: 配置信息字典
:return: None
"""
# 创建报警队列
alarm_queue = Queue(maxsize=config['max_workers'])
# 启动报警发送线程
executor = ThreadPoolExecutor(max_workers=config['max_workers'])
for _ in range(config['max_workers']):
executor.submit(send_alarms, alarm_queue, config['server_endpoints'])
# 加载YOLOv8模型,并优先使用GPU,如果没有GPU则使用CPU
device = 'cuda' if torch.cuda.is_available() else 'cpu'
logging.info(f"使用设备:{device}")
model = YOLO(config['model_path']).to(device)
# 启动多个检测线程
for rtsp_url in rtsp_urls:
detection_thread = threading.Thread(target=process_video_stream, args=(rtsp_url, model, config, alarm_queue))
detection_thread.start()
if __name__ == '__main__':
# 加载配置文件
config = load_config()
# 设置日志级别
logging.basicConfig(level=config['log_level'], format='%(asctime)s - %(levelname)s - %(message)s')
# 直接在主程序中启动多个视频流的检测
start_detection(config['rtsp_urls'], config)
配置文件代码:
{
"rtsp_urls": [
"rtsp://admin:XXXXXX@xx.xx.xx.xx:554/Streaming/Channels/101"
],
"server_endpoints": [
"http://localhost:8088/alarm"
],
"model_path": "yolov8n.pt",
"device": "cuda",
"retry_interval": 6,
"max_retries_per_minute": 10,
"total_retry_duration": 30,
"log_level": "INFO",
"confidence_threshold": 0.5,
"fps_interval": 1,
"fps_window_size": 10,
"max_workers": 5
}
服务端接收报警信息(Java端)
服务端是Java编写,主要是接收python端发送的报警信息,报警字段主要有:标签名称、置信度大小、图片、数量和时间等字段,python发送时会根据设置的server_endpoints地址进行发送。例如:http://localhost:8088/alarm。
Java服务端代码:
bean类:
package com.wei.demo1.demo;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
/**
* @BelongsProject: demo1
* @BelongsPackage: com.wei.demo1.demo
* @ClassName AlarmInfo
* @Author: weiq
* @CreateTime: 2024-09-13 14:01
* @Description: TODO
* @Version: 1.0
*/
public class AlarmInfo {
private String[] labels;
private Integer counts;
private String[] confidences;
private String image;
private String timestamp;
public String[] getLabels() {
return labels;
}
public void setLabels(String[] labels) {
this.labels = labels;
}
public Integer getCounts() {
return counts;
}
public void setCounts(Integer counts) {
this.counts = counts;
}
public String[] getConfidences() {
return confidences;
}
public void setConfidences(String[] confidences) {
this.confidences = confidences;
}
..........................
处理类:
package com.wei.demo1.demo;
import com.alibaba.fastjson.JSON;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
public class AlarmReceiver {
public static void main(String[] args) throws Exception {
HttpServer server = HttpServer.create(new InetSocketAddress(8088), 0);
server.createContext("/alarm", new AlarmHandler());
server.setExecutor(null); // creates a default executor
server.start();
System.out.println("Server started and listening on port 8088");
}
static class AlarmHandler implements HttpHandler {
@Override
public void handle(HttpExchange exchange) throws IOException {
if (!"POST".equals(exchange.getRequestMethod())) {
exchange.sendResponseHeaders(405, -1); // Method Not Allowed
return;
}
StringBuilder sb = new StringBuilder();
try (BufferedReader br = new BufferedReader(
new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8))) {
String line;
while ((line = br.readLine()) != null) {
sb.append(line);
}
} catch (IOException e) {
System.err.println("Error reading request body: " + e.getMessage());
exchange.sendResponseHeaders(500, -1); // Internal Server Error
return;
}
String body = sb.toString();
// System.out.println("Received POST data: " + body);
// 使用FastJSON解析JSON字符串
AlarmInfo alarmInfo = null;
try {
alarmInfo = JSON.parseObject(body, AlarmInfo.class);
} catch (Exception e) {
System.err.println("Failed to parse JSON object: " + e.getMessage());
exchange.sendResponseHeaders(400, 0); // Bad Request
return;
}
System.out.println("Received POST data: " + alarmInfo);
// 设置响应头和状态码
exchange.getResponseHeaders().add("Content-Type", "application/json");
exchange.sendResponseHeaders(200, body.length());
// 写入响应体
try (OutputStream responseBody = exchange.getResponseBody()) {
responseBody.write(body.getBytes(StandardCharsets.UTF_8));
}
}
}
}
测试效果
python端发送报警信息
Java服务端接收报警信息
部署
代码测试通过后,我们就可以进行项目完整打包部署工作了,Python项目一般可以可以通过Pyinstaller组件进行打包部署。
步骤一:安装必要的库
安装Pyinstaller库,命令如下:
pip install Pyinstaller -i https://pypi.tuna.tsinghua.edu.cn/simple
步骤二:创建spec文件
创建一个.spec
文件,比如命名为ods.spec
,并在其中添加如下内容来指定如何打包您的应用程序。这里的例子假设您的主Python脚本名为ods.py
。
# -*- mode: python ; coding: utf-8 -*-
from PyInstaller.utils.hooks import collect_data_files
import ultralytics
block_cipher = None
a = Analysis(
['ods.py'],
pathex=[],
binaries=[],
datas=collect_data_files('ultralytics'), # 收集Ultralytics库中的数据文件
hiddenimports=[],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
)
pyz = PYZ(a.pure, a.zipped_data,
cipher=block_cipher)
exe = EXE(pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
name='ods',
debug=False,
strip=False,
upx=True,
runtime_tmpdir=None,
icon=['ods.jpg'],
console=True )
请确保替换ods.py
为您自己的主脚本名,并根据需要调整其他参数。
步骤三:运行spec文件打包
运行下面的命令来使用您创建的spec文件来打包应用程序:
pyinstaller main.spec
打包完成后,会自动生成dist文件,里面有一个exe可执行文件,双击打开运行即可。如果出现缺少配置文件的错误,可手动补充添加进去即可。如下图所示:
步骤四:打包运行测试
双击exe文件,测试运行,Java服务端会实时接收报警的信息,如下图所示:
完成!
后续更新策略执行的代码,敬请期待!!!