【MQTT协议】使用Mosquitto实现mqtt协议(二):编写视频帧的发布/订阅服务

news2025/1/23 1:09:43

目录

  • 一、Mosquitto中的QoS定义
    • QoS1和3区别
  • 二、安装base64库
  • 三、cjson简介
  • 四、主程序代码
  • 五、调用Mosquitto库使用的cmakelist

更多内容详见 【MQTT协议】使用c++实现mqtt协议(Mosquitto源码编译)

一、Mosquitto中的QoS定义

MQTT协议中的QoS(Quality of Service)表示消息传输的服务质量等级,它是MQTT协议中非常重要的一个概念。
MQTT协议中定义了三个不同等级的QoS:

QoS 0:最多一次(At most once)传输。消息发布者只发送一次消息,不进行确认,也不关心消息是否到达订阅者。这种QoS等级的消息传输效率最高,但可靠性最低。

QoS 1:最少一次(At least once)传输。消息发布者会发送消息,并等待确认。如果消息没有被确认,会再次发送,直到收到确认为止。这种QoS等级的消息传输可靠性较高,但效率稍低。

QoS 2:恰好一次(Exactly once)传输。消息发布者发送消息,并等待确认。如果没有收到确认,会再次发送,直到收到确认为止。订阅者接收到消息后,会发送一个确认消息给发布者。如果发布者没有收到确认,会再次发送消息,直到收到确认为止。这种QoS等级的消息传输可靠性最高,但效率最低。

QoS1和3区别

在MQTT协议中,客户端和服务端可以通过协商来确定消息传输的QoS等级,以满足消息传输效率和可靠性的要求。
MQTT协议中的QoS 1和QoS 2都保证了消息的可靠性,但它们之间有一些区别。
QoS 1(最少一次)保证了消息至少会被传输一次,但不保证消息一定会被准确地传输一次。在QoS 1级别下,发布者发送消息并等待一个确认,如果没有收到确认,会再次发送消息,直到收到确认为止。但是,如果确认消息丢失或延迟,可能会导致发布者不断地发送相同的消息。因此,QoS 1级别下可能会出现重复的消息。
QoS 2(恰好一次)保证了消息会被恰好传输一次,不会出现重复的消息。在QoS 2级别下,发布者发送消息并等待一个确认。订阅者收到消息后,会发送一个确认消息给发布者。如果发布者没有收到确认,会再次发送消息,直到收到确认为止。如果订阅者收到重复的消息,可以通过消息中的标识符进行去重,以保证只处理一次。
综上所述,QoS 2级别下的消息传输可靠性更高,但是会带来更大的网络开销和延迟。在选择QoS等级时,需要根据具体的应用需求来确定。

二、安装base64库

base64库用于完成图片与base64之间的转换工作。
git clone https://github.com/ReneNyffenegger/cpp-base64.git
得到base64.h及base64.cpp,直接在工程中进行引用即可。

三、cjson简介

由于Mosquitto传递字节消息,需使用CJSON完成对象的序列化:
如果需要将C语言结构体转化为JSON字符串,需要手动遍历结构体中的成员,并使用CJSON提供的接口将它们转化为JSON对象。
cjson库在上一篇博文中已安装。这里使用cjson获取字符串实现mqtt节点间的信息传递

四、主程序代码

视频在拉流后,视频帧到不同算法间的关系是发布/订阅关系,
下面代码进行了2节点简单模拟,文中对于Mosquitto原始库做了一个简易封装,了解逻辑即可

#include "mqtt-client.hpp"
#include <opencv2/opencv.hpp>
#include <base64.h>
#include "cJSON.h"
#include <fstream>
using namespace std;
string matToBase64(const cv::Mat& image) {
    // 将图像编码为base64字符串
    vector<uchar> data;
    cv::imencode(".jpg", image, data);
    string base64_img = base64_encode(data.data(), data.size());
    return base64_img;
}
cv::Mat base64ToMat(const std::string& base64_str) {
    // 解码base64字符串
    std::string decoded_str = base64_decode(base64_str);
    // 将解码后的字符串转换为cv::Mat
    cv::Mat image = cv::imdecode(std::vector<uchar>(decoded_str.begin(), decoded_str.end()), cv::IMREAD_COLOR);
    return image;
}
int main(){  
    typedef struct {
    int id;
    std::string pic;
    } Student;
    // 读取PNG图片为Mat
    cv::Mat image = cv::imread("/home/trtlearn/mqtt-client/workspace/pic.jpg");
    string base64_img = matToBase64(image);
    // 创建一个JSON对象
    cJSON *root = cJSON_CreateObject();
    // 将结构体成员转化为JSON对象
    cJSON_AddNumberToObject(root, "num", 0.75);
    cJSON_AddStringToObject(root, "pic", base64_img.c_str());
    // 将JSON对象转换为JSON字符串
    char* json_str = cJSON_Print(root);
    cJSON_Delete(root);
    //新建两个客户端,其中一个发布消息接收数值,另一个接收图片。其中第一个节点即是发布者又是订阅者。
    string strPic(json_str);
    MQTTInitializer _;
    MQTTConnectCredential credential;
    credential.client_id = "wish";
    credential.subscribe_topic = "cc";
    credential.server_address = "127.0.0.1";
    credential.port = 9999;
    MQTTConnectCredential credential2;
    credential2.client_id = "wish1";
    credential2.subscribe_topic = "cc";
    credential2.server_address = "127.0.0.1";
    credential2.port = 9999;
    auto client = create_mqtt_client(credential);
    auto client2 = create_mqtt_client(credential2);

    if(client == nullptr){
        printf("Failed to create client.\n");
        return -1;
    }
    if(client2 == nullptr){
        printf("Failed to create client2.\n");
        return -1;
    }
    printf("Create client success.\n");
    //设置订阅后,收到消息时的回调函数
    client->set_message_callback([](MQTTClient* client, std::string topic, std::string message){
        cJSON *root = cJSON_Parse(message.c_str());
        cJSON *num = cJSON_GetObjectItem(root, "num");
        printf("客户端1收到 [%s] 数值: %f\n", topic.c_str(), (float)(num->valuedouble));
        cJSON_Delete(root);
    });
    client2->set_message_callback([&](MQTTClient* client, std::string topic, std::string message){
        cJSON *root = cJSON_Parse(message.c_str());
        cJSON *pics = cJSON_GetObjectItem(root, "pic");
        cv::Mat image=base64ToMat(string(pics->valuestring));
    if (!image.empty()) {
        cv::imwrite("receiver.jpg",image);
    }
        printf("客户端2收到 [%s] 图片\n", topic.c_str());
        cJSON_Delete(root);
    });
    while(true){
        char c = getchar();
        if(c == 'q'){
            break;
        }

        if(c == 's'){
            if(client->publish("cc", strPic)){
                printf("Publish success\n");
            }else{
                printf("Publish failed\n");
            }
        }
    }
    printf("Done.\n");
    return 0;
}

运行程序,输入s发布数值与图片信息,打印订阅输出结果:
在这里插入图片描述

五、调用Mosquitto库使用的cmakelist

cmake_minimum_required(VERSION 3.0)
project(pro)
add_definitions(-std=c++11)

option(CUDA_USE_STATIC_CUDA_RUNTIME OFF)
set(CMAKE_CXX_STANDARD 11)
#set(CMAKE_BUILD_TYPE Release)
set(CMAKE_BUILD_TYPE Debug)
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/workspace)

set(OpenCV_DIR   "/usr/local/opencv/lib64/cmake/opencv4")
set(mosquitto_Include   "/usr/local/mosquitto/include/")
set(CJSON_INCLUDE   "/usr/local/include/cjson")
set(mosquitto_DIR   "/usr/local/mosquitto")
set(mosquitto_LIBS mosquitto mosquittopp)
set(CJSON_LIBS cjson)

find_package(OpenCV)

include_directories(
    ${PROJECT_SOURCE_DIR}/src
    ${OpenCV_INCLUDE_DIRS}
    ${mosquitto_Include}
    ${CJSON_INCLUDE}
)

link_directories(
    ${TENSORRT_DIR}/lib
    ${CUDA_DIR}/lib64
    ${CUDNN_DIR}/lib
    ${mosquitto_DIR}/lib
)

set(CMAKE_CXX_FLAGS  "${CMAKE_CXX_FLAGS} -std=c++11 -Wall -O0 -Wfatal-errors -pthread -w -g")

#递归地添加的相关文件
file(GLOB_RECURSE cpp_srcs ${PROJECT_SOURCE_DIR}/src/*.cpp)
file(GLOB_RECURSE c_srcs ${PROJECT_SOURCE_DIR}/src/*.c)

add_executable(pro ${cpp_srcs} ${c_srcs})

target_link_libraries(pro ${OpenCV_LIBS} ${mosquitto_LIBS} ${CJSON_LIBS})

add_custom_target(
    run
    DEPENDS pro
    WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}/workspace
    COMMAND ./pro
)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/425611.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CLIP论文拜读及理解

文章目录一、clip论文阅读二、prompt1.除prompt之外的预训练语言模型2.prompt2.1. prompt定义2.2. prompt类型2.3. prompt重构2.3.1 prompt template2.3.2 Answer engineering2.4 多个 prompt的使用2.5 prompt的训练总结参考&#xff08;博文、论文&#xff09;一、clip论文阅读…

Windows系统安装WSL,并安装docker服务

背景 因为工作需要&#xff0c;要在电脑上执行sh脚本&#xff0c;并启动docker服务执行具体逻辑。因为我的电脑是windows系统&#xff0c;对做本任务来说&#xff0c;比较吃力&#xff0c;所以想到使用wsl&#xff0c;让windows电脑具有linux电脑的能力。 什么是 WSL 2 WSL 2 …

JVM的类加载的过程以及双亲委派模型

目录 1、加载 &#xff08;加载字节码文件&#xff0c;生成.class对象&#xff09; 2、验证&#xff08;验证Class文件是否符合规范&#xff09; 3、准备 &#xff08;为静态变量分配内存并设置变量初始值&#xff09; 4、解析&#xff08;初始化常量池中的一些常量&#…

索引的分类

1.唯一索引 给表中某一列设置为了唯一约束(这列不允许出现重复数据)后&#xff0c;数据库会为将这一列设置索引&#xff0c;这个索引叫做唯一索引&#xff08;主键那一列是一个特殊的唯一索引&#xff0c;不仅要满足唯一索引这一列不可以出现重复数据&#xff0c;而且这一列还…

Android opencv

install cmake cpp folder,新建c项目 获取 OpenCV4Android SDK O4A_SDK 下载&#xff0c;并解压 ~/Downloads/OpenCV-android-sdk$ tree -d -L 2 . ├── apk ├── samples │ ├── 15-puzzle │ ├── camera-calibration │ ├── color-blob-detection │ ├…

文件:IO流

1. 什么是IO /O 即输入Input/ 输出Output的缩写&#xff0c;其实就是计算机调度把各个存储中&#xff08;包括内存和外部存储&#xff09;的数据写入写出的过程&#xff1b;java中用“流&#xff08;stream&#xff09;”来抽象表示这么一个写入写出的功能&#xff0c;封装成一…

Vue学习笔记(7. axios异步请求)

1. axios请求方式 方式1&#xff1a;axios({method:"GET",url:"..."}) 方式2&#xff1a;axios({method:"POST",url:"...",data:"..."}) 方式3&#xff1a;axios.get("url...") 方式4&#xff1a;axios.post(…

Python中的35个关键字

✅作者简介&#xff1a;CSDN内容合伙人、阿里云专家博主、51CTO专家博主、新星计划第三季python赛道Top1&#x1f3c6; &#x1f4c3;个人主页&#xff1a;hacker707的csdn博客 &#x1f525;系列专栏&#xff1a;零基础入门篇 &#x1f4ac;个人格言&#xff1a;不断的翻越一座…

【 Spring MVC 核心功能(二) - 获取参数(下)】

文章目录一、使用 RequestBody 接收JSON对象二、使用 RequestPart 上传⽂件三、获取 Cookie四、获取 Header五、存储和获取 Session5.1 存储 Session5.2 获取 Session一、使用 RequestBody 接收JSON对象 有时客户端会通过 post 方式发送 json 格式的请求&#xff0c;那后端就可…

2.3.5双链表

单链表vs双链表 就是既有前驱指针也有后继指针&#xff0c;由line改为double。 双链表的插入怎么实现&#xff1f; s->nextp->next; p->next->priors s->priorp //把p赋给s的前驱指针指向的位置 p->nexts; 如果p刚好是最后一个元素。 p->next->prio…

基于ArcGIS的电子地图矢量化方法

一、电子地图及纸质地图矢量化的目的 地图数据来源有很多&#xff0c;其中栅格数据数字化是地图数据的重要来源。栅格数据的矢量化包括地理配准以及矢量化。矢量化后的地图数据往往可以为我们的空间统计分析提供实验依据&#xff0c;从而探究地理分布的时空差异性。 空间参考&a…

完整指南:如何安装Man手册

Man手册简介 man手册是Unix和类Unix操作系统中的命令行工具&#xff0c;用于提供关于特定命令、函数和文件的帮助文档。它通常包含命令的语法、选项、参数、示例以及其他相关信息。man手册可以通过在终端输入"man"命令&#xff0c;后跟要查看的命令或函数名称来访问…

huggingface transformer模型介绍

总结&#xff1a; 模型提高性能&#xff1a;新的目标函数&#xff0c;mask策略等一系列tricks Transformer 模型系列 自从2017&#xff0c;原始Transformer模型激励了大量新的模型&#xff0c;不止NLP任务&#xff0c;还包括预测蛋白质结构&#xff0c;时间序列预测。 有些模…

灌区流量监测设备-中小灌区节水改造

系统概述 灌区信息化管理系统主要对对灌区的水情、雨情、土壤墒情、气象等信息进行监测&#xff0c;对重点区域进行视频监控&#xff0c;同时对泵站、闸门进行远程控制&#xff0c;实现了信息的测量、统计、分析、控制、调度等功能。为灌区管理部门科学决策提供了依据&#xf…

Python 无监督学习实用指南:6~10

原文&#xff1a;Hands-on unsupervised learning with Python 协议&#xff1a;CC BY-NC-SA 4.0 译者&#xff1a;飞龙 本文来自【ApacheCN 深度学习 译文集】&#xff0c;采用译后编辑&#xff08;MTPE&#xff09;流程来尽可能提升效率。 不要担心自己的形象&#xff0c;只关…

23年5月高项学习笔记12 —— 干系人管理

过程&#xff1a; 1. 识别干系人&#xff1a;定期识别干系人&#xff0c;分析和记录他们的利益&#xff0c;参与度、相互依赖性、影响力和对项目的潜在的影响 输入&#xff1a;立项管理文件、沟通管理计划、干系人参与计划、需求文件、变更日志、问题日志、协议&#xff08;协…

今天给大家介绍一篇基于springboot的医院管理系统的设计与实现

临近学期结束&#xff0c;你还在做java程序网络编程&#xff0c;期末作业&#xff0c;老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。这里根据疫情当下&#xff0c;你想解决的问题&#xff0c;今天给大家介绍一篇基…

2023/4/10总结

线段树 线段树是一种二叉树&#xff0c;通俗易懂的来说就是对于一个线段&#xff0c;我们会用一个二叉树来表示。线段树是一种工具&#xff0c;她能把对区间&#xff08;线段&#xff09;的修改与维护从0(N)的时间复杂度变成0(logN)。 如图&#xff1a; 如上图&#xff0c;我…

MYSQL SQL语句优化技术技巧

MySQL是一种流行的关系型数据库管理系统&#xff0c;它提供了各种各样的SQL语句优化技术&#xff0c;下面是一些常见的优化技巧&#xff1a; 1. 使用索引 索引可以大大提高查询性能。在MySQL中&#xff0c;可以使用CREATE INDEX语句在列上创建索引。当查询包含WHERE子句并且…

Google SEO 搜索中心

在公司发展还没有那么成熟的时候&#xff0c;也许你的测试网站是外网可以公开访问的&#xff0c;也许你网站中的机密图片在测试环境&#xff08;不小心上到正式环境&#xff09;却被搜索引擎无情抓取&#xff0c;以及有些内部用户才能使用的网址&#xff0c;你并不想被搜索引擎…