YOLOv8模型实时检测RTSP协议视频流并实时发送报警信息到Java服务端实现(超详细)

news2025/1/11 21:43:55

前言

在训练模型完成后,想把模型应用起来,比如模型可以部署到项目中,实时接收RTSP视频流进行识别检测,一旦达到自己所设置的置信度阈值(例如大于0.5),系统就会实时把报警信息发送给服务端,由服务端进行实时响应和策略执行,很像是个企业级应用流程 。

比如:我们在yolov8平台训练完“火焰”模型,想要实时检测并把识别结果发送给服务端进行报警,一旦出现火焰,立即发送服务端进行报警。如下图所示:

 智能识别系统(python端)

首先,获取RTSP视屏流,可以是单个视频流也可以是视频流数组,所有常量都可以通过配置文件的形式进行读取。在yolov8网络模型代码基础上进行调用模型,然后对实时的RTSP协议视屏流进行实时识别,一旦发现有异常,比如出现火焰,并且超过置信度阈值(0.5),就立马发送报警信息到服务端,以下是python代码,主要实现:

2.1.模型调用

   # 加载YOLOv8模型
    model = YOLO('model.pt')

2.2.RTSP协议视频流实时读取

def process_video_stream(rtsp_url):
    """
    从RTSP流中读取视频帧并处理。
    :param rtsp_url: RTSP流的URL
    :return: None
    """
    cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
    if not cap.isOpened():
        logging.error(f"无法打开RTSP视频流:{rtsp_url}")
        return

2.3.异常检测

def start_detection(rtsp_url):
    """
    启动视频流检测线程。
    :param rtsp_url: RTSP流的URL
    """
    detection_thread = threading.Thread(target=process_video_stream, args=(rtsp_url,))
    detection_thread.start()

2.4.实时发送报警信息

def send_alarm(alarm_data):
    """
    发送报警信息到服务器。
    :param alarm_data: 包含标签名称、数量、置信度和图像信息的字典
    :return: None
    """
    for endpoint in SERVER_ENDPOINTS:
        try:
            response = requests.post(endpoint, json=alarm_data)
            if response.status_code == 200:
                logging.info(f"报警信息成功发送到 {endpoint}")
            else:
                logging.error(f"报警信息发送到 {endpoint} 失败,状态码:{response.status_code}")
        except Exception as e:
            logging.error(f"发送报警信息到 {endpoint} 时发生错误:{e}")

2.5.发送报警信息格式

 # 构造报警信息
                class_names = [model.names[int(cls)] for cls in filtered_classes]
                alarm_data = {
                    "labels": class_names,
                    "counts": len(filtered_boxes),
                    "confidences": filtered_confidences.tolist(),
                    "image": image_to_base64(frame),
                    "timestamp": timestamp  # 添加时间戳字段
                }

                # 记录日志
                logging.info(f"发送报警信息:标签={class_names}, 时间={timestamp}, 类别={filtered_classes}, 置信度={filtered_confidences.tolist()}")

                # 发送报警信息
                send_alarm(alarm_data)

完整代码参考如下: 

import cv2
import torch
import threading
import requests
import time
from datetime import datetime
import json
import logging
import os
from queue import Queue
import base64
from concurrent.futures import ThreadPoolExecutor
from ultralytics import YOLO  # 导入YOLO类

# 读取配置文件
def load_config(config_file='config.json'):
    """
    从配置文件中加载配置信息。
    :param config_file: 配置文件路径
    :return: 配置信息字典
    """
    try:
        with open(config_file, 'r', encoding='utf-8') as f:
            config = json.load(f)
        return config
    except Exception as e:
        logging.critical(f"无法加载配置文件:{e}")
        raise

# 将图像转换为Base64编码的字符串
def image_to_base64(image):
    """
    将图像转换为Base64编码的字符串。
    :param image: 图像数组
    :return: Base64编码的字符串
    """
    try:
        _, buffer = cv2.imencode('.jpg', image)
        return base64.b64encode(buffer).decode('utf-8')
    except Exception as e:
        logging.error(f"图像转换失败:{e}")
        return None

# 发送报警信息到服务器
def send_alarm(alarm_data, server_endpoints):
    """
    发送报警信息到服务器。
    :param alarm_data: 包含标签名称、数量、置信度和图像信息的字典
    :param server_endpoints: 服务器端点列表
    :return: None
    """
    headers = {'Content-Type': 'application/json'}
    try:
        for endpoint in server_endpoints:
            response = requests.post(endpoint, json=alarm_data, headers=headers,timeout=10)
            if response.status_code == 200:
                logging.info(f"报警信息成功发送到 {endpoint}")
            else:
                logging.error(f"报警信息发送到 {endpoint} 失败,状态码:{response.status_code}")
    except requests.exceptions.RequestException as e:
        logging.error(f"发送报警信息到 {endpoint} 时发生网络错误:{e}")
    except Exception as e:
        logging.error(f"发送报警信息到 {endpoint} 时发生错误:{e}")
# 从RTSP流中读取视频帧并处理
def process_video_stream(rtsp_url, model, config, alarm_queue):
    """
    从RTSP流中读取视频帧并处理。
    :param rtsp_url: RTSP流的URL
    :param model: YOLO模型
    :param config: 配置信息字典
    :param alarm_queue: 报警队列
    :return: None
    """
    try:
        cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
        if not cap.isOpened():
            logging.error(f"无法打开RTSP视频流:{rtsp_url}")
            return

        # 初始化重试计数器和重试时间
        retry_count = 0
        retry_start_time = time.time()  # 记录开始重试的时间

        # 初始化 FPS 计算器
        fps_start_time = time.time()
        fps_num_frames = 0
        fps_window = []

        while True:
            # 检查是否超过了重试时间
            if time.time() - retry_start_time > config['total_retry_duration'] * 60:
                logging.error(f"重试时间超过 {config['total_retry_duration']} 分钟,停止处理。")
                break

             try:
                # 读取视频帧
                ret, frame = cap.read()
                if not ret:
                    logging.warning(f"无法获取帧数据:{rtsp_url}")
                    # 释放资源并重新打开
                    cap.release()
                    cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
                    if not cap.isOpened():
                        # 重试连接
                        for _ in range(config['max_retries_per_minute']):
                            if cap.isOpened():
                                break
                            logging.error(f"尝试重新打开RTSP视频流:{rtsp_url}")
                            cap = cv2.VideoCapture(rtsp_url, cv2.CAP_FFMPEG)
                            retry_count += 1
                            time.sleep(config['retry_interval'])  # 每次重试间隔时间
                        else:
                            logging.error(f"无法重新打开RTSP视频流:{rtsp_url}")
                            time.sleep(60 - (time.time() - fps_start_time) % 60)  # 等待到下一分钟
                 continue
                    continue

                # 使用YOLOv8进行检测
                results = model(frame, verbose=False)

                # 获取检测结果
                result = results[0]
                boxes = result.boxes.to(model.device)  # 将结果移动到同一设备
                confidences = boxes.conf.cpu().numpy()
                classes = boxes.cls.cpu().numpy().astype(int)

                # 筛选置信度大于阈值的结果
                high_conf_indices = confidences > config['confidence_threshold']
                filtered_boxes = boxes.xyxy[high_conf_indices].cpu().numpy()
                filtered_classes = classes[high_conf_indices]
                filtered_confidences = confidences[high_conf_indices]

                if len(filtered_boxes) > 0:
                    # 当前时间戳
                    current_datetime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')  # 格式化日期时间

                    # 构造报警信息
                    class_names = [model.names[int(cls)] for cls in filtered_classes]  # 类名列表
                    alarm_data = {
                        "labels": class_names,
                        "counts": len(filtered_boxes),
                        "confidences": filtered_confidences.tolist(),
                        "image": image_to_base64(frame),
                        "timestamp": current_datetime  # 添加格式化的日期时间字段
                    }
                    # 记录日志
                    logging.info(
                        f"发送报警信息:标签={class_names}, 时间={current_datetime}, 类别={filtered_classes}, 置信度={filtered_confidences.tolist()}")
                    # 将报警信息放入队列
                    alarm_queue.put(alarm_data)

               # 计算 FPS
                fps_num_frames += 1
                if (time.time() - fps_start_time) > config['fps_interval']:
                    fps = fps_num_frames / (time.time() - fps_start_time)
                    fps_window.append(fps)
                    if len(fps_window) > config['fps_window_size']:
                        fps_window.pop(0)
                    avg_fps = sum(fps_window) / len(fps_window)
                    logging.info(f"当前 FPS:{avg_fps:.2f}")
                    fps_start_time = time.time()
                    fps_num_frames = 0

            except cv2.error as e:
                logging.error(f"OpenCV错误:{e}")
                retry_count += 1
                continue
            except requests.exceptions.RequestException as e:
                logging.error(f"网络请求错误:{e}")
                retry_count += 1
                continue
            except torch.cuda.OutOfMemoryError as e:
                logging.error(f"CUDA内存不足:{e}")
                retry_count += 1
                continue
            except Exception as e:
                logging.error(f"处理视频流时发生错误:{e}")
                retry_count += 1
                continue
              
      cap.release()
    except Exception as e:
        logging.error(f"处理视频流时发生全局异常:{e}")

# 从队列中读取报警信息并发送到服务器
def send_alarms(alarm_queue, server_endpoints):
    """
    从队列中读取报警信息并发送到服务器。
    :param alarm_queue: 报警队列
    :param server_endpoints: 服务器端点列表
    :return: None
    """
    try:
        while True:
            alarm_data = alarm_queue.get()
            send_alarm(alarm_data, server_endpoints)
            alarm_queue.task_done()
    except Exception as e:
        logging.error(f"发送报警信息时发生异常:{e}")

# 启动多个视频流的检测
def start_detection(rtsp_urls, config):
    """
    启动多个视频流的检测。
    :param rtsp_urls: RTSP流的URL列表
    :param config: 配置信息字典
    :return: None
    """
    # 创建报警队列
    alarm_queue = Queue(maxsize=config['max_workers'])

    # 启动报警发送线程
    executor = ThreadPoolExecutor(max_workers=config['max_workers'])
    for _ in range(config['max_workers']):
        executor.submit(send_alarms, alarm_queue, config['server_endpoints'])

    # 加载YOLOv8模型,并优先使用GPU,如果没有GPU则使用CPU
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    logging.info(f"使用设备:{device}")
    model = YOLO(config['model_path']).to(device)

    # 启动多个检测线程
    for rtsp_url in rtsp_urls:
        detection_thread = threading.Thread(target=process_video_stream, args=(rtsp_url, model, config, alarm_queue))
        detection_thread.start()

if __name__ == '__main__':
    # 加载配置文件
    config = load_config()

    # 设置日志级别
    logging.basicConfig(level=config['log_level'], format='%(asctime)s - %(levelname)s - %(message)s')

    # 直接在主程序中启动多个视频流的检测
    start_detection(config['rtsp_urls'], config)

配置文件代码:

{
    "rtsp_urls": [
        "rtsp://admin:XXXXXX@xx.xx.xx.xx:554/Streaming/Channels/101"
    ],
    "server_endpoints": [
        "http://localhost:8088/alarm"
    ],
    "model_path": "yolov8n.pt",
    "device": "cuda",
    "retry_interval": 6,
    "max_retries_per_minute": 10,
    "total_retry_duration": 30,
    "log_level": "INFO",
    "confidence_threshold": 0.5,
    "fps_interval": 1,
    "fps_window_size": 10,
    "max_workers": 5
}

服务端接收报警信息(Java端)

服务端是Java编写,主要是接收python端发送的报警信息,报警字段主要有:标签名称、置信度大小、图片、数量和时间等字段,python发送时会根据设置的server_endpoints地址进行发送。例如:http://localhost:8088/alarm。

Java服务端代码:

bean类:

package com.wei.demo1.demo;

import java.util.Arrays;
import java.util.Date;
import java.util.List;

/**
 * @BelongsProject: demo1
 * @BelongsPackage: com.wei.demo1.demo
 * @ClassName AlarmInfo
 * @Author: weiq
 * @CreateTime: 2024-09-13  14:01
 * @Description: TODO
 * @Version: 1.0
 */
public class AlarmInfo {
    private String[] labels;
    private Integer  counts;
    private String[] confidences;
    private String image;
    private String timestamp;

    public String[] getLabels() {
        return labels;
    }

    public void setLabels(String[] labels) {
        this.labels = labels;
    }

    public Integer getCounts() {
        return counts;
    }

    public void setCounts(Integer counts) {
        this.counts = counts;
    }

    public String[] getConfidences() {
        return confidences;
    }

    public void setConfidences(String[] confidences) {
        this.confidences = confidences;
    }
    ..........................

处理类:

package com.wei.demo1.demo;
import com.alibaba.fastjson.JSON;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.reflect.Type;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;

public class AlarmReceiver {

    public static void main(String[] args) throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(8088), 0);
        server.createContext("/alarm", new AlarmHandler());
        server.setExecutor(null); // creates a default executor
        server.start();
        System.out.println("Server started and listening on port 8088");
    }

   static class AlarmHandler implements HttpHandler {
        @Override
        public void handle(HttpExchange exchange) throws IOException {
            if (!"POST".equals(exchange.getRequestMethod())) {
                exchange.sendResponseHeaders(405, -1); // Method Not Allowed
                return;
            }

            StringBuilder sb = new StringBuilder();
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(exchange.getRequestBody(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = br.readLine()) != null) {
                    sb.append(line);
                }
            } catch (IOException e) {
                System.err.println("Error reading request body: " + e.getMessage());
                exchange.sendResponseHeaders(500, -1); // Internal Server Error
                return;
            }

           String body = sb.toString();
//            System.out.println("Received POST data: " + body);
            // 使用FastJSON解析JSON字符串
            AlarmInfo alarmInfo = null;
            try {
                alarmInfo = JSON.parseObject(body, AlarmInfo.class);
            } catch (Exception e) {
                System.err.println("Failed to parse JSON object: " + e.getMessage());
                exchange.sendResponseHeaders(400, 0); // Bad Request
                return;
            }

            System.out.println("Received POST data: " + alarmInfo);

            // 设置响应头和状态码
            exchange.getResponseHeaders().add("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length());

            // 写入响应体
            try (OutputStream responseBody = exchange.getResponseBody()) {
                responseBody.write(body.getBytes(StandardCharsets.UTF_8));
            }
        }
    }
}

测试效果

python端发送报警信息

Java服务端接收报警信息

完成!

后续更新策略执行的代码,敬请期待!!! 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2144142.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux抢占调度

目录 抢占流程 抢占时机 用户态抢占时机 1、 从系统调用返回用户空间 2、 从中断返回用户空间 内核态抢占时机 1、中断处理程序返回内核空间 可以看到最终是到了 preempt_schedule_irq 2、当内核从non-preemptible(禁止抢占)状态变成pr…

唤醒金融数据中台:我的数据驱动秘籍

目录 一、明析业务痛点和机会点二、数据驱动精准化营销三、一体化数据平台——整合金融数据1. 数据整合与标准化2. 数据服务与共享3.业务体系集中化 四、强化金融数据安全,筑牢数据保护防线 在当今数字化时代的大潮中,数据无疑是金融行业最耀眼的财富。作…

(娱乐)魔改浏览器-任务栏图标右上角加提示徽章

一、目标: windows中,打开chromium,任务栏中会出现一个chromium的图标。我们的目标是给这个图标的右上角,加上"有1条新消息"的小提示图标,也叫徽章(badge)注意:本章节纯属娱乐,有需要…

道路横幅检测数据集 2000张 街道横幅 带标注 voc yolo

项目背景: 城市中的街道横幅通常用于广告宣传、公共通知等目的,但在某些情况下,它们也可能影响交通安全或市容市貌。因此,对街道横幅进行自动化检测不仅可以帮助城市管理机构及时发现并处理不当悬挂的横幅,还可以辅助…

12.Java基础概念-面向对象-static

欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 Facts speak louder than words! 一、static关键字的含义…

葡萄叶病害检测系统源码分享

葡萄叶病害检测检测系统源码分享 [一条龙教学YOLOV8标注好的数据集一键训练_70全套改进创新点发刊_Web前端展示] 1.研究背景与意义 项目参考AAAI Association for the Advancement of Artificial Intelligence 项目来源AACV Association for the Advancement of Computer V…

无人机之飞行高度篇

无人机的飞行高度受到多种因素的制约,包括无人机本身的性能、无线信号的强度与稳定性,以及国家相关的法律法规等。具体而言,不同类型的无人机有不同的飞行高度限制: 微型无人机:飞行高度一般不得超过50米。这类无人机…

新生们必看!大学开学必备清单,教你快人一步适应学校生活

新生们,开学的脚步临近,你们是否已经准备好迎接全新的校园生活了呢?即将是一段充满挑战和机遇的旅程,为了让大家能够更快地适应新环境,我们特别整理了大学开学必备清单,教你快人一步适应学校生活。新生们必…

[C语言]第十节 函数栈帧的创建和销毁一基础知识到高级技巧的全景探索

10.1. 什么是函数栈帧 我们在写 C 语言代码的时候,经常会把一个独立的功能抽象为函数,所以 C 程序是以函数为基本单位的。 那函数是如何调用的?函数的返回值又是如何待会的?函数参数是如何传递的?这些问题都和函数栈帧…

Flask-JWT-Extended登录验证

1. 介绍 """安装:pip install Flask-JWT-Extended创建对象 初始化与app绑定jwt JWTManager(app) # 初始化JWTManager设置 Cookie 的选项:除了设置 cookie 的名称和值之外,你还可以指定其他的选项,例如:过期时间 (max_age)&…

VulhubSkyTower靶机详解

项目地址 https://download.vulnhub.com/skytower/SkyTower.zip项目配置 我们下载一个VirtualBox,这是官网 Downloads – Oracle VirtualBox 安装到默认路径就行 打开后点击注册 选择解压后的vbox文件 然后点击左上角管理 点击导出虚拟电脑,选中后…

Vue(12)——路由的基本使用

VueRouter 作用:修改地址栏路径时,切换显示匹配的组件 基本步骤(固定) 下载:下载VueRouter模块到当前工程引入安装注册创建路由对象注入,将路由对象注入到new Vue 实例中,建立关联 发现了#/表…

移动端如何实现智能语音交互

智能语音交互(Intelligent Speech Interaction)是基于语音识别、语音合成、自然语言理解等技术,为企业在多种实际应用场景下,赋予产品“能听、会说、懂你”式的智能人机交互功能。适用于智能问答、智能质检、法庭庭审实时记录、实…

CICD 持续集成与持续交付

目录 一 CICD是什么 1.1 持续集成(Continuous Integration) 1.2 持续部署(Continuous Deployment) 1.3 持续交付(Continuous Delivery) 二 git工具使用 2.1 git简介 2.2 git 工作流程 三 部署git …

IntelliJ IDEA 2024.1 新特性下载安装激活方法

概述 IntelliJ IDEA 2024.1 发布了一系列令人期待新特性,可以帮助您提高开发效率。比如:全行代码补全、SpringBean 补全和自动装配、多语句内联端点、新版终端、编辑器中粘性行、AI Assistant 编码助手、改进的日志工作流、重命名嵌入提示、为整行代码提…

【北京迅为】《STM32MP157开发板使用手册》- 第三十三章Cortex-M4 DMA实验

iTOP-STM32MP157开发板采用ST推出的双核cortex-A7单核cortex-M4异构处理器,既可用Linux、又可以用于STM32单片机开发。开发板采用核心板底板结构,主频650M、1G内存、8G存储,核心板采用工业级板对板连接器,高可靠,牢固耐…

《锐捷AP 胖模式配置示例》

目录 WEB配置方式: 1. 登录 AP 管理界面 2. 配置无线服务 3. 配置射频参数 4. 配置 VLAN (如果需要) 5. 配置 IP 地址 6. 其他高级设置(根据需求) 命令行配置: 1. 进入特权模式 2. 进入全局配置模式 3. 配置管理 IP 地址 4. 创建无线 SSID 5. 配置 SSID 加密…

Selenium打开浏览器后闪退问题解决

笔者这两天在做一个自动化方案,用来优化数据统计。其中一部分数据需要通过云上堡垒机跳转访问,而这个堡垒机在笔者日常使用的火狐浏览器上运行不是很正常(表现在有些复制粘贴按钮显示不太灵敏)。 但在Edge浏览器上基本正常&#…

工行软件开发中心积极推进低代码平台建设,助力金融业务快速研发

工行软件开发中心融合现有研发体系,打造全链路可视化研发。平台整体架构建立于行内新一代前后端分离研发体系之上,引入可视化技术,构建业务研发资产,承接现有服务体系,基于数据模型驱动技术及代码扩展能力,快速实现应用开发,并整合行内研发支撑体系,实现应用的快速构建…

python定时发送邮件的功能如何实现自动化?

Python定时发送邮件教程?如何用Python发送电子邮件? Python定时发送邮件不仅能够帮助我们自动处理日常的邮件发送任务,还能在特定时间点触发邮件发送,确保信息的及时传达。AokSend将详细探讨如何利用Python实现定时发送邮件的自动…