目录
概述
1 认识Eclipse Paho C
1.1 paho.mqtt.c简介
1.2 下载和安装paho.mqtt.c
1.3 一些重要的函数
2 异步订阅消息实现
2.1 编写异步订阅消息功能
2.1.1 初始化MQTT参数
2.1.2 初始化函数
2.1.3 订阅消息的回调函数
2.1.4 取消订阅Topic
2.2 编译代码和测试
3 验证MQTT Client功能
3.1 EMQX服务器上查看MQTT Client
3.2 MQTT.fx发布Topic
4 完整实验代码
概述
本文主要介绍在linux环境(ubuntu)环境下,下载和安装Eclipse Paho C MQTT 软件包,还编写一个范例实现异步订阅Message的功能,并使用基于EMQX的服务验证其功能,使用MQTT.fx发布消息,以验证订阅消息功能的可靠性。
1 认识Eclipse Paho C
1.1 paho.mqtt.c简介
paho.mqtt.c是一个使用C语言编写的MQTT客户端库,用于连接和与MQTT代理进行通信。它是Eclipse Paho项目的一部分,旨在为C开发人员提供一个易于使用、可靠和高性能的MQTT解决方案。
这个库允许开发人员创建MQTT客户端,该客户端可以连接到远程MQTT代理,并实现与代理之间的消息发布、订阅和管理等功能。paho.mqtt.c库提供了一套简单而直观的API,使开发人员能够轻松地使用MQTT协议进行通信。
paho.mqtt.c库支持多种MQTT协议版本,包括MQTTv3.1和MQTTv3.1.1。它提供了多种操作和配置选项,以适应不同的需求和场景。该库还提供了一些高级功能,例如支持SSL/TLS加密、持久化会话、消息保持和遗嘱消息等。
paho.mqtt.c库在多个平台上都可以使用,包括Windows、Linux、macOS和嵌入式系统等。它具有良好的可移植性和可扩展性,可以方便地与其他C语言项目集成。
总之,paho.mqtt.c是一个强大而灵活的MQTT客户端库,适用于使用C语言开发MQTT应用程序的开发人员。它简化了与MQTT代理的通信过程,并提供了丰富的功能和选项,使开发人员能够快速构建高性能的MQTT应用程序。
1.2 下载和安装paho.mqtt.c
登录mqtt官网,点击Software,可以看见如下页面,选择Eclipse Paho C进入下载页面
https://mqtt.org/
点击Eclipse Paho C后,会自动跳转到下载页面
笔者选择使用命令直接安装该软件包,具体操作步骤如下:
Step -1: 下载软件包执行命令:
git clone https://github.com/eclipse/paho.mqtt.c.git
step-2: 进入paho.mqtt.c目录,执行make
cd paho.mqtt.c
make
系统会自动编译代码,等待编译结果。
编译完成后,会自动生成build文件,这时可以安装
step-3 : 执行如下命令就可以安装软件
sudo make install
1.3 一些重要的函数
函数名称 | 功能介绍 |
---|---|
MQTTClient_create | 创建一个MQTT Client Object |
MQTTClient_connect | 连接MQTT服务器 |
MQTTClient_disconnect | 断开MQTT服务器 |
MQTTClient_subscribe | 订阅Topic |
MQTTClient_setCallbacks | 设置回调函数 |
MQTTClient_publishMessage | 发布消息 |
MQTTClient_unsubscribe | 取消订阅消息 |
MQTTClient_destroy | 销毁MQTT Client Object |
2 异步订阅消息实现
2.1 编写异步订阅消息功能
2.1.1 初始化MQTT参数
参数功能介绍:
参数名称 | 参数值 | 描述 |
---|---|---|
ADDRESS | tcp://192.168.1.11:1883 | mqtt broker的IP地址 |
CLIENTID | mqtt_ubuntu_asys | 设备ID |
TOPIC | switch | 订阅的Topic |
QOS | 1 | 服务质量等级=1 |
TIMEOUT | 10000L | |
USERNAME | mqtt_ubuntu_user | 终端认证username |
PASSWORD | 123456 | 终端认证username对应的password |
在代码中修改的参数:
2.1.2 初始化函数
初始化MQTT终端需要完成以下4个步骤:
step-1: 创建MQTT Client
step-2: 配置回调函数
step-3: 连接服务器
step-4: 订阅Topic
具体实现代码如下:
代码57行:创建MQTT Client,需要传入服务器IP和Client ID信息
代码65行:配置callback函数,函数原型,见源码
代码72行: 心跳包时间间隔设置为20s
代码73行: 清除会话 标记设置为1,不接受离线消息
代码74行: 配置设备终端用户
代码75行: 配置设备终端用户password
代码76行: MQTT连接Broker
代码85行: 订阅topic
2.1.3 订阅消息的回调函数
该函数主要实现接收订阅的topic信息,并通过终端打印出来。
代码37行:打印订阅的Topic名称
代码38行:打印payload数据长度和payload原始数据
代码39行: 释放存储poyload信息内存
代码40行: 释放存储topic名称信息内存
2.1.4 取消订阅Topic
要取消订阅的Topic,调用MQTTClient_unsubscribe函数可以实现该功能。实现范例如下:
2.2 编译代码和测试
使用如下命令编译代码
gcc test_02_Asynsubscription.c -lpaho-mqtt3c
执行.out文件后,打印的log提示,MQTT Client已经订阅了 topic为switch的消息。
3 验证MQTT Client功能
3.1 EMQX服务器上查看MQTT Client
在ubuntu上运行MQTT Client后,EMQX服务器会显示MQTT Client的运行状态,登录EMQX服务器可以看见
在订阅管理面板上,也可以看见mqtt_ubuntu_asys订阅了Topic为"switch"的消息
3.2 MQTT.fx发布Topic
使用MQTT.fx发布Topic为switch的消息,Client ID 为mqtt_ubuntu_asys的客户端订阅了该消息,那么当MQTT.fx发布消息之后,mqtt_ubuntu_asys会收到该消息,并在终端上打印出来。
要使用MQTT.fx MQTT Client工具订阅MQTTAsync,首先保证MQTT.fx能正常连接至EMQX服务器
使用MQTT.fx发布Topic为switch的消息
{
"switch": false
}
使用MQTT.fx发布Topic为switch的消息
{
"switch": true
}
在EMQX的保留信息页面,查看MQTT.fx发布Topic为switch的消息,该信息和Client ID 为mqtt_ubuntu_asys的客户端收到的信息一致
4 完整实验代码
创建test_02_Asynsubscription.c,编写如下代码:
/***************************************************************
Copyright 2024-2029. All rights reserved.
文件名 : test_02_Asynsubscription.c
作者 : tangmingfei2013@126.com
版本 : V1.0
描述 : mqtt异步订阅消息
日志 : 初版V1.0 2024/03/13
***************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"
#define ADDRESS "tcp://192.168.1.11:1883"
#define CLIENTID "mqtt_ubuntu_asys"
#define TOPIC "switch"
#define QOS 1
#define TIMEOUT 10000L
#define USERNAME "mqtt_ubuntu_user"
#define PASSWORD "123456"
volatile MQTTClient_deliveryToken deliveredtoken;
void delivered(void *context, MQTTClient_deliveryToken dt)
{
printf("Message with token value %d delivery confirmed\n", dt);
deliveredtoken = dt;
}
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
printf("Message arrived\n");
printf(" topic: %s\n", topicName);
printf(" message: %.*s\n", message->payloadlen, (char*)message->payload);
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
void connlost(void *context, char *cause)
{
printf("\nConnection lost\n");
printf(" cause: %s\n", cause);
}
int main(int argc, char* argv[])
{
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
int rc;
if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to create client, return code %d\n", rc);
rc = EXIT_FAILURE;
goto exit;
}
if ((rc = MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to set callbacks, return code %d\n", rc);
rc = EXIT_FAILURE;
goto destroy_exit;
}
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = USERNAME; //用户名
conn_opts.password = PASSWORD; //密码
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
rc = EXIT_FAILURE;
goto destroy_exit;
}
printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
"Press Q<Enter> to quit\n\n", TOPIC, CLIENTID, QOS);
if ((rc = MQTTClient_subscribe(client, TOPIC, QOS)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to subscribe, return code %d\n", rc);
rc = EXIT_FAILURE;
}
else
{
int ch;
do
{
ch = getchar();
} while (ch!='Q' && ch != 'q');
if ((rc = MQTTClient_unsubscribe(client, TOPIC)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to unsubscribe, return code %d\n", rc);
rc = EXIT_FAILURE;
}
}
if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to disconnect, return code %d\n", rc);
rc = EXIT_FAILURE;
}
destroy_exit:
MQTTClient_destroy(&client);
exit:
return rc;
}