目录
概述
1 同步模式和异步模式
1.1 同步模式
1.2 异步模式
2 下载和安装paho.mqtt.c
3 同步方式发布和订阅消息功能实现
3.1 MQTT Client参数配置
3.2 初始化MQTT Client
3.3 发布消息功能
3.4 订阅消息功能
3.5 解析订阅的信息
4 编译和测试
4.1 编译代码
4.2 运行
5 验证MQTT Client功能
5.1 EMQX服务器上查看MQTT Client
5.2 MQTT.fx发布Topic
5.3 MQTT.fx订阅的主题
6 完整代码
概述
本文主要介绍在linux环境(ubuntu)环境下,下载和安装Eclipse Paho C MQTT 软件包,还编写一个范例实现同步发布Message的功能,并使用基于EMQX的服务验证其功能,还是用MQTT.fx订阅消息,已验证发布消息功能的可靠性。
1 同步模式和异步模式
1.1 同步模式
在同步模式下,客户机应用程序在单个线程上运行。使用MQTTClient_publish()和MQTTClient_publishMessage()函数发布消息。要确定QoS1或QoS2(请参阅服务质量)消息已成功交付,应用程必须调用MQTTClient_waitForCompletion()函数。同步发布示例中显示了显示同步发布的示例。在同步模式下接消息使用MQTTClient_receive()函数。客户机应用程序必须相对频繁地调用 MQTTClient_receive()MQTTClient_yield(),以便允许处理确认和MQTT“ping”,从而保持与服务器的网络连接处于活动状态。
总结同步模式应用方法
1)客户机应用程序在单个线程上运行
2)使用MQTTClient_publish()或者MQTTClient_publishMessage()发布消息
3)使用MQTTClient_waitForCompletion()确认消息是否发布成功
4)使用MQTTClient_receive()接收消息
5)必须频繁调用MQTTClient_receive()和MQTTClient_yield(),以确认消息
1.2 异步模式
在异步模式下,客户机应用程序在多个线程上运行。主程序调用客户端库中的函数来发布和订阅,就像同步模式一样。但是,握手和维护网络连接的处理是在后台执行的。使用调用MQTTClient_setCallbacks()(参见MQTTClient_messageArrived()、MQTTClient_connectionLost()和MQTTClient_deliveryComplete())向库注册的回调,向客户端应用程序提供状态通知和消息接收。然而,这个API不是线程安全的——在没有同步的情况下,不可能从多个线程调用它。可以为此使用MQTTAsync API来实现这些功能。
总结异步模式应用方法
1)客户机应用程序在多个线程上运行
2)主程序调用客户端库中的函数来发布和订阅,使用MQTTClient_publish()或者MQTTClient_publishMessage()发布消息;使用MQTTClient_publishMessage订阅消息
3)使用调用MQTTClient_setCallbacks(),向客户端应用程序提供状态通知和消息接收
异步模式的详细使用范例,参看文章:
Linux环境下使用Eclipse Paho C 实现(MQTT Client)异步订阅Message-CSDN博客
Linux环境下使用Eclipse Paho C 实现(MQTT Client)异步方式发布Message-CSDN博客
2 下载和安装paho.mqtt.c
登录mqtt官网,点击Software,可以看见如下页面,选择Eclipse Paho C进入下载页面
https://mqtt.org/
下载paho.mqtt.c
笔者选择使用命令直接安装该软件包,具体操作步骤如下:
Step -1: 下载软件包执行命令:
git clone https://github.com/eclipse/paho.mqtt.c.git
step-2: 进入paho.mqtt.c目录,执行make
cd paho.mqtt.c
make
系统会自动编译代码,等待编译结果。
编译完成后,会自动生成build文件,这时可以安装
step-3 : 执行如下命令就可以安装软件
sudo make install
3 同步方式发布和订阅消息功能实现
3.1 MQTT Client参数配置
初始化MQTT Client,必须配置一些参数,包括broker的IP地址,订阅的topic等,具体参数如下表所示:
参数功能介绍:
参数名称 | 参数值 | 描述 |
---|---|---|
ADDRESS | tcp://192.168.1.11:1883 | mqtt broker的IP地址 |
CLIENTID | mqtt_ubuntu_asys | 设备ID |
TOPIC | MQTTAsync | 发布的Topic |
SUBTOPIC | switch | 订阅的Topic |
PAYLOAD | 12.56 | Topic下的payload |
QOS | 1 | 服务质量等级=1 |
TIMEOUT | 10000L | 超时计数 |
USERNAME | mqtt_ubuntu_user | 终端认证username |
PASSWORD | 123456 | 终端认证username对应的password |
在代码中定义这些参数的位置:
3.2 初始化MQTT Client
初始化MQTT终端需要完成以下2个步骤:
step-1: 创建MQTT Client
step-2: 连接服务器
具体实现代码如下:
代码66行:创建MQTT Client,需要传入服务器IP和Client ID信息
代码73行: 心跳包时间间隔设置为20s
代码74行: 清除会话 标记设置为1,不接受离线消息
代码75行: 配置设备终端用户
代码76行: 配置设备终端用户password
代码78行: MQTT连接Broker
3.3 发布消息功能
要实现发布消息功能,需要将payload及其相关参数填到MQTTClient_message定义的数据结构中,下面介绍整个public message 函数的功能。
代码39行: 装载payload
代码40行:payload的字符长度
代码41行:消息服务等级参数
代码42行:配置为保留消息
代码44行:使用MQTTClient_publishMessage函数发布消息
代码53行:使用MQTTClient_waitForCompletion等待消息发布完毕
3.4 订阅消息功能
要取消订阅的Topic,调用MQTTClient_unsubscribe函数可以实现该功能。实现范例如下:
3.5 解析订阅的信息
代码69行: 使用MQTTClient_receive接收订阅的消息
代码70行: 通过检测rc的值,并判断topic的值是否有效,以确定是否要解析消息
4 编译和测试
4.1 编译代码
使用如下命令编译代码
gcc test_03_Synchronous.c -lpaho-mqtt3c -lpthread
4.2 运行
执行.out文件后,可以看见,MQTT Client订阅和发布消息成功了,服务器端收到消息后,token值会自动加1
5 验证MQTT Client功能
5.1 EMQX服务器上查看MQTT Client
在ubuntu上运行MQTT Client后,EMQX服务器会显示MQTT Client的运行状态,登录EMQX服务器可以看见
在订阅管理面板上,也可以看见mqtt_ubuntu_asys订阅了Topic为"switch"
5.2 MQTT.fx发布Topic
使用MQTT.fx发布Topic为switch的消息,Client ID 为mqtt_ubuntu_asys的客户端订阅了该消息,那么当MQTT.fx发布消息之后,mqtt_ubuntu_asys会收到该消息,并在终端上打印出来。
要使用MQTT.fx MQTT Client工具订阅MQTTsync,首先保证MQTT.fx能正常连接至EMQX服务器
使用MQTT.fx发布Topic为switch的消息
{
"switch": false
}
使用MQTT.fx发布Topic为switch的消息
{
"switch": false
}
在EMQX的保留信息页面,查看MQTT.fx发布Topic为switch的消息,该信息和Client ID 为mqtt_ubuntu_asys的客户端
5.3 MQTT.fx订阅的主题
在EMQX的订阅管理页面,查看MQTT.fx订阅Topic为MQTTsync的消息,该信息和Client ID 为mqtt_ubuntu_asys的客户端发布的消息
在MQTT.fx上查看Topic为MQTTsync的消息
6 完整代码
创建test_03_Synchronous.c,编写如下代码:
/***************************************************************
Copyright 2024-2029. All rights reserved.
文件名 : test_03_Synchronous.c
作者 : tangmingfei2013@126.com
版本 : V1.0
描述 : mqtt同步发布和订阅消息
日志 : 初版V1.0 2024/03/13
***************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include "MQTTClient.h"
#define ADDRESS "tcp://192.168.1.11:1883"
#define CLIENTID "mqtt_ubuntu_asys"
#define TOPIC "MQTTsync"
#define SUBTOPIC "switch"
#define PAYLOAD "12.56"
#define QOS 1
#define TIMEOUT 10000L
#define USERNAME "mqtt_ubuntu_user"
#define PASSWORD "123456"
static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
static MQTTClient_message pubmsg = MQTTClient_message_initializer;
static MQTTClient client;
static MQTTClient_deliveryToken deliveredtoken, temptoken;
int count;
MQTTClient_deliveryToken user_publicMsg( void )
{
MQTTClient_deliveryToken token;
int rc;
pubmsg.payload = PAYLOAD;
pubmsg.payloadlen = (int)strlen(PAYLOAD);
pubmsg.qos = QOS;
pubmsg.retained = 1;
if ((rc = MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to publish message, return code %d\n", rc);
exit(EXIT_FAILURE);
}
printf("Waiting for up to %d seconds for publication of %s\n"
"on topic %s for client with ClientID: %s\n",
(int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
printf("Message with delivery token %d delivered\n", token);
return token;
}
int receive_subMessage( void )
{
int topicLen;
int rc;
char *topic = SUBTOPIC;
MQTTClient_message *message = NULL;
rc = MQTTClient_receive( client, &topic, &topicLen, &message, TIMEOUT);
if( rc == MQTTCLIENT_SUCCESS && topic!= NULL ){
printf("Message arrived \n");
printf(" topic: %s\n", topic);
printf(" message: %.*s\n", message->payloadlen, (char*)message->payload);
}
printf("MQTTClient receive message, return code %d\n", rc);
return rc;
}
/* 线程 */
void thread_subMsg(void)
{
usleep(1000000L);
count++;
}
int main(int argc, char* argv[])
{
int current_cnt;
pthread_t id;
int ret;
int rc;
deliveredtoken = 0;
if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to create client, return code %d\n", rc);
exit(EXIT_FAILURE);
}
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.username = USERNAME; //用户名
conn_opts.password = PASSWORD; //密码
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to connect, return code %d\n", rc);
exit(EXIT_FAILURE);
}
printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n",
SUBTOPIC, CLIENTID, QOS);
if ((rc = MQTTClient_subscribe(client, SUBTOPIC, QOS)) != MQTTCLIENT_SUCCESS)
{
printf("Failed to subscribe, return code %d\n", rc);
rc = EXIT_FAILURE;
}
ret = pthread_create(&id,NULL,(void *) thread_subMsg,NULL);
if(ret!=0){
printf ("Create pthread error!\n");
exit (1);
}
while(1)
{
receive_subMessage();
temptoken = user_publicMsg();
if(count != current_cnt )
{
current_cnt = count;
if(temptoken != deliveredtoken){
deliveredtoken = temptoken;
}
}
}
if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS)
printf("Failed to disconnect, return code %d\n", rc);
MQTTClient_destroy(&client);
return rc;
}