检测摄像头的fps

news2025/1/11 11:11:01

需求

项目中经常遇到不是摄像头就是网线的问题,曾经遇到一个项目算法日志一直报 warning,经过好几个小时的远程排查,发现是摄像头的 fps 不稳定,而且出现 fps 逐渐降低的情况,所以算法跑着跑着就挂了。

于是就需要开发一个测试 fps 的工具,工具倒是不复杂,主要依赖 opencv-python 库读取摄像头视频流。

代码

最基础版本

import argparse
import sys
import time

import cv2

parser = argparse.ArgumentParser()
ip = parser.add_argument("-i", "--ip", type=str)
args = parser.parse_args()

frame_id = 0

cap = cv2.VideoCapture(f"rtsp://username:password@{args.ip}:554/Streaming/Channels/101")
while True:
    try:
        ret, frame = cap.read()
        if not ret:
            break
        else:
            frame_id += 1

            if frame_id < 100:
                remainder = frame_id % 5
                if remainder == 0:
                    print(f"wait:{'-' * (frame_id // 5)}>", end="\r")
            if frame_id == 100:
                time_start = time.time()

            if frame_id > 100:
                time_end = time.time()
                print(
                    f"fps: {(frame_id - 100) / (time_end - time_start)}",
                    end="\r",
                )
    except KeyboardInterrupt:
        exit(-1)

\r 是回车符,光标会回到行首。但是只有一串数字输出,所以经过我的改良有了下面的一版:
改良版本

import argparse
import atexit
import sys
import time
from collections import OrderedDict
from multiprocessing import Event, Process, Queue
from typing import Any, List

import cv2
import keyboard

parser = argparse.ArgumentParser()
ip = parser.add_argument("-i", "--ip", type=str)
args = parser.parse_args()
alive_set = Event()

exit_msg = "\nAll processes have exited."


def test_fps(args: Any, event: Event, fps_q: Queue, name: str):
    frame_id = 0

    cap = cv2.VideoCapture(
        f"rtsp://username:password@{args.ip}:554/Streaming/Channels/101"
    )
    string = ""
    current_time = time.time()
    if cap.isOpened():
        print(f"Enter `q` to exit...\n{'-' * 40}")
    else:
        print("The camera is not opened yet.")
        exit(0)

    while True:
        try:
            ret, _ = cap.read()
            if not ret:
                break
            else:
                frame_id += 1

                if frame_id < 100:
                    remainder = frame_id % 5
                    first_100_time = time.time()

                    if remainder == 0 and current_time < first_100_time:
                        string = (
                            f"wait: {(first_100_time - current_time) * 1000:.4f} ms"
                        )
                if frame_id == 100:
                    time_start = time.time()
                    event.set()
                    string = ""

                if frame_id > 100:
                    time_end = time.time()
                    string = f"{(frame_id - 100) / (time_end - time_start):.8f}"

            fps_q.put([name, string])

        except KeyboardInterrupt:
            exit(0)


def test_time(time_start: float, event: Event, time_q: Queue, name: str):
    while True:
        try:
            if event.is_set():
                try:
                    time_now = time.time()
                    cost = f"{time_now - time_start:.3f}s."
                    time_q.put([name, cost])
                except (KeyboardInterrupt, Exception) as e:
                    exit(0)
        except (KeyboardInterrupt, Exception) as e:
            exit(0)


def print_progress(progress: "OrderedDict"):
    # sys.stdout.write("\033[2J\033[H")  # clear screen
    out_list = []
    for name, data in progress.items():
        if data:
            out_list.append(f"{name}: {data}")

    string = ""
    if len(out_list) == 1:
        string = "\r" + out_list[0].replace("fps:", "").strip()

    elif len(out_list) > 1:
        string = "\r" + " | ".join(out_list)
    else:
        string = "\r" + "waiting..."

    sys.stdout.write(string)
    sys.stdout.flush()


def kill_pid(processes: List[Process]):
    try:
        for p in processes:
            p.terminate()

    except Exception as e:
        sys.stderr.write(f"Detailed error: {e}")
        exit(0)


def main():
    print("Test the fps of camera...")
    status = Queue(maxsize=2)
    progress = OrderedDict()
    processes = []

    try:
        time_start = time.time()
        tasks = [
            Process(target=test_fps, args=(args, alive_set, status, "fps"), name="fps"),
            Process(
                target=test_time,
                args=(time_start, alive_set, status, "time"),
                name="time",
            ),
        ]

        for t in tasks:
            t.start()
            processes.append(t)
            progress[t.name] = None

        while any(i.is_alive() for i in tasks):
            while not status.empty():
                try:
                    name, data = status.get()
                    progress[name] = data
                    print_progress(progress)

                    keyboard.add_hotkey("q", kill_pid, args=(processes,))

                except KeyboardInterrupt:
                    break

    except KeyboardInterrupt:
        atexit.register(kill_pid, *(processes,))
        print(exit_msg)
        exit(0)


if __name__ == "__main__":
    main()
    print(exit_msg)

改良版使用了 多进程队列 通信和 Event 以及 keyboard 库,支持动态单行局部字符串一直刷新和 q 键退出代码的功能。

结果

$ python.exe .\test_fps.py -i 192.168.0.67
Test the fps of camera...
Enter `q` to exit...
----------------------------------------
fps: 20.03542377 | time: 43.861s.
All processes have exited.

下面有动图:
请添加图片描述
最后推荐一个好用的windows下的录屏工具,上面的gif就是这个工具录制的。ScreenToGif. 再次感叹开源的伟大。🤓

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/981658.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LeetCode每日一题:1123. 最深叶节点的最近公共祖先(2023.9.6 C++)

目录 1123. 最深叶节点的最近公共祖先 题目描述&#xff1a; 实现代码与解析&#xff1a; dfs 原理思路&#xff1a; 1123. 最深叶节点的最近公共祖先 题目描述&#xff1a; 给你一个有根节点 root 的二叉树&#xff0c;返回它 最深的叶节点的最近公共祖先 。 回想一下&…

linux 下 C++ 与三菱PLC 通过MC Qna3E 二进制 协议进行交互

西门子plc 有snap7库 进行交互&#xff0c;并且支持c 而且跨平台。但是三菱系列PLC并没有现成的开源项目&#xff0c;没办法只能自己拼接&#xff0c;我这里实现了MC 协议 Qna3E 帧&#xff0c;并使用二进制进行交互。 #pragma once#include <stdio.h> #include <std…

linux c++ 开发 - 05- 使用CMake创建一个动态库

外层CMakeList.txt中的内容&#xff1a; cmake_minimum_required(VERSION 3.16) PROJECT(HELLO) ADD_SUBDIRECTORY(lib bin)lib中CMakeLists.txt中的内容&#xff1a; SET(LIBHELLO_SRC hello.cpp) ADD_LIBRARY(hello SHARED ${LIBHELLO_SRC})hello.h: hello.cpp: ADD_LIBR…

Liquid Studio 2023.2 Crack

Liquid Studio 提供了用于XML和JSON开发 的高级工具包以及Web 服务测试、数据映射和数据转换工具。 开发环境包含一整套用于设计 XML 和 JSON 数据结构和模式的工具。这些工具提供编辑、验证和高级转换功能。对于新手或专家来说&#xff0c;直观的界面和全面的功能将帮助您节省…

C++回顾录

代码随想录 (programmercarl.com) 数组和内存 数组是存放在连续内存空间上的相同类型数据的集合。 数组可以方便的通过下标索引的方式获取到下标下对应的数据。 举一个字符数组的例子&#xff0c;如图所示&#xff1a; 数组可以方便的通过下标索引的方式获取到下标下对应的…

使用docker搭建owncloud Harbor 构建镜像

1、使用mysql:5.6和 owncloud 镜像&#xff0c;构建一个个人网盘。 2、安装搭建私有仓库 Harbor 3、编写Dockerfile制作Web应用系统nginx镜像&#xff0c;生成镜像nginx:v1.1&#xff0c;并推送其到私有仓库。具体要求如下&#xff1a; &#xff08;1&#xff09;基于centos基础…

Scala面向对象编程(高级部分)

1. 静态属性和静态方法 &#xff08;1&#xff09;回顾Java中的静态概念 public static 返回值类型 方法名(参数列表) {方法体} 静态属性… 说明: Java中静态方法并不是通过对象调用的&#xff0c;而是通过类对象调用的&#xff0c;所以静态操作并不是面向对象的。 &#xff0…

SpringBoot 统一异常处理

1. 统一返回结果集 package com.liming.pojo;import com.liming.exception.AppExceptionCodeMsg; import lombok.Data;/*** 全局统一返回结果类* author 黎明* date 2023/9/6 14:11* version 1.0*/ Data public class Result<T> {private Integer code; // 状态码privat…

C#模拟PLC设备运行

涉及&#xff1a;控件数据绑定&#xff0c;动画效果 using System; using System.Windows.Forms;namespace PLCUI {public partial class MainForm : Form{ public MainForm(){InitializeComponent();}private void MainForm_Load(object sender, EventArgs e){// 方式2&#x…

基于SpringBoot的校园失物招领系统

基于SpringBooVuet的校园失物招领系统&#xff0c;前后端分离 附万字文档 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringBoot、Vue、Mybaits Plus、ELementUI工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 角色&#xff1a;管理员、用户 管理员  …

线性空间、子空间、基、基坐标、过渡矩阵

线性空间的定义 满足加法和数乘封闭。也就是该空间的所有向量都满足乘一个常数后或者和其它向量相加后仍然在这个空间里。进一步可以理解为该空间中的所有向量满足加法和数乘的组合封闭。即若 V 是一个线性空间&#xff0c;则首先需满足&#xff1a; 注&#xff1a;线性空间里面…

AR工业远程巡查系统:实时监控设备状态,及时发现潜在问题

随着工业4.0的到来&#xff0c;先进的技术和创新的解决方案正在改变着工业生产的方式。其中&#xff0c;增强现实&#xff08;AR&#xff09;技术带来的工业巡检系统就是一个典型的例子。这种系统通过在现实世界中添加虚拟信息&#xff0c;使得操作人员能够更有效地进行检查和维…

leetcode 1002. 查找共用字符

2023.9.6 个人感觉这题难度不止简单&#xff0c;考察到的东西还是挺多的。 首先理解题意&#xff0c;可以将题意转化为&#xff1a;求字符串数组中 各字符串共同出现的字符的最小值。 分为三步做&#xff1a; 构造一个哈希表hash&#xff0c;初始化第一个字符串的字母出现频率…

2020年12月 C/C++(一级)真题解析#中国电子学会#全国青少年软件编程等级考试

C/C编程&#xff08;1~8级&#xff09;全部真题・点这里 第1题&#xff1a;字符三角形 描述 给定一个字符&#xff0c;用它构造一个底边长5个字符&#xff0c;高3个字符的等腰字符三角形。 输入 输入只有一行&#xff0c; 包含一个字符。 输出 该字符构成的等腰三角形&#xff…

Zookeeper简述

数新网络-让每个人享受数据的价值 官网现已全新升级—欢迎访问&#xff01; 前 言 ZooKeeper是一个开源的、高可用的、分布式的协调服务&#xff0c;由Apache软件基金会维护。它旨在帮助管理和协调分布式系统和应用程序&#xff0c;提供了一个可靠的平台&#xff0c;用于处理…

苹果与芯片巨头Arm达成20年新合作协议,将继续采用芯片技术

9月6日消息&#xff0c;据外媒报道&#xff0c;芯片设计巨头Arm宣布在当地时间周二提交给美国证券交易委员会&#xff08;SEC&#xff09;的最新IPO文件中&#xff0c;透露与苹果达成了一项长达20年的新合作协议&#xff0c;加深了双方之间的合作关系。 报道称&#xff0c;虽然…

Informatica使用操作流程及Expression(表达式转换)案例2

操作流程 ①定义源<Odbc01_oracle:employees> ②定义目标<EDW_EMPLOYEES> ③创建映射<M_ORACLE_EDW01_employees> ④定义任务<S_ORCL_EDW01_employees> ⑤创建工作流<W_ORCL_EDW01_employees> ⑥工作流调度监控 ⑦查验数据 一、需求&…

js---16-----JavaScript中的类型转换机制

、类型转换机制是什么&#xff1f; JS中有六种简单数据类型&#xff1a;undefined、null、bollean、string、number、symbol&#xff0c;以及引用类型object 但是我们声明的时候只有一种数据类型&#xff0c;只用运行期间才会确定当前类型。 上面代码中&#xff0c;x的值在编…

浅谈redis未授权漏洞

redis未授权漏洞 利用条件 版本比较高的redis需要修改redis的配置文件&#xff0c;将bind前面#注释符去掉&#xff0c;将protected-mode 后面改为no 写入webshell config get dir #查看redis数据库路径 config set dir web路径# #修改靶机Redis数据库路径 config set dbfilen…

巧用抽象类与接口,打造高效Java程序(上)

White graces&#xff1a;个人主页 &#x1f649;专栏推荐:《C语言入门知识》&#x1f649; &#x1f649; 内容推荐:&#x1f649; &#x1f439;今日诗词:十年花骨东风泪&#xff0c;几点螺香素壁尘&#x1f439; 目录 &#x1f338;思维导图&#x1f338; &#x1f338;…