一 mqtt通信模型
MQTT 协议提供一对多的消息发布,可以降低应用程序的耦合性,用户只需要编写极少量的应用代码就能完成一对多的消息发布与订阅,该协议是基于<客户端-服务器>模型,在协议中主要有三种身份:发布者(Publisher)、服务器(Broker)以及订阅者(Subscriber)。其中,MQTT消息的发布者和订阅者都是客户端,服务器只是作为一个中转的存在,将发布者发布的消息进行转发给所有订阅该主题的订阅者;发布者可以发布在其权限之内的所有主题,并且消息发布者可以同时是订阅者,实现了生产者与消费者的脱耦,发布的消息可以同时被多个订阅者订阅。
客户端->(发布)->服务器->(订阅)->客户端
MQTT客户端功能:1.发布消息给其它相关的客户端。 2.订阅主题请求接收相关的应用消息。 3.取消订阅主题请求移除接收应用消息。 4.从服务端终止连接。
MQTT服务器端功能: MQTT 服务器常被称为 Broker(消息代理),以是一个应用程序或一台设备,它一般为云服务器,比如BTA三巨头的一些物联网平台就是常使用MQTT协议,它是位于消息发布者和订阅者之间,以便用于接收消息并发送到订阅者之中,它的功能有: 1.接受来自客户端的网络连接请求。 2.接受客户端发布的应用消息。 3.处理客户端的订阅和取消订阅请求。 4.转发应用消息给符合条件的已订阅客户端(包括发布者自身)。
二 mosquito
2.1 准备工作
openssl的交叉编译
下载链接:/source/old/index.html
cp openssl-1.1.1g.tar.gz /opt/north/
tar -zxvf openssl-1.1.1g.tar.gz
cd openssl-1.1.1g
mkdir build
./config no-asm -shared --prefix=/opt/mosquitto/arm/ssl/ CC=/opt/gcc-linaro-5.3-2016.02-x86_64_arm-linux-gnueabihf/bin/arm-linux-gnueabihf-gcc CXX=/opt/gcc-linaro-5.3-2016.02-x86_64_arm-linux-gnueabihf/bin/arm-linux-gnueabihf-g++
make
make install
cmake autoconf 和libtool的安装
apt-get install cmake
apt-get install autoconf
apt-get install libtool
2.2 mosquito下载
官网下载地址:Download | Eclipse Mosquitto
2.3 编译
linux端自行编译
# tar axvf mosquitto-1.5.5.tar.gz
# cd mosquitto-1.5.5
- # make
交叉编译
# make CC=arm-xxx-gcc CXX=arm-xxx-g++
# sudo make install
2.3 运行
注:默认端口为1883 有需要可以补一下配置信息
三 paho.mqtt.c
3.1 paho下载
eclipse/paho.mqtt.c: An Eclipse Paho C client library for MQTT for Windows, Linux and MacOS. API documentation: https://eclipse.github.io/paho.mqtt.c/
3.2 目录
cd Eclipse-Paho-MQTT-C-1.3.12-Linux/
3.3 编译
3.4 环境
3.5 运行
四 测试demo
4.1 同步
send:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"
#include "MQTTAsync.h"
#define ADDRESS "tcp://127.0.0.1:1883"
#define CLIENTID "ClientID1"
#define TOPIC "topic"
#define QOS 0
#define TIMEOUT 10000L
int main(int argc, char* argv[])
{
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;
int rc;
// 创建MQTT客户端
MQTTClient_create(&client, ADDRESS, CLIENTID,
MQTTCLIENT_PERSISTENCE_NONE, NULL);
// 设置连接选项
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.MQTTVersion = 3;
// 连接到MQTT代理服务器
if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
{
printf("连接失败,错误码: %d\n", rc);
exit(-1);
}
printf("连接成功\n");
// 发布消息
pubmsg.payload = "Hello, MQTT";
pubmsg.payloadlen = strlen(pubmsg.payload);
pubmsg.qos = QOS;
pubmsg.retained = 0;
MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
printf("发布消息到主题 %s: %s\n", TOPIC, (char*)pubmsg.payload);
int connect = 0;
// 等待发布完成
while( connect < 10 ) {
MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
printf("发布消息到主题 %s: %s\n", TOPIC, (char*)pubmsg.payload);
rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
printf("发布完成\n");
sleep(2);
connect++;
}
// 断开MQTT连接
MQTTClient_disconnect(client, 10000);
printf("断开连接\n");
// 销毁MQTT客户端
MQTTClient_destroy(&client);
return rc;
}
recv
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"
int messageArrived(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
printf("接收到来自主题 %s 的消息:\n", topicName);
printf("消息内容:");
if (message->payloadlen > 0) {
printf("%.*s", message->payloadlen, (char*)message->payload);
}
printf("\n");
// 在这里执行你的消息处理逻辑
MQTTClient_freeMessage(&message);
MQTTClient_free(topicName);
return 1;
}
int main()
{
// 创建 MQTT 客户端对象并连接到 MQTT 代理
MQTTClient client;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
MQTTClient_create(&client, "tcp://127.0.0.1:1883", "ClientID", MQTTCLIENT_PERSISTENCE_NONE, NULL);
MQTTClient_connect(client, &conn_opts);
// 订阅感兴趣的主题
char* TOPIC = "topic";
int topiclen = strlen(TOPIC);
MQTTClient_subscribe(client, TOPIC, 0);
// 循环接收消息
int rc;
MQTTClient_message *pubmsg = NULL;
do {
rc = MQTTClient_receive(client, &TOPIC, &topiclen, &pubmsg, 1000);
if (rc == MQTTCLIENT_SUCCESS) {
messageArrived(NULL, TOPIC, topiclen, pubmsg);
}
sleep(2);
} while (rc != MQTTCLIENT_DISCONNECTED);
// 断开连接并清理资源
MQTTClient_disconnect(client, 1000);
MQTTClient_destroy(&client);
return rc;
}
4.2 异步
send:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTAsync.h"
#define ADDRESS "tcp://localhost:1883"
#define CLIENTID "ExampleServerPub"
#define TOPIC "topic"
#define PAYLOAD "Hello, world!"
#define QOS 0
#define TIMEOUT 10000L
volatile MQTTAsync_token deliveredtoken;
void connectionLost(void *context, char *cause)
{
printf("connect lost");
}
void messageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message);
void onPublish(void* context, MQTTAsync_successData* response);
int main()
{
MQTTAsync client;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_responseOptions res = MQTTAsync_responseOptions_initializer;
int rc;
MQTTAsync_create(&client,ADDRESS,CLIENTID,MQTTASYNC_PERSISTENCE_ERROR,NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.MQTTVersion = 3;
MQTTAsync_setCallbacks(client,CLIENTID,connectionLost,NULL,NULL);
MQTTAsync_connect(client,&conn_opts);
pubmsg.payload = "hello,mqtt";
pubmsg.payloadlen = strlen(pubmsg.payload);
pubmsg.qos = QOS;
pubmsg.retained = 0;
int connect = 0;
while( connect < 10 ) {
sleep(2);
MQTTAsync_sendMessage(client,TOPIC,&pubmsg,&res);
MQTTAsync_waitForCompletion(client,res.token,TIMEOUT);
printf("send msg [%d]\n",connect);
connect++;
}
MQTTAsync_disconnectOptions ops;
MQTTAsync_disconnect(client,&ops);
MQTTAsync_destroy(&client);
}
recv:
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTAsync.h"
#define ADDRESS "tcp://localhost:1883"
#define CLIENTID "ExampleClientPub"
#define TOPIC "topic"
#define PAYLOAD "Hello, world!"
#define QOS 0
#define TIMEOUT 10000L
int onMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* pubmsg)
{
char* payload = (char*)pubmsg->payload;
int payloadLen = pubmsg->payloadlen;
// 在这里处理收到的消息
printf("Received message on topic: %s\n", topicName);
printf("Message payload: %.*s\n", payloadLen, payload);
// 完成消息处理后,记得释放消息对象
MQTTAsync_freeMessage(&pubmsg);
MQTTAsync_free(topicName);
return 1;
}
void onConnectionLost(void *context, char *cause)
{
printf("connect lost");
}
int main()
{
int rc;
MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
MQTTAsync_message pubmsg = MQTTAsync_message_initializer;
MQTTAsync_responseOptions opts = MQTTAsync_responseOptions_initializer;
MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTASYNC_PERSISTENCE_ERROR, NULL);
// 设置回调函数
MQTTAsync_setCallbacks(client, NULL, onConnectionLost, onMessageArrived, NULL);
conn_opts.keepAliveInterval = 20;
conn_opts.cleansession = 1;
conn_opts.MQTTVersion = 3;
// 连接到 MQTT 代理
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to connect to broker, return code %d\n", rc);
exit(EXIT_FAILURE);
}
// 等待连接完成
printf("Connecting to broker...\n");
while (!MQTTAsync_isConnected(client)) {}
printf("connect successful\n");
// 订阅主题
if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &opts)) != MQTTASYNC_SUCCESS)
{
printf("Failed to subscribe, return code %d\n", rc);
exit(EXIT_FAILURE);
}
// 进入主循环等待消息
printf("Waiting for messages...\n");
while (1) {
sleep(1);
}
// 完成后,断开连接并释放资源
MQTTAsync_disconnect(client, NULL);
MQTTAsync_destroy(&client);
return 0;
}