基于YOLOv8的RTSP视频流实时目标检测与告警系统设计与实现(超详细)

news2024/11/13 6:32:29

前言

在训练模型完成后,想把模型应用起来,比如模型可以部署到项目中,实时接收RTSP视频流进行识别检测,一旦达到自己所设置的置信度阈值(例如大于0.5),系统就会实时把报警信息发送给服务端,由服务端进行实时响应和策略执行,很像是个企业级应用流程 。

比如:我们在yolov8平台训练完“火焰”模型,想要实时检测并把识别结果发送给服务端进行报警,一旦出现火焰,立即发送服务端进行报警。如下图所示:

 智能识别系统(python端)

首先,获取RTSP视屏流,可以是单个视频流也可以是视频流数组,所有常量都可以通过配置文件的形式进行读取。在yolov8网络模型代码基础上进行调用模型,然后对实时的RTSP协议视屏流进行实时识别,一旦发现有异常,比如出现火焰,并且超过置信度阈值(0.5),就立马发送报警信息到服务端,以下是python代码,主要实现:

2.1.模型调用

   # 加载YOLOv8模型
    model = YOLO('model.pt')

2.2.RTSP协议视频流实时读取

def process_video_stream(rtsp_url):
    """
    从RTSP流中读取视频帧并处理。
    :param rtsp_url: RTSP流的URL
    :return: None
    """
    cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
    if not cap.isOpened():
        logging.error(f"无法打开RTSP视频流:{rtsp_url}")
        return

2.3.异常检测

def start_detection(rtsp_url):
    """
    启动视频流检测线程。
    :param rtsp_url: RTSP流的URL
    """
    detection_thread = threading.Thread(target=process_video_stream, args=(rtsp_url,))
    detection_thread.start()

2.4.实时发送报警信息

def send_alarm(alarm_data):
    """
    发送报警信息到服务器。
    :param alarm_data: 包含标签名称、数量、置信度和图像信息的字典
    :return: None
    """
    for endpoint in SERVER_ENDPOINTS:
        try:
            response = requests.post(endpoint, json=alarm_data)
            if response.status_code == 200:
                logging.info(f"报警信息成功发送到 {endpoint}")
            else:
                logging.error(f"报警信息发送到 {endpoint} 失败,状态码:{response.status_code}")
        except Exception as e:
            logging.error(f"发送报警信息到 {endpoint} 时发生错误:{e}")

2.5.发送报警信息格式

 # 构造报警信息
                class_names = [model.names[int(cls)] for cls in filtered_classes]
                alarm_data = {
                    "labels": class_names,
                    "counts": len(filtered_boxes),
                    "confidences": filtered_confidences.tolist(),
                    "image": image_to_base64(frame),
                    "timestamp": timestamp  # 添加时间戳字段
                }

                # 记录日志
                logging.info(f"发送报警信息:标签={class_names}, 时间={timestamp}, 类别={filtered_classes}, 置信度={filtered_confidences.tolist()}")

                # 发送报警信息
                send_alarm(alarm_data)

完整代码参考如下: 

import cv2
import torch
import threading
import requests
import time
from datetime import datetime
import json
import logging
import os
from queue import Queue
import base64
from concurrent.futures import ThreadPoolExecutor
from ultralytics import YOLO  # 导入YOLO类

# 读取配置文件
def load_config(config_file='config.json'):
    """
    从配置文件中加载配置信息。
    :param config_file: 配置文件路径
    :return: 配置信息字典
    """
    try:
        with open(config_file, 'r', encoding='utf-8') as f:
            config = json.load(f)
        return config
    except Exception as e:
        logging.critical(f"无法加载配置文件:{e}")
        raise

# 将图像转换为Base64编码的字符串
def image_to_base64(image):
    """
    将图像转换为Base64编码的字符串。
    :param image: 图像数组
    :return: Base64编码的字符串
    """
    try:
        _, buffer = cv2.imencode('.jpg', image)
        return base64.b64encode(buffer).decode('utf-8')
    except Exception as e:
        logging.error(f"图像转换失败:{e}")
        return None

# 发送报警信息到服务器
def send_alarm(alarm_data, server_endpoints):
    """
    发送报警信息到服务器。
    :param alarm_data: 包含标签名称、数量、置信度和图像信息的字典
    :param server_endpoints: 服务器端点列表
    :return: None
    """
    headers = {'Content-Type': 'application/json'}
    try:
        for endpoint in server_endpoints:
            response = requests.post(endpoint, json=alarm_data, headers=headers,timeout=10)
            if response.status_code == 200:
                logging.info(f"报警信息成功发送到 {endpoint}")
            else:
                logging.error(f"报警信息发送到 {endpoint} 失败,状态码:{response.status_code}")
    except requests.exceptions.RequestException as e:
        logging.error(f"发送报警信息到 {endpoint} 时发生网络错误:{e}")
    except Exception as e:
        logging.error(f"发送报警信息到 {endpoint} 时发生错误:{e}")
# 从RTSP流中读取视频帧并处理
def process_video_stream(rtsp_url, model, config, alarm_queue):
    """
    从RTSP流中读取视频帧并处理。
    :param rtsp_url: RTSP流的URL
    :param model: YOLO模型
    :param config: 配置信息字典
    :param alarm_queue: 报警队列
    :return: None
    """
    try:
        cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
        if not cap.isOpened():
            logging.error(f"无法打开RTSP视频流:{rtsp_url}")
            return

        # 初始化重试计数器和重试时间
        retry_count = 0
        retry_start_time = time.time()  # 记录开始重试的时间

        # 初始化 FPS 计算器
        fps_start_time = time.time()
        fps_num_frames = 0
        fps_window = []

        while True:
            # 检查是否超过了重试时间
            if time.time() - retry_start_time > config['total_retry_duration'] * 60:
                logging.error(f"重试时间超过 {config['total_retry_duration']} 分钟,停止处理。")
                break

             try:
                # 读取视频帧
                ret, frame = cap.read()
                if not ret:
                    logging.warning(f"无法获取帧数据:{rtsp_url}")
                    # 释放资源并重新打开
                    cap.release()
                    cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
                    if not cap.isOpened():
                        # 重试连接
                        for _ in range(config['max_retries_per_minute']):
                            if cap.isOpened():
                                break
                            logging.error(f"尝试重新打开RTSP视频流:{rtsp_url}")
                            cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
                            retry_count += 1
                            time.sleep(config['retry_interval'])  # 每次重试间隔时间
                        else:
                            logging.error(f"无法重新打开RTSP视频流:{rtsp_url}")
                            time.sleep(60 - (time.time() - fps_start_time) % 60)  # 等待到下一分钟
                 continue
                    continue

                # 使用YOLOv8进行检测
                results = model(frame, verbose=False)

                # 获取检测结果
                result = results[0]
                boxes = result.boxes.to(model.device)  # 将结果移动到同一设备
                confidences = boxes.conf.cpu().numpy()
                classes = boxes.cls.cpu().numpy().astype(int)

                # 筛选置信度大于阈值的结果
                high_conf_indices = confidences > config['confidence_threshold']
                filtered_boxes = boxes.xyxy[high_conf_indices].cpu().numpy()
                filtered_classes = classes[high_conf_indices]
                filtered_confidences = confidences[high_conf_indices]

                if len(filtered_boxes) > 0:
                    # 当前时间戳
                    current_datetime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')  # 格式化日期时间

                    # 构造报警信息
                    class_names = [model.names[int(cls)] for cls in filtered_classes]  # 类名列表
                    alarm_data = {
                        "labels": class_names,
                        "counts": len(filtered_boxes),
                        "confidences": filtered_confidences.tolist(),
                        "image": image_to_base64(frame),
                        "timestamp": current_datetime  # 添加格式化的日期时间字段
                    }
                    # 记录日志
                    logging.info(
                        f"发送报警信息:标签={class_names}, 时间={current_datetime}, 类别={filtered_classes}, 置信度={filtered_confidences.tolist()}")
                    # 将报警信息放入队列
                    alarm_queue.put(alarm_data)

               # 计算 FPS
                fps_num_frames += 1
                if (time.time() - fps_start_time) > config['fps_interval']:
                    fps = fps_num_frames / (time.time() - fps_start_time)
                    fps_window.append(fps)
                    if len(fps_window) > config['fps_window_size']:
                        fps_window.pop(0)
                    avg_fps = sum(fps_window) / len(fps_window)
                    logging.info(f"当前 FPS:{avg_fps:.2f}")
                    fps_start_time = time.time()
                    fps_num_frames = 0

            except cv2.error as e:
                logging.error(f"OpenCV错误:{e}")
                retry_count += 1
                continue
            except requests.exceptions.RequestException as e:
                logging.error(f"网络请求错误:{e}")
                retry_count += 1
                continue
            except torch.cuda.OutOfMemoryError as e:
                logging.error(f"CUDA内存不足:{e}")
                retry_count += 1
                continue
            except Exception as e:
                logging.error(f"处理视频流时发生错误:{e}")
                retry_count += 1
                continue
              
      cap.release()
    except Exception as e:
        logging.error(f"处理视频流时发生全局异常:{e}")

# 从队列中读取报警信息并发送到服务器
def send_alarms(alarm_queue, server_endpoints):
    """
    从队列中读取报警信息并发送到服务器。
    :param alarm_queue: 报警队列
    :param server_endpoints: 服务器端点列表
    :return: None
    """
    try:
        while True:
            alarm_data = alarm_queue.get()
            send_alarm(alarm_data, server_endpoints)
            alarm_queue.task_done()
    except Exception as e:
        logging.error(f"发送报警信息时发生异常:{e}")

# 启动多个视频流的检测
def start_detection(rtsp_urls, config):
    """
    启动多个视频流的检测。
    :param rtsp_urls: RTSP流的URL列表
    :param config: 配置信息字典
    :return: None
    """
    # 创建报警队列
    alarm_queue = Queue(maxsize=config['max_workers'])

    # 启动报警发送线程
    executor = ThreadPoolExecutor(max_workers=config['max_workers'])
    for _ in range(config['max_workers']):
        executor.submit(send_alarms, alarm_queue, config['server_endpoints'])

    # 加载YOLOv8模型,并优先使用GPU,如果没有GPU则使用CPU
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    logging.info(f"使用设备:{device}")
    model = YOLO(config['model_path']).to(device)

    # 启动多个检测线程
    for rtsp_url in rtsp_urls:
        detection_thread = threading.Thread(target=process_video_stream, args=(rtsp_url, model, config, alarm_queue))
        detection_thread.start()

if __name__ == '__main__':
    # 加载配置文件
    config = load_config()

    # 设置日志级别
    logging.basicConfig(level=config['log_level'], format='%(asctime)s - %(levelname)s - %(message)s')

    # 直接在主程序中启动多个视频流的检测
    start_detection(config['rtsp_urls'], config)

配置文件代码:

{
    "rtsp_urls": [
        "rtsp://admin:XXXXXX@xx.xx.xx.xx:554/Streaming/Channels/101"
    ],
    "server_endpoints": [
        "http://localhost:8088/alarm"
    ],
    "model_path": "yolov8n.pt",
    "device": "cuda",
    "retry_interval": 6,
    "max_retries_per_minute": 10,
    "total_retry_duration": 30,
    "log_level": "INFO",
    "confidence_threshold": 0.5,
    "fps_interval": 1,
    "fps_window_size": 10,
    "max_workers": 5
}

服务端接收报警信息(Java端)

服务端是Java编写,主要是接收python端发送的报警信息,报警字段主要有:标签名称、置信度大小、图片、数量和时间等字段,python发送时会根据设置的server_endpoints地址进行发送。例如:http://localhost:8088/alarm。

Java服务端代码:

bean类:

package com.wei.demo1.demo;

import java.util.Arrays;
import java.util.Date;
import java.util.List;

/**
 * @BelongsProject: demo1
 * @BelongsPackage: com.wei.demo1.demo
 * @ClassName AlarmInfo
 * @Author: weiq
 * @CreateTime: 2024-09-13  14:01
 * @Description: TODO
 * @Version: 1.0
 */
public class AlarmInfo {
    private String[] labels;
    private Integer  counts;
    private String[] confidences;
    private String image;
    private String timestamp;

    public String[] getLabels() {
        return labels;
    }

    public void setLabels(String[] labels) {
        this.labels = labels;
    }

    public Integer getCounts() {
        return counts;
    }

    public void setCounts(Integer counts) {
        this.counts = counts;
    }

    public String[] getConfidences() {
        return confidences;
    }

    public void setConfidences(String[] confidences) {
        this.confidences = confidences;
    }
    ..........................

处理类:

package com.wei.demo1.demo;
import com.alibaba.fastjson.JSON;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

public class AlarmReceiver {

    public static void main(String[] args) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(8088), 0);
        server.createContext("/alarm", new AlarmHandler());
        server.setExecutor(null); // creates a default executor
        server.start();
        System.out.println("Server started and listening on port 8088");
    }

   static class AlarmHandler implements HttpHandler {
        @Override
        public void handle(HttpExchange exchange) throws IOException {
            if (!"POST".equals(exchange.getRequestMethod())) {
                exchange.sendResponseHeaders(405, -1); // Method Not Allowed
                return;
            }

            StringBuilder sb = new StringBuilder();
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = br.readLine()) != null) {
                    sb.append(line);
                }
            } catch (IOException e) {
                System.err.println("Error reading request body: " + e.getMessage());
                exchange.sendResponseHeaders(500, -1); // Internal Server Error
                return;
            }

           String body = sb.toString();
//            System.out.println("Received POST data: " + body);
            // 使用FastJSON解析JSON字符串
            AlarmInfo alarmInfo = null;
            try {
                alarmInfo = JSON.parseObject(body, AlarmInfo.class);
            } catch (Exception e) {
                System.err.println("Failed to parse JSON object: " + e.getMessage());
                exchange.sendResponseHeaders(400, 0); // Bad Request
                return;
            }

            System.out.println("Received POST data: " + alarmInfo);

            // 设置响应头和状态码
            exchange.getResponseHeaders().add("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length());

            // 写入响应体
            try (OutputStream responseBody = exchange.getResponseBody()) {
                responseBody.write(body.getBytes(StandardCharsets.UTF_8));
            }
        }
    }
}

测试效果

python端发送报警信息

Java服务端接收报警信息

部署

代码测试通过后,我们就可以进行项目完整打包部署工作了,Python项目一般可以可以通过Pyinstaller组件进行打包部署。

步骤一:安装必要的库

安装Pyinstaller库,命令如下:

pip install Pyinstaller -i https://pypi.tuna.tsinghua.edu.cn/simple

步骤二:创建spec文件

创建一个.spec文件,比如命名为ods.spec,并在其中添加如下内容来指定如何打包您的应用程序。这里的例子假设您的主Python脚本名为ods.py。 

# -*- mode: python ; coding: utf-8 -*-

from PyInstaller.utils.hooks import collect_data_files
import ultralytics

block_cipher = None

a = Analysis(
    ['ods.py'],
    pathex=[],
    binaries=[],
    datas=collect_data_files('ultralytics'),  # 收集Ultralytics库中的数据文件
    hiddenimports=[],
    hookspath=[],
    hooksconfig={},
    runtime_hooks=[],
    excludes=[],
    win_no_prefer_redirects=False,
    win_private_assemblies=False,
    cipher=block_cipher,
    noarchive=False,
)

pyz = PYZ(a.pure, a.zipped_data,
             cipher=block_cipher)

exe = EXE(pyz,
          a.scripts,
          a.binaries,
          a.zipfiles,
          a.datas,
          name='ods',
          debug=False,
          strip=False,
          upx=True,
          runtime_tmpdir=None,
          icon=['ods.jpg'],
          console=True )

请确保替换ods.py为您自己的主脚本名,并根据需要调整其他参数。 

步骤三:运行spec文件打包

 运行下面的命令来使用您创建的spec文件来打包应用程序:

pyinstaller main.spec

打包完成后,会自动生成dist文件,里面有一个exe可执行文件,双击打开运行即可。如果出现缺少配置文件的错误,可手动补充添加进去即可。如下图所示:

步骤四:打包运行测试

 双击exe文件,测试运行,Java服务端会实时接收报警的信息,如下图所示:

完成!

后续更新策略执行的代码,敬请期待!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2145948.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于web的工作管理系统设计与实现

博主介绍:专注于Java vue .net php phython 小程序 等诸多技术领域和毕业项目实战、企业信息化系统建设,从业十五余年开发设计教学工作 ☆☆☆ 精彩专栏推荐订阅☆☆☆☆☆不然下次找不到哟 我的博客空间发布了1000毕设题目 方便大家学习使用 感兴趣的…

01 Vim 编辑器的简单使用

目前在类liunx系统上,我们使用比较多的是 vim 编辑器。vim 具有程序编辑的能力,可以主动的以字体颜色辨别语法的正确性,方便程序设计。 文章目录 1 vim介绍2 vim 三种模式3 常用快捷键一般模式操作:切换模式操作:增删…

体感魂斗罗(一)

文章目录 体感魂斗罗实现步骤设备读取摄像头视频流使用电脑摄像头读取局域网内手机摄像头效果示意IP摄像头底部工具栏 体感魂斗罗实现步骤 目前想到的有如下步骤 读取摄像头视频流图像检测人体关键点关键点转换为人体姿势固定姿势转换键盘键位 设备 摄像头(可用手…

[数据集][目标检测]文本表格检测数据集VOC+YOLO格式6688张5类别

数据集格式:Pascal VOC格式YOLO格式(不包含分割路径的txt文件,仅仅包含jpg图片以及对应的VOC格式xml文件和yolo格式txt文件) 图片数量(jpg文件个数):6688 标注数量(xml文件个数):6688 标注数量(txt文件个数):6688 标注…

上半年亏损扩大/百亿资产重组终止,路畅科技如何“脱困”?

在智能网联汽车市场形势一片大好的前提下,路畅科技上半年的营收却出现了下滑,并且亏损也进一步扩大。 2024年半年度报告显示,路畅科技营业收入1.35亿元,同比下滑7.83%;实现归属上市公司股东的净利润为亏损2491.99万元…

【oj刷题】二分查找篇:二分查找算法的原理和应用场景

前言: 二分查找算法,又称折半查找算法,是一种在有序数组中查找特定元素的高效查找方法。它通过将搜索区间不断缩小一半,从而在对数时间内找到目标元素。二分查找是基于分治策略的一种典型应用,能够高效的处理许多问题&…

软考高级:嵌入式系统调度算法 AI 解读

嵌入式系统中的调度算法用于管理任务的执行顺序,确保系统资源能够有效分配。以下是几种常见的调度算法的通俗讲解。 生活化例子 想象你是一位超市收银员,有很多顾客排队,每位顾客都可以看作一个任务,收银台就是你的处理器。你需…

1.1 软件测试 + AI

欢迎大家订阅【软件测试】学习专栏,开启你的软件测试学习之旅! 文章目录 前言一、软件测试二、人工智能的引入 前言 人工智能的引入为软件测试带来了巨大的变革,不仅提升了测试效率和准确性,也为软件质量的保障提供了新的手段。通…

通信工程学习:什么是ONT光网络终端

ONT:光网络终端 ONT(Optical Network Terminal,光网络终端)是光纤接入网络(FTTH)中的关键设备,用于将光纤信号转换为电信号或将电信号转换为光信号,以实现用户设备与光纤网络的连接。…

华为OD机试 - 返回矩阵中非1的元素个数 - 广度优先搜索BFS(Python/JS/C/C++ 2024 E卷 200分)

华为OD机试 2024E卷题库疯狂收录中,刷题点这里 专栏导读 本专栏收录于《华为OD机试真题(Python/JS/C/C)》。 刷的越多,抽中的概率越大,私信哪吒,备注华为OD,加入华为OD刷题交流群,…

最长连续子序列 - 华为OD统一考试(E卷)

OD统一考试(E卷) 分值: 100分 题解: Java / Python / C 2024华为OD机试(E卷D卷C卷)最新题库【超值优惠】Java/Python/C合集 题目描述 有N个正整数组成的一个序列。给定整数sum,求长度最长的连续…

WIFI路由器的套杆天线简谈

❝本次推文简单介绍下WIFI路由器的套杆天线。 路由器天线 路由器在这个万物互联的时代,想必大家对其都不陌生。随着科技的发展,常用的路由器上的天线也越来越多,那么问题来了:天线越多,信号越好吗?路由器…

前端mock了所有……

目录 一、背景描述 二、开发流程 1.引入Mock 2.创建文件 3.需求描述 4.Mock实现 三、总结 一、背景描述 前提: 事情是这样的,老板想要我们写一个demo拿去路演/拉项目,有一些数据,希望前端接一下,写几个表格&a…

Linux进程间通信——探索共享内存—— 剖析原理, 学习接口应用

前言:本节内容主要讲解进程间通信的, systemV版本下的共享内存。 共享内存,顾名思义, 其实就是一块内存, 它不同于管道是一个文件。 所以它的传输速度是很快的。 因为管道是文件,有缓冲区, 而共…

Day99 代码随想录打卡|动态规划篇--- 01背包问题

题目(卡玛网T46): 小明是一位科学家,他需要参加一场重要的国际科学大会,以展示自己的最新研究成果。他需要带一些研究材料,但是他的行李箱空间有限。这些研究材料包括实验设备、文献资料和实验样本等等&am…

LeRobot - 让现实机器人更易学

文章目录 一、关于 LeRobot特点模拟环境中预训练模型的示例 致谢教程 - Getting Started with Real-World Robots 二、安装三、Walkthrough1、可视化数据集2、LeRobotDataset的格式3、评估预先训练的策略4、训练你自己的政策复制最先进的(SOTA) 四、贡献…

Vue3 中 Aos 动画同时触发的解决办法

文章目录 问题现象解决之后的效果解决办法问题猜测 问题现象 我总共有四行数据,每一行都是一个动画,但是触发第一个之后其他三个也都触发了 我想要的效果是:动画从底部出现的时候触发一个动画,不要都触发掉 解决之后的效果 解决…

智慧卫生间系统:引领公共卫生间管理的新时代@卓振思众

随着城市化进程的加快,公共卫生间的使用频率不断增加。如何提升公共卫生间的使用体验、管理效率以及卫生水平,已成为各地政府和管理者关注的焦点。智慧卫生间系统应运而生,成为解决这一问题的重要工具。它结合了物联网技术和智能管理理念&…

四、Cookie 和 Session

文章目录 1. Cookie 饼干1.1 什么是 Cookie?1.2 如何创建 Cookie1.3 服务器如何获取 Cookie1.4 Cookie 值的修改1.5 浏览器查看 Cookie1.6 Cookie 生命控制(指浏览器中Cookie的存在时间)1.7 Cookie 有效路径 Path 的设置 2. Session 会话2.1 什么是 Ses…

Canopen-pn有线通信标准在汽车制造中至关重要

电子元件越来越多地被集成到车辆中,从而实现与物联网世界的连接。该行业中主要的高速串行接口方法包括控制器局域网 (CAN) 总线 。CAN 是运输应用中使用的一种强大的总线标准。它旨在允许微控制器(MCU) 和相关组件与彼此的应用程序进行通信。这无需系统具有主机即可…