认识MQTT(C语言)

news2024/9/22 19:22:11

MQTT基础概念

MQTT 入门介绍 | 菜鸟教程

MQTT使用

在linux下搭建MQTT服务器(Broker)

在linux下执行下面命令安装MQTT服务器

📎mosquitto-1.6.3.tar.gzhttps://www.yuque.com/attachments/yuque/0/2023/gz/35243076/1687955850547-b5941261-e660-4c0a-bb58-04b9c8efb56d.gz

tar xf mosquitto-1.6.3.tar.gz
cd mosquitto-1.6.3/
make
sudo make install

安装完成后,终端输入mosquitto命令即可运行。端口默认是1883

$ mosquitto
1639476502: mosquitto version 1.6.3 starting
1639476502: Using default config.
1639476502: Opening ipv4 listen socket on port 1883.
1639476502: Opening ipv6 listen socket on port 1883.

MQTT客户端

PC客户端

PC客户端测试推荐使用MQTT.fx客户端软件

》1.设置MQTT.fx软件

》2.开启linux下的MQTT服务器

终端输入mosquitto命令即可运行

》3.链接

链接后linux服务器出现提示

》4.发布者页面

》5.订阅者页面

》6.通过上面介绍可以开启两次软件,一个是发布者一个是订阅者,进行通信测试

客户端库移植

编译安装

官方下载地址

GitHub - eclipse/paho.mqtt.c at v1.3.0

把源码包放到自己家目录任意位置,执行下面的指令

📎paho.mqtt.c-1.3.0.ziphttps://www.yuque.com/attachments/yuque/0/2023/zip/35243076/1687958647466-904614f1-e4a9-47a4-b514-8a5b34d77f03.zip

unzip paho.mqtt.c-1.3.0.zip
cd paho.mqtt.c-1.3.0/
cmake -DCMAKE_INSTALL_PREFIX=/usr
make
sudo make install

执行完上面的命令后,会在/usr目录下生成samples文件

进入目录我们能看到很多代码

这是官方给我们提供的示例

但是在根目录下进行编写执行代码是不妥的,我们一般在家目录下自己的文件夹进行编写。

把/usr目录里的代码拿到自己的工作目录再修改编译

我们主要使用MQTTClient_subscribe.c(订阅)MQTTClient_publish.c(发布)

MQTTClient_subscribe.c代码分析(中文版)

/*******************************************************************************
 * 版权所有 (c) 2012, 2017 IBM Corp.
 *
 * 保留所有权利。此程序及其附带材料
 * 根据 Eclipse Public License v1.0 和 Eclipse Distribution License v1.0
 * 提供。许可证附带此发行版。
 *
 * Eclipse Public License 可在以下网址获取:
 *   http://www.eclipse.org/legal/epl-v10.html
 * Eclipse Distribution License 可在以下网址获取:
 *   http://www.eclipse.org/org/documents/edl-v10.php。
 *
 * 贡献者:
 *    Ian Craggs - 初始贡献
 *******************************************************************************/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"

#define ADDRESS     "tcp://192.168.31.240:1883"  // MQTT服务器地址
#define CLIENTID    "ExampleClientSub"           // 客户端ID
#define TOPIC       "MQTT Examples"               // 订阅的主题
#define PAYLOAD     "Hello World!"                // 发布的消息内容
#define QOS         1                             // 服务质量
#define TIMEOUT     10000L

volatile MQTTClient_deliveryToken deliveredtoken;

void delivered(void *context, MQTTClient_deliveryToken dt)
{
    printf("消息传递令牌值为 %d 的消息已确认传递\n", dt);
    deliveredtoken = dt;
}

/**
 * @brief 
 * 
 * @param context 
 * @param topicName 收到来自哪个主题的消息
 * @param topicLen 主题的长度
 * @param message 消息体:payload(消息体,字符串)  payloadlen:消息的长度
 * @return int 
 */
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
    int i;
    char* payloadptr;

    printf("收到消息\n");
    printf("     主题: %s\n", topicName);
    printf("   消息内容: ");
#if 0
    payloadptr = message->payload;
    for(i=0; i<message->payloadlen; i++)
    {
        putchar(*payloadptr++);
    }
#endif
    printf("接收到的消息 = %s\n", (char *)message->payload);

    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);
    return 1;
}

void connlost(void *context, char *cause)
{
    printf("\n连接丢失\n");
    printf("     原因: %s\n", cause);
}

int main(int argc, char* argv[])
{
    // 客户端句柄(描述符)
    MQTTClient client;
    // 连接参数
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    int rc;
    int ch;

    // 创建客户端,并且指定客户端连接的 MQTT 服务器地址和客户端 ID
    MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);

    // 初始化连接参数
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;

    // 设置回调接口,只需要关注 msgarrvd:消息到达后,会自动调用这个接口
    MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);

    // 连接到 broker
    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
    {
        printf("连接失败,返回代码 %d\n", rc);
        exit(EXIT_FAILURE);
    }

    printf("订阅主题 %s\n客户端: %s\n使用服务质量 QoS%d\n\n"
           "按下 Q<Enter> 退出\n\n", TOPIC, CLIENTID, QOS);

    // 订阅某个主题,指定订阅主题的名字,可以指定服务质量 QoS
    MQTTClient_subscribe(client, TOPIC, QOS);

    // 死循环,直到收到了一个 'Q' 就退出
    do 
    {
        ch = getchar();
    } while(ch!='Q' && ch != 'q');

    MQTTClient_unsubscribe(client, TOPIC);
    MQTTClient_disconnect(client, 10000);
    MQTTClient_destroy(&client);
    return rc;
}

回调函数实现原理

func(msgarrd)
{
    while(1)
    {
        recv(data);
        msgarrd(data);
    }
}
MQTTClient_setCallbacks(msgarrd)
    pthread_create(func, msgarrd);

注意:接收回调函数的返回值一定不要改,返回0会导致段错误。

MQTTClient_publish.c代码分析(中文版)

/*******************************************************************************
 * 版权所有 (c) 2012, 2017 IBM Corp.
 *
 * 保留所有权利。此程序及其附带材料
 * 根据 Eclipse Public License v1.0 和 Eclipse Distribution License v1.0
 * 提供。许可证附带此发行版。
 *
 * Eclipse Public License 可在以下网址获取:
 *   http://www.eclipse.org/legal/epl-v10.html
 * Eclipse Distribution License 可在以下网址获取:
 *   http://www.eclipse.org/org/documents/edl-v10.php。
 *
 * 贡献者:
 *    Ian Craggs - 初始贡献
 *******************************************************************************/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"

#define ADDRESS     "tcp://localhost:1883"  // MQTT 服务器地址
#define CLIENTID    "ExampleClientPub"      // 客户端ID
#define TOPIC       "MQTT Examples"          // 发布的主题
#define PAYLOAD     "Hello World!"           // 发布的消息内容
#define QOS         1                        // 服务质量
#define TIMEOUT     10000L                   // 发布超时时间

int main(int argc, char* argv[])
{
    MQTTClient client;  // MQTT 客户端
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;  // 连接选项
    MQTTClient_message pubmsg = MQTTClient_message_initializer;  // 发布的消息
    MQTTClient_deliveryToken token;  // 消息传递令牌
    int rc;  // 返回代码

    MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);  // 创建 MQTT 客户端
    conn_opts.keepAliveInterval = 20;  // 保持活动状态的时间间隔
    conn_opts.cleansession = 1;  // 清除会话信息

    // 连接到 MQTT 服务器
    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
    {
        printf("连接失败,返回代码 %d\n", rc);
        exit(EXIT_FAILURE);
    }
    
    pubmsg.payload = PAYLOAD;  // 设置发布的消息内容
    pubmsg.payloadlen = (int)strlen(PAYLOAD);  // 设置消息内容的长度
    pubmsg.qos = QOS;  // 设置服务质量
    pubmsg.retained = 0;  // 设置是否保留消息
    MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);  // 发布消息到指定主题
    printf("等待 %d 秒钟以确保消息 %s\n"
            "发布到主题 %s,客户端ID为: %s\n",
            (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
    rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);  // 等待消息发布完成
    printf("传递令牌为 %d 的消息已发布\n", token);
    MQTTClient_disconnect(client, 10000);  // 断开连接
    MQTTClient_destroy(&client);  // 销毁客户端
    return rc;
}

编码测试

》1.以MQTTClient_subscribe.c为例,修改代码中IP地址和主题即可完成最简单的通信。

》2.使用如下命令编译代码,注意需要指定需要的库-l paho-mqtt3c

gcc MQTTClient_subscribe.c -l paho-mqtt3c

3.在执行程序时注意开启linux服务器

终端输入mosquitto命令即可运行

》4.执行编译生成的程序

》5.这样就可和windows下的MQTT.fx进行通信测试了

我们可以看到最后一个字符出现了乱码

是因为修改了程序的这部分导致的,但并不影响我们的测试

练习(重点)

利用mqtt.fx软件实现聊天功能,fx订阅"up"主题,程序订阅"down"主题。使用如下的json通信协议。

{
  "name": "zhangsan",
  "age": 16,
  "msg": "hello world"
}

提示:mqtt的连接类似与TCP的连接,有且仅有一个连接。连接和订阅动作不能放到循环中。通信时不要用中文,我们终端默认是utf-8编码,而fx软件用的是其它编码,会出现乱码现象。

注意: 官方订阅和发布是两个例子,需要整合到一个代码里,最后只启动一个进程 。进程启动后,从终端获取用户输入然后发送给fx,并且能接收来自fx的消息。终端只接收msg,name和age按照上面的例子定死即可。涉及到多文件编译,可以写个Makefile来组织工程。对于同一个Broker地址只需要一路mqtt链接即可。

答案如下:下面是源码使用tar -xvf samples.tar进行解压

📎samples.taricon-default.png?t=N658https://www.yuque.com/attachments/yuque/0/2023/tar/35243076/1688999287758-23d9df26-fb88-4fda-9238-a012bf0b5a40.tar

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "MQTTClient.h"
#include "cJSON.h"

#define ADDRESS "tcp://192.168.19.129:1883"
#define CLIENTID "ExampleClientSub"
#define TOPIC "down"
#define PAYLOAD "Hello World!"
#define QOS 1
#define TIMEOUT 10000L

volatile MQTTClient_deliveryToken deliveredtoken;

void delivered(void *context, MQTTClient_deliveryToken dt)
{
    printf("Message with token value %d delivery confirmed\n", dt);
    deliveredtoken = dt;
}

//线程处理接收到的消息
int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
{
    int i;
    char *payloadptr;
    //下边处理收到的消息
    //把传入的字符串转成cJSON的结构(反序列化)
    //最好添加判断,若不是json结构不进行处理,直接提示
    cJSON *cjson = cJSON_Parse((char *)message->payload);//消息来到了这个数组里
    if (cjson == NULL)
    {
        printf("cjson error...\r\n");
        MQTTClient_freeMessage(&message); //直接释放
        MQTTClient_free(topicName); 
        return 1;
    }
    //开始摘果子
    char *name = cJSON_GetObjectItem(cjson, "name")->valuestring;
    printf("%s:", name);

    int age = cJSON_GetObjectItem(cjson, "age")->valueint;

    char *msg = cJSON_GetObjectItem(cjson, "msg")->valuestring;

    printf("%s\n", msg);

    MQTTClient_freeMessage(&message);
    MQTTClient_free(topicName);

    return 1;
}

void connlost(void *context, char *cause)
{
    printf("\nConnection lost\n");
    printf("     cause: %s\n", cause);
}
//主函数

int main(int argc, char *argv[])
{
    pthread_t tid;

    //客户端句柄(描述符)
    MQTTClient client;
    //连接参数
    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    MQTTClient_message pubmsg = MQTTClient_message_initializer;
    MQTTClient_deliveryToken token;
    int rc;
    int ch;

    //创建客户端,并且指定客户端连接的mqtt服务器地址和客户端ID
    MQTTClient_create(&client, ADDRESS, CLIENTID,
        MQTTCLIENT_PERSISTENCE_NONE, NULL);

    //初始化连接参数
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;

    //设置回调接口,只需要关注msgarrvd:消息到达后,会自动调用这个接口
    MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);

    //连接到broker(只链接一次即可)
    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to connect, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }

    printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n"
        "Press Q<Enter> to quit\n\n",
        TOPIC, CLIENTID, QOS);

    //订阅某个主题,指定订阅主题的名字,可以指定qos服务质量
    //订阅down主题
    MQTTClient_subscribe(client, TOPIC, QOS);

    //死循环,直到收到了一个q就退出
    do
        {
            char buf[128];
            fgets(buf, sizeof(buf), stdin);
            if (buf[strlen(buf) - 1] == '\n')
                buf[strlen(buf) - 1] = '\0';
            cJSON *TCP = cJSON_CreateObject();
            cJSON_AddStringToObject(TCP, "name", "southernbrid");
            cJSON_AddNumberToObject(TCP, "age", 18);
            cJSON_AddStringToObject(TCP, "msg", buf);
            char *json_data = cJSON_Print(TCP);

            //填充pubmsg结构体
            pubmsg.payload = json_data;
            pubmsg.payloadlen = (int)strlen(json_data);
            pubmsg.qos = QOS;
            pubmsg.retained = 0;
            deliveredtoken = 0;

            //向up发送消息
            MQTTClient_publishMessage(client, "up", &pubmsg, &token);
            printf("Waiting for publication of %s\n"
                "on topic %s for client with ClientID: %s\n",
                PAYLOAD, "up", CLIENTID);
            //老师友情提醒:注意资源释放
            //不然等与慢性自杀
            free(json_data);
            cJSON_Delete(TCP);
        } while (1);

    //回收资源
    MQTTClient_unsubscribe(client, TOPIC);
    MQTTClient_disconnect(client, 10000);
    MQTTClient_destroy(&client);
    return rc;
}

 

你不得不看的图文并茂的MQTT协议通信过程!!!_你不得不看mqtt__杰杰_的博客-CSDN博客图文并茂讲解MQTT协议通信过程,深入理解MQTT协议工作过程。_你不得不看mqtthttps://jiejie.blog.csdn.net/article/details/106737995?spm=1001.2014.3001.5502

MQTT协议简介及协议原理__杰杰_的博客-CSDN博客带你看看MQTT协议简介及协议原理_mqtt协议https://jiejie.blog.csdn.net/article/details/106732811?spm=1001.2014.3001.5502

可以结合着wireshark抓包看看内部实现原理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/739818.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

003-Dubbo服务的发布和引用

目录 Dubbo3.0发布注册应用级注册-配置应用级注册-端口应用级注册-消费者-确定服务信息应用级注册-消费者-元数据中心 Dubbo3.0 发布注册 应用级注册-配置 因为接口级注册随着服务增多&#xff0c;注册中压力会越来越大 所以在3.0版本提供了应用级注册 #默认是all 接口和应…

两两交换链表中的节点——力扣24

题目描述 方法一&#xff1a;递归 class Solution{ public:ListNode* swapPairs(ListNode* head){if(!head || !head->next){return head;} ListNode* newHead head->next;head->next swapPairs(newHead->next);newHead->next head;return newHead;} }; 方法…

python以固定时间间隔取行

目录 1. 间隔取行2. 时间戳间隔取行&#xff1a;下采样参考链接 1. 间隔取行 dataframe 实现每隔 n 行取 1 行 近期在做数据分析的时候&#xff0c;用到了对csv文件每隔n行取1行的操作&#xff0c;正常情况下会立马想到for循环&#xff0c;可能大家还会有其他方法&#xff0c…

第八章——函数探幽

C内联函数 内联函数是为了提高程序运行速度所做的一项改进。常规函数与内联函数的主要区别不在于编写方式&#xff0c;而在于C编译器如何将它们组合到程序中。 对于 C内联函数&#xff0c;编译器将使用相应的函数代码替换函数调用&#xff0c;程序无需跳到另一个位置处执行代…

LEADTOOLS V22 支持.NET 5-7.0 Crack

使用 LEADTOOLS 构建更好的应用程序 LEADTOOLS 由专利人工智能和机器学习算法提供支持&#xff0c;是一系列综合工具包&#xff0c;可将识别、文档、医疗、成像和多媒体技术集成到桌面、服务器、平板电脑、网络和移动解决方案中。 光学字符识别/ICR 以无与伦比的速度和准确性提…

Django系列所有漏洞复现vulhubCVE-2018-14574,CVE-2022-34265,CVE-2021-35042

文章目录 Django < 2.0.8 任意URL跳转漏洞&#xff08;CVE-2018-14574&#xff09;漏洞详情&#xff1a;复现&#xff1a; Django Trunc(kind) and Extract(lookup_name) SQL注入漏洞&#xff08;CVE-2022-34265&#xff09;漏洞详情&#xff1a;复现&#xff1a; Django Qu…

十五分钟逐步掌握关键路径问题(时间余量、关键活动以及关键路径的求解)

关键路径问题 名人说&#xff1a;莫听穿林打叶声&#xff0c;何妨吟啸且徐行。—— 苏轼《定风波莫听穿林打叶声》 本篇笔记整理&#xff1a;Code_流苏(CSDN)&#xff08;一个喜欢古诗词和编程的Coder&#x1f60a;&#xff09; 目录 关键路径问题〇、概念说明1、AOE网2、关键路…

微服务系列文章之 nginx负载均衡

nginx负载均衡 负载均衡建立在现有网络结构之上&#xff0c;提供了一种廉价有效透明的方法扩展网络设备和服务器的带宽&#xff0c;增加吞吐量、加强网络数据处理能力、提高网络的灵活性和可用性。 随着网站的发展&#xff0c;服务器压力越来越大&#xff0c;我们可能首先会将数…

虚拟IP绑定公网IP访问

绑定公网 IP 我们目前的虚拟 IP&#xff0c;还不能通过公网的形式进行访问&#xff0c;我们首先&#xff0c;来使用内部的 IP 进行访问看看效果如下&#xff1a; curl 虚拟IP 如上图我访问了两次&#xff0c;第一次访问返回的是 2222 的 nginx&#xff0c;第二次访问是 1111 的…

【SpringCloud-9】JWT

这一篇主要介绍一下&#xff0c;微服务之间的用户权限问题。 通常呢&#xff0c;对于用户的登录鉴权&#xff0c;有两种方式&#xff1a; 1、基于session的方式&#xff1a; session是要存到服务端的&#xff0c;但是分布式服务太多&#xff0c;不可能每个服务端都存。 那就…

Python:文件选择界面 and 文件夹选择界面

文章目录 &#xff08;1&#xff09;文件选择界面&#xff08;2&#xff09;文件夹选择界面 Python本身没有内置的文件夹选择界面。然而&#xff0c;可以使用第三方库来实现在代码中选择文件或文件夹的功能。一个常用的库是tkinter&#xff0c;它是Python的标准GUI库之一&#…

IDEA中侧边栏没有git commit模块,如何恢复?

一、修改之前 侧边栏没有git commit模块 二、修改之后 侧边栏恢复了git commit模块 三、下面是恢复教程 1.中文版 打开 文件 -> 设置 -> 版本控制 -> 提交 -> 勾选 【使用非模式提交界面】 -> 点击【确定】 2.英文版 打开 file -> Settings -> Version Co…

ABB机器人在RobotStudio中进行数字与字符串相互转换的具体方法

ABB机器人在RobotStudio中进行数字与字符串相互转换的具体方法 如下图所示,打开RobotStudio软件,在RAPID—Module1中编写程序,首先声明几个测试需要用到的变量, 本例中利用 NumToStr 函数将数组变量中的元素依次转换成字符后赋值给tempString变量, 如下图所示,利用 StrTo…

第八章 npm锁定版本

1、历史原因 当我们走 npm install 带 ^ 会升级 为什么要锁版本 稳定大于一切 代码需要可控 风险可控 系统可控 环境需要一致

第四章 React18的重要更新和使用的新特性

1、7个新特性 2、3个新API 3、1个新模式 4、2个新并发API 1、7个新特性 Render API 使用了它才可以进入并发模式的渲染 setState自动批处理 有些情况&#xff0c;不希望合并处理 flsuhSync 关于卸载组件时的更新状态警告&#xff08;直接删除这个报错&#xff09; 关于r…

nacos启动问题整理

一 win下启动 1、nacos1.X启动 2、nacos2.X启动 2.x需要jdk11以上版本 1&#xff09;打开bin目录&#xff0c;修改startup.com脚本 启动模式&#xff0c;点击startup.cmd默认启动的集群模式&#xff0c;需要修改这个启动文件 2&#xff09; 连接mysql&#xff0c;执行sql…

【笔记】数字电路基础2 - 数制编码与逻辑电路

目录 数制、编码与逻辑代数数制编码逻辑代数 组合逻辑电路组合逻辑电路分析与设计编码器译码器加法器数值比较器数据选择器奇偶校验器 数制、编码与逻辑代数 数制 本小节主要陈述十进制、二进制、十六进制及其对应的转换法则&#xff0c;网上对应的文章已经有很多&#xff0c;…

一文搞定SpringBoot中日志框架使用

文章目录 Spring Boot 对日志框架的封装SLF4J Logback快速入门调试模式Logback 扩展SLF4J Log4J2 Spring Boot 对日志框架的封装 我们知道在日志方面&#xff0c;SpringBoot默认是使用的SLF4JLogBack的形式。我们来看看它使用的日志实现框架LogBack&#xff0c;其在 Default…

Tomcat、Maven以及Servlet的基本使用

Tomcat什么是TomcatTomcat的目录结构启动Tomcat MavenMaven依赖管理流程配置镜像源 Servlet主要工作实现Servlet添加依赖实现打包分析 配置插件 Tomcat 什么是Tomcat Tomcat 是一个 HTTP 服务器。前面我们已经学习了 HTTP 协议, 知道了 HTTP 协议就是 HTTP 客户端和 HTTP 服务…

LinearAlgebraMIT_4_矩阵的LU分解

矩阵做逆变换需要要反过来&#xff0c;如下&#xff0c; 转置的逆等于逆的转置。 在知道了上面的基础知识后&#xff0c;我们进行矩阵的分解&#xff0c;常见如LU分解和LDU分解&#xff0c;如下&#xff0c; 在这里&#xff0c;我们首先具有一个矩阵A&#xff0c;我们对矩阵A进…