MQTT客户端应用编程及接口分析

news2025/1/17 3:44:59

MQTT客户端应用编程及接口分析

MQTT协议简介

MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。

客户端服务端安装

1.安装

sudo apt-add-repository ppa:mosquitto-dev/mosquitto-ppasudo 
sudo apt-get install mosquitto mosquitto-clients
  1. 测试
mosquitto_sub -h test.mosquitto.org -t "#" -v

如果发现大量的消息滚动刷屏,说明该客户端链接成功

客户端开发包

SDK包下载地址:
eclipse/paho.mqtt.c

编译及生产lib和头文件
cmake -DMAKE_INSTALL_PREFIX=./output
make && make install
在这里插入图片描述

API接口介绍

接口类型头文件备注
Paho MQTT C Client Library MQTTClient.h非现程安全
Paho Asynchronous MQTT C Client LibraryMQTTAsync.h线程安全

数据结构

typedef void* MQTTClient;
typedef void* MQTTAsync;

MQTTClient 库接口

  1. MQTTClient_create() : MQTTClient.h
int MQTTClient_create(MQTTClient *handle,
                      const char *serverURI,
                      const char *clientId,
                      int         persistence_type,
                      void *      persistence_context 
)

此函数创建一个 MQTT 客户端,以便连接到指定的服务器并使用指定的持久性存储(参考 MQTTClient_persistence 函数)。另外,与创建函数对应的是销毁函数 MQTTClient_destroy()。

  1. MQTTClient_connect() : MQTTClient.h
int MQTTClient_connect(MQTTClient handle,
                       MQTTClient_connectOptions *options
)

此函数尝试使用指定的选项将以前创建的客户端(调用 MQTTClient_create() 函数)连接到 MQTT 服务器。如果要启用异步消息和状态通知,调用该函数之前必须先调用 MQTTClient_setCallbacks() 函数。

  1. MQTTClient_destroy() : MQTTClient.h
void MQTTClient_destroy(MQTTClient *handle)

此函数用于释放分配给 MQTT 客户端的内存(调用 MQTTClient_create() 函数)。当不再需要客户端时,应该调用它。

4.MQTTClient_disconnect() : MQTTClient.h

int MQTTClient_disconnect(MQTTClient handle, int timeout)

此函数尝试断开客户端与 MQTT 服务器的连接。为了让客户端有时间完成对调用此函数时正在传递的消息的处理,需要指定一个超时期限。当超时期限过期时,即使仍有未完成的消息确认,客户端也会断开连接。下一次客户端连接到同一服务器时,任何未完成的 QoS 1 或 2 消息将根据前一个连接和新连接的清理设置重试(参考 MQTTClient_connectOptions.cleansession 和 MQTTClient_connect())。

  1. MQTTClient_free() : MQTTClient.h
void MQTTClient_free(void *ptr)

此函数释放 MQTT C Client 库分配的内存,特别是主题名称。当 client 库和应用程序使用不同版本的 C 编译器编译时,在 Windows 上需要这样做。因此,在释放任何 MQTT C Client 分配的内存时,始终使用此函数是一个很好的策略。

  1. MQTTClient_freeMessage() : MQTTClient.h
void MQTTClient_freeMessage(MQTTClient_message ** msg)

此函数释放分配给 MQTT 消息的内存,包括分配给消息有效负载的附加内存。当完全处理消息时,客户端应用程序调用此函数。重要提示:此函数不释放分配给消息主题字符串的内存。客户端应用程序负责使用 MQTTClient_free() 库函数释放此内存。

  1. MQTTClient_isConnected() : MQTTClient.h
int MQTTClient_isConnected(MQTTClient handle)

此函数允许客户端应用程序测试某个 client 与 MQTT 服务器连接情况,已连接返回 true,否则返回 false。

  1. MQTTClient_publish() : MQTTClient.h
int MQTTClient_publish(MQTTClient                handle,
                       const char *              topicName,
                       int                       payloadlen,
                       const void *              payload,
                       int                       qos,
                       int                       retained,
                       MQTTClient_deliveryToken *dt
)

此函数试图将消息发布到给定主题(另请参见 MQTTClient_publishMessage 函数)。当此函数成功返回时,将发出 MQTTClient_deliveryToken。如果客户端应用程序需要测试 QoS1 和 QoS2 消息的成功传递,可以异步或同步地完成(参考 异步与同步客户端应用程序、 MQTTClient_waitForCompletion 和 MQTTClient_deliveryComplete 函数)。

  1. MQTTClient_publishMessage() : MQTTClient.h
int MQTTClient_publishMessage(MQTTClient                handle,
                              const char *              topicName,
                              MQTTClient_message *      msg,
                              MQTTClient_deliveryToken *dt
)

此函数试图将消息发布到给定主题(另请参见 MQTTClient_publish 函数)。当此函数成功返回时,将发出一个 MQTTClient_deliveryToken 令牌。如果客户端应用程序需要测试 QoS1 和 QoS2 消息的成功传递,可以异步或同步地完成(参考 异步与同步客户端应用程序、 MQTTClient_waitForCompletion 和 MQTTClient_deliveryComplete 函数)。

  1. MQTTClient_receive() : MQTTClient.h
int MQTTClient_receive(MQTTClient            handle,
                       char **               topicName,
                       int *                 topicLen,
                       MQTTClient_message ** message,
                       unsigned long         timeout
)

此函数执行传入消息的同步接收。只有当客户端应用程序没有设置回调方法以支持消息的异步接收时,才应该使用它(请参见 异步与同步客户端应用程序 以及 MQTTClient_setCallbacks 函数)。使用此函数可以编写单线程客户端订阅者应用程序。当调用时,此函数将阻塞,直到下一条消息到达或指定的超时过期为止(另请参见 MQTTClient_yield 函数)。

重要提示:应用程序在处理完成后必须释放分配给主题和消息的内存(请参见 MQTTClient_freeMessage 函数)。

  1. MQTTClient_setCallbacks() : MQTTClient.h
int MQTTClient_setCallbacks(MQTTClient                    handle,
                            void *                        context,
                            MQTTClient_connectionLost *   cl,
                            MQTTClient_messageArrived *   ma,
                            MQTTClient_deliveryComplete * dc
)

此函数设置特定客户端的回调函数。如果客户端应用程序不使用特定的回调,请将相关参数设置为 NULL。调用 MQTTClient_setCallbacks() 将客户端置于多线程模式。任何必要的消息确认和状态通信都在后台处理,不需要客户端应用程序的任何干预。更多信息请参见 异步与同步客户端应用程序。

注意:在调用此函数时,MQTT 客户端必须断开连接。

  1. MQTTClient_setDisconnected() : MQTTClient.h
int MQTTClient_setDisconnected(MQTTClient handle,
                               void *     context,
                               MQTTClient_disconnected *co 
)

设置客户端的 MQTTClient_disconnected() 回调函数。如果从服务器接收到断开连接的数据包,将调用这个函数。该函数只对 MQTT V5 及以上版本有效。

  1. MQTTClient_setPublished() : MQTTClient.h
int MQTTClient_setPublished(MQTTClient             handle,
                            void *                 context,
                            MQTTClient_published * co
)

MQTTAsync 库接口

  • MQTTAsync_connect() : MQTTAsync.h
  • MQTTAsync_create() : MQTTAsync.h
  • MQTTAsync_createWithOptions() : MQTTAsync.h
  • MQTTAsync_destroy() : MQTTAsync.h
  • MQTTAsync_disconnect() : MQTTAsync.h
  • MQTTAsync_free() : MQTTAsync.h
  • MQTTAsync_freeMessage() : MQTTAsync.h
  • MQTTAsync_getPendingTokens() : MQTTAsync.h
  • MQTTAsync_getVersionInfo() : MQTTAsync.h
  • MQTTAsync_global_init() : MQTTAsync.h
  • MQTTAsync_isComplete() : MQTTAsync.h
  • MQTTAsync_isConnected() : MQTTAsync.h
  • MQTTAsync_malloc() : MQTTAsync.h
  • MQTTAsync_reconnect() : MQTTAsync.h
  • MQTTAsync_send() : MQTTAsync.h
  • MQTTAsync_sendMessage() : MQTTAsync.h
  • MQTTAsync_setAfterPersistenceRead() : MQTTAsync.h
  • MQTTAsync_setBeforePersistenceWrite() : MQTTAsync.h
  • MQTTAsync_setCallbacks() : MQTTAsync.h
  • MQTTAsync_setConnected() : MQTTAsync.h
  • MQTTAsync_setConnectionLostCallback() : MQTTAsync.h
  • MQTTAsync_setDeliveryCompleteCallback() : MQTTAsync.h
  • MQTTAsync_setDisconnected() : MQTTAsync.h
  • MQTTAsync_setMessageArrivedCallback() : MQTTAsync.h
  • MQTTAsync_setTraceCallback() : MQTTAsync.h
  • MQTTAsync_setTraceLevel() : MQTTAsync.h
  • MQTTAsync_setUpdateConnectOptions() : MQTTAsync.h
  • MQTTAsync_strerror() : MQTTAsync.h
  • MQTTAsync_subscribe() : MQTTAsync.h
  • MQTTAsync_subscribeMany() : MQTTAsync.h
  • MQTTAsync_unsubscribe() : MQTTAsync.h
  • MQTTAsync_unsubscribeMany() : MQTTAsync.h
  • MQTTAsync_waitForCompletion() : MQTTAsync.h
  • MQTTProperties_add() : MQTTProperties.h
  • MQTTProperties_copy() : MQTTProperties.h
  • MQTTProperties_free() : MQTTProperties.h
  • MQTTProperties_getNumericValue() : MQTTProperties.h
  • MQTTProperties_getNumericValueAt() : MQTTProperties.h
  • MQTTProperties_getProperty() : MQTTProperties.h
  • MQTTProperties_getPropertyAt() : MQTTProperties.h
  • MQTTProperties_hasProperty() : MQTTProperties.h
  • MQTTProperties_len() : MQTTProperties.h
  • MQTTProperties_propertyCount() : MQTTProperties.h
  • MQTTProperties_read() : MQTTProperties.h
  • MQTTProperties_write() : MQTTProperties.h
  • MQTTProperty_getType() : MQTTProperties.h
  • MQTTPropertyName() : MQTTProperties.h
  • MQTTReasonCode_toString() : MQTTReasonCodes.h

###代码使用案例

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTAsync.h"

#if !defined(_WIN32)
#include <unistd.h>
#else
#include <windows.h>
#endif

#if defined(_WRS_KERNEL)
#include <OsWrapper.h>
#endif

#define ADDRESS     "tcp://mqtt.eclipseprojects.io:1883"
#define CLIENTID    "ExampleClientPub"
#define TOPIC       "MQTT Examples"
#define PAYLOAD     "Hello World!"
#define QOS         1
#define TIMEOUT     10000L

int finished = 0;

void connlost(void *context, char *cause)
{
	MQTTAsync client = (MQTTAsync)context;
	MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
	int rc;

	printf("\nConnection lost\n");
	printf("     cause: %s\n", cause);

	printf("Reconnecting\n");
	conn_opts.keepAliveInterval = 20;
	conn_opts.cleansession = 1;
	if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
	{
		printf("Failed to start connect, return code %d\n", rc);
 		finished = 1;
	}
}

void onDisconnectFailure(void* context, MQTTAsync_failureData* response)
{
	printf("Disconnect failed\n");
	finished = 1;
}

void onDisconnect(void* context, MQTTAsync_successData* response)
{
	printf("Successful disconnection\n");
	finished = 1;
}

void onSendFailure(void* context, MQTTAsync_failureData* response)
{
	MQTTAsync client = (MQTTAsync)context;
	MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
	int rc;

	printf("Message send failed token %d error code %d\n", response->token, response->code);
	opts.onSuccess = onDisconnect;
	opts.onFailure = onDisconnectFailure;
	opts.context = client;
	if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
	{
		printf("Failed to start disconnect, return code %d\n", rc);
		exit(EXIT_FAILURE);
	}
}

void onSend(void* context, MQTTAsync_successData* response)
{
	MQTTAsync client = (MQTTAsync)context;
	MQTTAsync_disconnectOptions opts = MQTTAsync_disconnectOptions_initializer;
	int rc;

	printf("Message with token value %d delivery confirmed\n", response->token);
	opts.onSuccess = onDisconnect;
	opts.onFailure = onDisconnectFailure;
	opts.context = client;
	if ((rc = MQTTAsync_disconnect(client, &opts)) != MQTTASYNC_SUCCESS)
	{
		printf("Failed to start disconnect, return code %d\n", rc);
		exit(EXIT_FAILURE);
	}
}


void onConnectFailure(void* context, MQTTAsync_failureData* response)
{
	printf("Connect failed, rc %d\n", response ? response->code : 0);
	finished = 1;
}


void onConnect(void* context, MQTTAsync_successData* response)
{
	MQTTAsync client = (MQTTAsync)context;
	MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
	MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
	int rc;

	printf("Successful connection\n");
	opts.onSuccess = onSend;
	opts.onFailure = onSendFailure;
	opts.context = client;
	pubmsg.payload = PAYLOAD;
	pubmsg.payloadlen = (int)strlen(PAYLOAD);
	pubmsg.qos = QOS;
	pubmsg.retained = 0;
	if ((rc = MQTTAsync_sendMessage(client, TOPIC, &pubmsg, &opts)) != MQTTASYNC_SUCCESS)
	{
		printf("Failed to start sendMessage, return code %d\n", rc);
		exit(EXIT_FAILURE);
	}
}

int messageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* m)
{
	/* not expecting any messages */
	return 1;
}

int main(int argc, char* argv[])
{
	MQTTAsync client;
	MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
	int rc;

	if ((rc = MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTASYNC_SUCCESS)
	{
		printf("Failed to create client object, return code %d\n", rc);
		exit(EXIT_FAILURE);
	}

	if ((rc = MQTTAsync_setCallbacks(client, NULL, connlost, messageArrived, NULL)) != MQTTASYNC_SUCCESS)
	{
		printf("Failed to set callback, return code %d\n", rc);
		exit(EXIT_FAILURE);
	}

	conn_opts.keepAliveInterval = 20;
	conn_opts.cleansession = 1;
	conn_opts.onSuccess = onConnect;
	conn_opts.onFailure = onConnectFailure;
	conn_opts.context = client;
	if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
	{
		printf("Failed to start connect, return code %d\n", rc);
		exit(EXIT_FAILURE);
	}

	printf("Waiting for publication of %s\n"
         "on topic %s for client with ClientID: %s\n",
         PAYLOAD, TOPIC, CLIENTID);
	while (!finished)
		#if defined(_WIN32)
			Sleep(100);
		#else
			usleep(10000L);
		#endif

	MQTTAsync_destroy(&client);
 	return rc;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/528430.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringCloud_服务注册中心_Consul(八)

SpringCloud_服务注册中心_Consul(八) 分为五部分 Consul简介 安装并运行Consul 服务提供者 服务消费者 三个注册中心异同点 Consul简介 官网&#xff1a;https://developer.hashicorp.com/consul/docs/intro 是Go语言写的 Consul是一套开源的分布式服务发现和配置管理系统&am…

GB50312-2016标准中需要检测的参数(AEMFLUKE)含双绞线和光

很多同学经常搞不清GB50312-2016标准的规定测试参数&#xff0c;或者说和测试设备对不上号。特意从标准中摘抄出来&#xff0c;供大家参考。 ACR-F(Attenuation to Crosstalk Ratio at the Far-end) 衰减远端串音比 ACR-N(Attenuation to Crosstalk Ratio at the Near-end)衰…

用于申威Alpha指令集处理器CModel裸机(不带操作系统)的CoreMark性能测试程序源码编译流程

CoreMark是一个综合基准&#xff0c;用于测量嵌入式系统中使用的中央处理器(CPU)的性能。它是在2009由eembc的shay gal-on开发的&#xff0c;并且试图将其发展成为工业标准&#xff0c;取代过时的dehrystone基准。代码用C编写&#xff0c;包含以下算法&#xff1a;列表处理(增删…

如何在Colab中使用gpu资源(附使用MMdet推理示例)

如何在Colab中“白嫖”gpu资源&#xff08;附使用MMdet推理示例&#xff09; Google Colab简介 当今&#xff0c;深度学习已经成为许多人感兴趣的话题&#xff0c;Google Colab&#xff08;全称为Google Colaboratory&#xff09;是Google推出的一个强大的云端 notebook&…

《微服务实战》 第七章 Spring Cloud 之 GateWay

前言 API 网关是一个搭建在客户端和微服务之间的服务&#xff0c;我们可以在 API 网关中处理一些非业务功能的逻辑&#xff0c;例如权限验证、监控、缓存、请求路由等。 1、通过API网关访问服务 客户端通过 API 网关与微服务交互时&#xff0c;客户端只需要知道 API 网关地…

UWB智慧工厂人员定位系统源码,人员在岗监控、车辆实时轨迹监控源码

近年来人员定位系统在工业领域的发展势头迅猛&#xff0c;工业识别与定位成为促进制造业数字化的关键技术。通过实时定位可以判断所有的人、物、车的位置。实时定位系统要适用于复杂工业环境&#xff0c;单一技术是很难实现的&#xff0c;需要融合多种不同的定位技术&#xff0…

【hive】hive grouping sets和GROUPING__ID的用法

前言​ GROUPING SETS,GROUPING__ID,CUBE,ROLLUP 这几个分析函数通常用于OLAP中&#xff0c;不能累加&#xff0c;而且需要根据不同维度上钻和下钻的指标统计&#xff0c;比如&#xff0c;分小时、天、月的UV数。 grouping sets根据不同的维度组合进行聚合&#xff0c;等价于…

从事网络安全工作,这五大证书是加分项!

对我们而言&#xff0c;无论从事什么工作&#xff0c;考取相关证书都有非常重要的作用&#xff0c;它是我们找工作时的加分项&#xff0c;同时也是对我们技术水平的验证&#xff0c;那么从事网络安全工作可以考哪些证书?本篇文章为大家介绍一下。 1、CISP 国家注册信息安全专业…

vue3【父子组件间的传值--setup语法糖】

这篇文章主要讲解vue3语法糖中组件传值的用法、 一、父组件给子组件传值 父组件 <template><div classmain>我是父组件<Child :msg"parentMsg"></Child></div></template><script setup> import Child from ./child im…

idea热部署插件JRebel激活

JRebel可以实现在idea中热部署项目&#xff0c;修改后不用重启项目&#xff0c;让开发更丝滑。 JRebel需要激活才可以正常使用。 不想安装服务的可以用我个人部署的服务器注册&#xff0c;不保证稳定哦&#xff0c;有问题可以留言。 安装完插件直接看激活。 http://121.5.183.2…

亲水性Sulfo-Cyanine3 NHS ester水溶性CY3标记活性脂

Sulfo-Cy3是一种荧光染料&#xff0c;可用于生物成像和细胞标记等应用。Sulfo-Cy3是一种含有硫酸基的Cy3染料&#xff0c;具有高度的水溶性和稳定性。Sulfo-Cy3可以与NHS&#xff08;N-羟基琥珀酰亚胺&#xff09;结合&#xff0c;形成Sulfo-Cy3 NHS&#xff0c;这种结合物可以…

微生物常见统计检验方法比较及选择

谷禾健康 微生物组经由二代测序分析得到庞大数据结果&#xff0c;其中包括OTU/ASV表&#xff0c;物种丰度表&#xff0c;alpha多样性、beta多样性指数&#xff0c;代谢功能预测丰度表等&#xff0c;这些数据构成了微生物组的变量&#xff0c;大量数据构成了高纬度数据信息。 针…

[JS与链表]双向链表

前言 阅读本文前请先阅读 [JS与链表]普通链表_AI3D_WebEngineer的博客-CSDN博客 ES6的Class继承 类的继承可以使用extends&#xff0c;让子类继承父类的属性和方法。 而在子类内部&#xff08;构造函数constructor&#xff09;必须调用super()实现继承&#xff08;super()代表父…

基于MPSOC+C6678+高精度AD/DA的软件无线电处理平台

板卡概述 VPX_XM630 是一款基于6U VPX 总线架构的高速信号处理平台&#xff0c;该平台采用一片Xilinx 的Kintex UltraScale 系列FPGA&#xff08;XCKU115&#xff09;作为主处理器&#xff0c;完成复杂的数据采集、回放以及实时信号处理算法。采用一片带有ARM 内核的高性能嵌入…

k8s简单记录

进入pod中的某个容器并执行命令 # 进入pod中的busybox容器&#xff0c;查看文件内容 # 补充一个命令: kubectl exec pod名称 -n 命名空间 -it -c 容器名称 /bin/sh 在容器内部执行命令 # 使用这个命令就可以进入某个容器的内部&#xff0c;然后进行相关操作了 # 比如&#x…

【论文简述】Multi-View Stereo Representation Revisit: Region-Aware MVSNet(CVPR 2023)

一、论文简述 1. 第一作者&#xff1a;Yisu Zhang 2. 发表年份&#xff1a;2023 3. 发表期刊&#xff1a;CVPR 4. 关键词&#xff1a;MVS、3D重建、符号距离场 5. 探索动机&#xff1a;像素深度估计仍存在两个棘手的缺陷。一是无纹理区域的估计置信度较低。二是物体边界附…

一文读懂DNS解析原理和流程(中科三方)

什么是DNS域名解析 我们首先要了解域名和IP地址的区别。IP地址是互联网上计算机唯一的逻辑地址&#xff0c;通过IP地址实现不同计算机之间的相互通信&#xff0c;每台联网计算机都需要通过IP地址来互相联系和分别。 但由于IP地址是由一串容易混淆的数字串构成&#xff0c;人们很…

awk指令的详细指南

目录 工作原理 命令格式 awk常见的内建变量&#xff08;可直接用&#xff09;如下所示 按行输出文本 按字段输出文本 通过管道、双引号调用 Shell 命令 示例 CPU使用率 数组 ​编辑统计文件的内容出现的次数 使用awk 统计secure 访问日志中每个客户端IP的出现次数? …

云上的二维设计原来是这样的!

今天与大家探索云上的二维设计&#xff0c;3DEXPERIENCE DraftSight基于云平台实现与云端进行连接&#xff0c;实现一定的云上协作&#xff0c;提升绘图工作效率&#xff0c;我们从以下三方面来进行说明&#xff1a; 01&#xff1a;DraftSight设计 02&#xff1a;Revision变更…

ECharts 快速入门

文章目录 1.1 ECharts介绍1.2 vue使用ECharts1&#xff09;vscode打开测试工程2) 工程安装echarts依赖3) 配置echarts4) vue组件使用echarts5) 页面效果&#xff1a; 1.3 项目中 ECharts 的使用1) 配置和使用流程说明2) 前端显示效果 1.1 ECharts介绍 ECharts是百度开发的一个…