目录
- 一、Mosquitto中的QoS定义
- QoS1和3区别
- 二、安装base64库
- 三、cjson简介
- 四、主程序代码
- 五、调用Mosquitto库使用的cmakelist
更多内容详见 【MQTT协议】使用c++实现mqtt协议(Mosquitto源码编译)
一、Mosquitto中的QoS定义
MQTT协议中的QoS(Quality of Service)表示消息传输的服务质量等级,它是MQTT协议中非常重要的一个概念。
MQTT协议中定义了三个不同等级的QoS:
QoS 0:最多一次(At most once)传输。消息发布者只发送一次消息,不进行确认,也不关心消息是否到达订阅者。这种QoS等级的消息传输效率最高,但可靠性最低。
QoS 1:最少一次(At least once)传输。消息发布者会发送消息,并等待确认。如果消息没有被确认,会再次发送,直到收到确认为止。这种QoS等级的消息传输可靠性较高,但效率稍低。
QoS 2:恰好一次(Exactly once)传输。消息发布者发送消息,并等待确认。如果没有收到确认,会再次发送,直到收到确认为止。订阅者接收到消息后,会发送一个确认消息给发布者。如果发布者没有收到确认,会再次发送消息,直到收到确认为止。这种QoS等级的消息传输可靠性最高,但效率最低。
QoS1和3区别
在MQTT协议中,客户端和服务端可以通过协商来确定消息传输的QoS等级,以满足消息传输效率和可靠性的要求。
MQTT协议中的QoS 1和QoS 2都保证了消息的可靠性,但它们之间有一些区别。
QoS 1(最少一次)保证了消息至少会被传输一次,但不保证消息一定会被准确地传输一次。在QoS 1级别下,发布者发送消息并等待一个确认,如果没有收到确认,会再次发送消息,直到收到确认为止。但是,如果确认消息丢失或延迟,可能会导致发布者不断地发送相同的消息。因此,QoS 1级别下可能会出现重复的消息。
QoS 2(恰好一次)保证了消息会被恰好传输一次,不会出现重复的消息。在QoS 2级别下,发布者发送消息并等待一个确认。订阅者收到消息后,会发送一个确认消息给发布者。如果发布者没有收到确认,会再次发送消息,直到收到确认为止。如果订阅者收到重复的消息,可以通过消息中的标识符进行去重,以保证只处理一次。
综上所述,QoS 2级别下的消息传输可靠性更高,但是会带来更大的网络开销和延迟。在选择QoS等级时,需要根据具体的应用需求来确定。
二、安装base64库
base64库用于完成图片与base64之间的转换工作。
git clone https://github.com/ReneNyffenegger/cpp-base64.git
得到base64.h及base64.cpp,直接在工程中进行引用即可。
三、cjson简介
由于Mosquitto传递字节消息,需使用CJSON完成对象的序列化:
如果需要将C语言结构体转化为JSON字符串,需要手动遍历结构体中的成员,并使用CJSON提供的接口将它们转化为JSON对象。
cjson库在上一篇博文中已安装。这里使用cjson获取字符串实现mqtt节点间的信息传递
四、主程序代码
视频在拉流后,视频帧到不同算法间的关系是发布/订阅关系,
下面代码进行了2节点简单模拟,文中对于Mosquitto原始库做了一个简易封装,了解逻辑即可
#include "mqtt-client.hpp"
#include <opencv2/opencv.hpp>
#include <base64.h>
#include "cJSON.h"
#include <fstream>
using namespace std;
string matToBase64(const cv::Mat& image) {
// 将图像编码为base64字符串
vector<uchar> data;
cv::imencode(".jpg", image, data);
string base64_img = base64_encode(data.data(), data.size());
return base64_img;
}
cv::Mat base64ToMat(const std::string& base64_str) {
// 解码base64字符串
std::string decoded_str = base64_decode(base64_str);
// 将解码后的字符串转换为cv::Mat
cv::Mat image = cv::imdecode(std::vector<uchar>(decoded_str.begin(), decoded_str.end()), cv::IMREAD_COLOR);
return image;
}
int main(){
typedef struct {
int id;
std::string pic;
} Student;
// 读取PNG图片为Mat
cv::Mat image = cv::imread("/home/trtlearn/mqtt-client/workspace/pic.jpg");
string base64_img = matToBase64(image);
// 创建一个JSON对象
cJSON *root = cJSON_CreateObject();
// 将结构体成员转化为JSON对象
cJSON_AddNumberToObject(root, "num", 0.75);
cJSON_AddStringToObject(root, "pic", base64_img.c_str());
// 将JSON对象转换为JSON字符串
char* json_str = cJSON_Print(root);
cJSON_Delete(root);
//新建两个客户端,其中一个发布消息接收数值,另一个接收图片。其中第一个节点即是发布者又是订阅者。
string strPic(json_str);
MQTTInitializer _;
MQTTConnectCredential credential;
credential.client_id = "wish";
credential.subscribe_topic = "cc";
credential.server_address = "127.0.0.1";
credential.port = 9999;
MQTTConnectCredential credential2;
credential2.client_id = "wish1";
credential2.subscribe_topic = "cc";
credential2.server_address = "127.0.0.1";
credential2.port = 9999;
auto client = create_mqtt_client(credential);
auto client2 = create_mqtt_client(credential2);
if(client == nullptr){
printf("Failed to create client.\n");
return -1;
}
if(client2 == nullptr){
printf("Failed to create client2.\n");
return -1;
}
printf("Create client success.\n");
//设置订阅后,收到消息时的回调函数
client->set_message_callback([](MQTTClient* client, std::string topic, std::string message){
cJSON *root = cJSON_Parse(message.c_str());
cJSON *num = cJSON_GetObjectItem(root, "num");
printf("客户端1收到 [%s] 数值: %f\n", topic.c_str(), (float)(num->valuedouble));
cJSON_Delete(root);
});
client2->set_message_callback([&](MQTTClient* client, std::string topic, std::string message){
cJSON *root = cJSON_Parse(message.c_str());
cJSON *pics = cJSON_GetObjectItem(root, "pic");
cv::Mat image=base64ToMat(string(pics->valuestring));
if (!image.empty()) {
cv::imwrite("receiver.jpg",image);
}
printf("客户端2收到 [%s] 图片\n", topic.c_str());
cJSON_Delete(root);
});
while(true){
char c = getchar();
if(c == 'q'){
break;
}
if(c == 's'){
if(client->publish("cc", strPic)){
printf("Publish success\n");
}else{
printf("Publish failed\n");
}
}
}
printf("Done.\n");
return 0;
}
运行程序,输入s发布数值与图片信息,打印订阅输出结果:
五、调用Mosquitto库使用的cmakelist
cmake_minimum_required(VERSION 3.0)
project(pro)
add_definitions(-std=c++11)
option(CUDA_USE_STATIC_CUDA_RUNTIME OFF)
set(CMAKE_CXX_STANDARD 11)
#set(CMAKE_BUILD_TYPE Release)
set(CMAKE_BUILD_TYPE Debug)
set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/workspace)
set(OpenCV_DIR "/usr/local/opencv/lib64/cmake/opencv4")
set(mosquitto_Include "/usr/local/mosquitto/include/")
set(CJSON_INCLUDE "/usr/local/include/cjson")
set(mosquitto_DIR "/usr/local/mosquitto")
set(mosquitto_LIBS mosquitto mosquittopp)
set(CJSON_LIBS cjson)
find_package(OpenCV)
include_directories(
${PROJECT_SOURCE_DIR}/src
${OpenCV_INCLUDE_DIRS}
${mosquitto_Include}
${CJSON_INCLUDE}
)
link_directories(
${TENSORRT_DIR}/lib
${CUDA_DIR}/lib64
${CUDNN_DIR}/lib
${mosquitto_DIR}/lib
)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11 -Wall -O0 -Wfatal-errors -pthread -w -g")
#递归地添加的相关文件
file(GLOB_RECURSE cpp_srcs ${PROJECT_SOURCE_DIR}/src/*.cpp)
file(GLOB_RECURSE c_srcs ${PROJECT_SOURCE_DIR}/src/*.c)
add_executable(pro ${cpp_srcs} ${c_srcs})
target_link_libraries(pro ${OpenCV_LIBS} ${mosquitto_LIBS} ${CJSON_LIBS})
add_custom_target(
run
DEPENDS pro
WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}/workspace
COMMAND ./pro
)