Linux环境下使用Eclipse Paho C 实现(MQTT Client)同步模式发布和订阅Message

news2025/1/11 18:47:10

目录

概述

1 同步模式和异步模式

1.1 同步模式

1.2 异步模式

2 下载和安装paho.mqtt.c

3 同步方式发布和订阅消息功能实现

3.1 MQTT Client参数配置

3.2 初始化MQTT Client

3.3 发布消息功能

3.4 订阅消息功能

3.5 解析订阅的信息

4 编译和测试

4.1 编译代码

4.2 运行

5 验证MQTT Client功能

5.1 EMQX服务器上查看MQTT Client

5.2 MQTT.fx发布Topic

5.3 MQTT.fx订阅的主题

6 完整代码


概述

本文主要介绍在linux环境(ubuntu)环境下,下载和安装Eclipse Paho C MQTT 软件包,还编写一个范例实现同步发布Message的功能,并使用基于EMQX的服务验证其功能,还是用MQTT.fx订阅消息,已验证发布消息功能的可靠性。

1 同步模式和异步模式

1.1 同步模式

在同步模式下,客户机应用程序在单个线程上运行。使用MQTTClient_publish()和MQTTClient_publishMessage()函数发布消息。要确定QoS1或QoS2(请参阅服务质量)消息已成功交付,应用程必须调用MQTTClient_waitForCompletion()函数。同步发布示例中显示了显示同步发布的示例。在同步模式下接消息使用MQTTClient_receive()函数。客户机应用程序必须相对频繁地调用 MQTTClient_receive()MQTTClient_yield(),以便允许处理确认和MQTT“ping”,从而保持与服务器的网络连接处于活动状态。

总结同步模式应用方法

1)客户机应用程序在单个线程上运行

2)使用MQTTClient_publish()或者MQTTClient_publishMessage()发布消息

3)使用MQTTClient_waitForCompletion()确认消息是否发布成功

4)使用MQTTClient_receive()接收消息

5)必须频繁调用MQTTClient_receive()和MQTTClient_yield(),以确认消息

1.2 异步模式

在异步模式下,客户机应用程序在多个线程上运行。主程序调用客户端库中的函数来发布和订阅,就像同步模式一样。但是,握手和维护网络连接的处理是在后台执行的。使用调用MQTTClient_setCallbacks()(参见MQTTClient_messageArrived()、MQTTClient_connectionLost()和MQTTClient_deliveryComplete())向库注册的回调,向客户端应用程序提供状态通知和消息接收。然而,这个API不是线程安全的——在没有同步的情况下,不可能从多个线程调用它。可以为此使用MQTTAsync API来实现这些功能。

总结异步模式应用方法

1)客户机应用程序在多个线程上运行

2)主程序调用客户端库中的函数来发布和订阅,使用MQTTClient_publish()或者MQTTClient_publishMessage()发布消息;使用MQTTClient_publishMessage订阅消息

3)使用调用MQTTClient_setCallbacks(),向客户端应用程序提供状态通知和消息接收

异步模式的详细使用范例,参看文章:

Linux环境下使用Eclipse Paho C 实现(MQTT Client)异步订阅Message-CSDN博客

Linux环境下使用Eclipse Paho C 实现(MQTT Client)异步方式发布Message-CSDN博客

2 下载和安装paho.mqtt.c

登录mqtt官网,点击Software,可以看见如下页面,选择Eclipse Paho C进入下载页面

https://mqtt.org/

下载paho.mqtt.c

笔者选择使用命令直接安装该软件包,具体操作步骤如下:

Step -1: 下载软件包执行命令:

git clone https://github.com/eclipse/paho.mqtt.c.git

step-2: 进入paho.mqtt.c目录,执行make

cd paho.mqtt.c
make

系统会自动编译代码,等待编译结果。

编译完成后,会自动生成build文件,这时可以安装

step-3 : 执行如下命令就可以安装软件

sudo make install

3 同步方式发布和订阅消息功能实现

3.1 MQTT Client参数配置

初始化MQTT Client,必须配置一些参数,包括broker的IP地址,订阅的topic等,具体参数如下表所示:

参数功能介绍:

参数名称参数值描述
ADDRESStcp://192.168.1.11:1883mqtt broker的IP地址
CLIENTIDmqtt_ubuntu_asys设备ID
TOPICMQTTAsync发布的Topic
SUBTOPICswitch订阅的Topic
PAYLOAD12.56Topic下的payload
QOS1服务质量等级=1
TIMEOUT10000L超时计数
USERNAMEmqtt_ubuntu_user终端认证username
PASSWORD123456终端认证username对应的password

在代码中定义这些参数的位置:

3.2 初始化MQTT Client

初始化MQTT终端需要完成以下2个步骤:

step-1: 创建MQTT Client

step-2: 连接服务器

具体实现代码如下:

代码66行:创建MQTT Client,需要传入服务器IP和Client ID信息

代码73行: 心跳包时间间隔设置为20s

代码74行: 清除会话 标记设置为1,不接受离线消息

代码75行: 配置设备终端用户

代码76行: 配置设备终端用户password

代码78行: MQTT连接Broker

3.3 发布消息功能

要实现发布消息功能,需要将payload及其相关参数填到MQTTClient_message定义的数据结构中,下面介绍整个public message 函数的功能。

代码39行: 装载payload

代码40行:payload的字符长度

代码41行:消息服务等级参数

代码42行:配置为保留消息

代码44行:使用MQTTClient_publishMessage函数发布消息

代码53行:使用MQTTClient_waitForCompletion等待消息发布完毕

3.4 订阅消息功能

要取消订阅的Topic,调用MQTTClient_unsubscribe函数可以实现该功能。实现范例如下:

3.5 解析订阅的信息

代码69行: 使用MQTTClient_receive接收订阅的消息

代码70行: 通过检测rc的值,并判断topic的值是否有效,以确定是否要解析消息

4 编译和测试

4.1 编译代码

使用如下命令编译代码

 gcc test_03_Synchronous.c -lpaho-mqtt3c -lpthread

4.2 运行

执行.out文件后,可以看见,MQTT Client订阅和发布消息成功了,服务器端收到消息后,token值会自动加1

5 验证MQTT Client功能

5.1 EMQX服务器上查看MQTT Client

在ubuntu上运行MQTT Client后,EMQX服务器会显示MQTT Client的运行状态,登录EMQX服务器可以看见

在订阅管理面板上,也可以看见mqtt_ubuntu_asys订阅了Topic为"switch"

5.2 MQTT.fx发布Topic

使用MQTT.fx发布Topic为switch的消息,Client ID 为mqtt_ubuntu_asys的客户端订阅了该消息,那么当MQTT.fx发布消息之后,mqtt_ubuntu_asys会收到该消息,并在终端上打印出来。

要使用MQTT.fx MQTT Client工具订阅MQTTsync,首先保证MQTT.fx能正常连接至EMQX服务器

使用MQTT.fx发布Topic为switch的消息

{ 
   "switch": false 
}

使用MQTT.fx发布Topic为switch的消息

{ 
    "switch": false 
}

在EMQX的保留信息页面,查看MQTT.fx发布Topic为switch的消息,该信息和Client ID 为mqtt_ubuntu_asys的客户端

5.3 MQTT.fx订阅的主题

在EMQX的订阅管理页面,查看MQTT.fx订阅Topic为MQTTsync的消息,该信息和Client ID 为mqtt_ubuntu_asys的客户端发布的消息

在MQTT.fx上查看Topic为MQTTsync的消息

6 完整代码

创建test_03_Synchronous.c,编写如下代码:

/***************************************************************
Copyright  2024-2029. All rights reserved.
文件名  : test_03_Synchronous.c
作者    : tangmingfei2013@126.com
版本    : V1.0
描述    : mqtt同步发布和订阅消息
日志    : 初版V1.0 2024/03/13
​
***************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
​
#include "MQTTClient.h"
 
#define ADDRESS     "tcp://192.168.1.11:1883"
#define CLIENTID    "mqtt_ubuntu_asys"
#define TOPIC       "MQTTsync"
#define SUBTOPIC    "switch"
​
#define PAYLOAD     "12.56"
#define QOS         1
#define TIMEOUT     10000L
​
#define USERNAME    "mqtt_ubuntu_user"
#define PASSWORD    "123456" 
​
static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
static MQTTClient_message pubmsg = MQTTClient_message_initializer;
static MQTTClient client;
static MQTTClient_deliveryToken deliveredtoken, temptoken;
int count;
​
​
MQTTClient_deliveryToken user_publicMsg( void )
{
    MQTTClient_deliveryToken token;
    int rc;
        
    pubmsg.payload = PAYLOAD;
    pubmsg.payloadlen = (int)strlen(PAYLOAD);
    pubmsg.qos = QOS;
    pubmsg.retained = 1;
    
    if ((rc = MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token)) != MQTTCLIENT_SUCCESS)
    {
         printf("Failed to publish message, return code %d\n", rc);
         exit(EXIT_FAILURE);
    }
 
    printf("Waiting for up to %d seconds for publication of %s\n"
            "on topic %s for client with ClientID: %s\n",
            (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
    rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
    printf("Message with delivery token %d delivered\n", token);
    
    return token;
}
​
int receive_subMessage( void )
{
    int topicLen;
    int rc;
    char *topic = SUBTOPIC;
    MQTTClient_message *message = NULL;
    
    rc = MQTTClient_receive( client, &topic, &topicLen, &message, TIMEOUT);
    if( rc == MQTTCLIENT_SUCCESS && topic!= NULL ){
        printf("Message arrived \n");
        printf("     topic: %s\n", topic);
        printf("   message: %.*s\n", message->payloadlen, (char*)message->payload);
    }
    printf("MQTTClient receive message, return code %d\n", rc);
    
    return rc;
}
​
​
/* 线程  */
void thread_subMsg(void)
{
    usleep(1000000L);
    count++;
}
​
​
int main(int argc, char* argv[])
{
    int current_cnt;
    pthread_t id;
    int ret;
    int rc;
 
    deliveredtoken = 0;
    if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
        MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
    {
         printf("Failed to create client, return code %d\n", rc);
         exit(EXIT_FAILURE);
    }
 
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.username = USERNAME;   //用户名
    conn_opts.password = PASSWORD;   //密码
    
    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to connect, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }
    
    printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n", 
            SUBTOPIC, CLIENTID, QOS);
            
    if ((rc = MQTTClient_subscribe(client, SUBTOPIC, QOS)) != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to subscribe, return code %d\n", rc);
        rc = EXIT_FAILURE;
    }
    
    ret = pthread_create(&id,NULL,(void *) thread_subMsg,NULL);
    if(ret!=0){
        printf ("Create pthread error!\n");
        exit (1);
    }
​
    while(1)
    {
        receive_subMessage();
        temptoken = user_publicMsg(); 
        
        if(count != current_cnt )
        {
            current_cnt = count;
            if(temptoken != deliveredtoken){
                deliveredtoken = temptoken;
            }
        }
    }
 
    if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS)
        printf("Failed to disconnect, return code %d\n", rc);
    MQTTClient_destroy(&client);
    
    return rc;
}
​

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1540298.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

InstructGPT的流程介绍

1. Step1&#xff1a;SFT&#xff0c;Supervised Fine-Tuning&#xff0c;有监督微调。顾名思义&#xff0c;它是在有监督&#xff08;有标注&#xff09;数据上微调训练得到的。这里的监督数据其实就是输入Prompt&#xff0c;输出相应的回复&#xff0c;只不过这里的回复是人工…

ctfshow-web入门-反序列化

web254 先看题 <?php/* # -*- coding: utf-8 -*- # Author: h1xa # Date: 2020-12-02 17:44:47 # Last Modified by: h1xa # Last Modified time: 2020-12-02 19:29:02 # email: h1xactfer.com # link: https://ctfer.com*/error_reporting(0); highlight_file(__FIL…

详细解读开源版Sora视频生成模型

Diffusion Models专栏文章汇总&#xff1a;入门与实战 前言&#xff1a;OpenAI的视频生成模型Sora一经发布就广受全世界的瞩目&#xff0c;上海人工智能实验室最近推出了一个基于Diffusion Transformer的结构的模型Latte&#xff0c;堪称最接近Sora原理的视频生成模型。这篇博客…

神级工具之git (一): git 基操

一切都从&#xff1a;Git User Manual开始&#xff0c;或者中文版的Git中文手册 核心概念 工作区 工作区我们可见的&#xff0c;可以进行修改的目录树。我们可以在目录树中进行文件的查看&#xff0c;修改。通常我们会使用一个神级编辑器Vim。我给她取了个名字&#xff0c;就…

综合知识篇19-软件知识产权保护考点(2024年软考高级系统架构设计师冲刺知识点总结系列文章)

专栏系列文章: 2024高级系统架构设计师备考资料(高频考点&真题&经验)https://blog.csdn.net/seeker1994/category_12593400.html案例分析篇00-【历年案例分析真题考点汇总】与【专栏文章案例分析高频考点目录】(2024年软考高级系统架构设计师冲刺知识点总结-案例…

【AI】发现一款运行成本较低的SelfHosting语言模型

【背景】 作为一个想构建局域网AI服务的屌丝,一直苦恼的自然是有限的资源下有没有对Spec要求低一点的SelfHosting的AI服务框架了。今天给大家介绍这款听起来有点希望,但是我也还没试验过,感兴趣的可以去尝试看看。 【介绍】 大模型生成式AI与别的技术不同,由于资源要求高…

CSK6 接入聆思平台(LSPlatform)

一、开发环境 硬件&#xff1a;视觉语音大模型AI开发套件 二、使用大语言模型 官方指导文档&#xff1a; 开始使用 | 聆思文档中心 获取API密钥 | 聆思文档中心 1、注册 提交申请之后需要将注册电话号码通过微信发送给聆思科技工作人员&#xff0c;工作人员授权后&#xff…

使用 chezmoi vscode, 管理你的 dotfiles

什么是 dotfiles In Unix-like operating systems, any file or folder that starts with a dot character (for example, /home/user/.config), commonly called a dot file or dotfile. 任何以 . 开头去命名的文件或者目录都可以称为 dotfile, 在 Unix-like 系统一般用的比较…

JavaEE-文件操作和IO

我们先来认识狭义上的⽂件(file)。针对硬盘这种持久化存储的I/O设备&#xff0c;当我们想要进⾏数据保存时&#xff0c;往往不是保存成⼀个整体&#xff0c;⽽是独⽴成⼀个个的单位进⾏保存&#xff0c;这个独⽴的单位就被抽象成⽂件的概念&#xff0c;就类似办公桌上的⼀份份真…

工作中常用到的Linux命令

系统&#xff0c;用户信息操作相关命令 查看主机ip地址 ifconfig 获取用户信息 id 修改用户密码 passwd 查看链接用户 who 创建新用户账号 useradd 删除用户账号 userdel 修改用户账号的属性 usermod 查看系统发行版本 cat /proc/version 说明适用于所有版本。…

C++面向对象三大特征-----继承(详细版)

目录 继承 一、继承的基础介绍 普通版网页和继承版网页的区别 语法 二、继承方式 三种继承方式 三、继承中的对象模型 四、继承中构造和析构函数 五、继承同名成员的处理方式 访问同名成员&#xff1a; 作用域写法&#xff1a; 六、继承同名静态成员的处理方式 访问…

通讯录的模拟实现(C语言)

通讯录要求&#xff1a; 1&#xff0c;联系人要拥有姓名。年龄。性别&#xff0c;电话&#xff0c;地址 2&#xff0c;拥有增加&#xff0c;删除&#xff0c;搜索&#xff0c;修改&#xff0c;展示&#xff08;所有联系人&#xff09;&#xff0c;退出功能 3&#xff0c;能存…

力扣49. 字母异位词分组

Problem: 49. 字母异位词分组 文章目录 题目描述思路及解法复杂度Code 题目描述 思路及解法 1.以字符串作为键&#xff0c;与该键是字母异位词所组成的数组为值创建map集合&#xff1b; 2.每次取出一个字符串将其排序&#xff0c;再存入对应的数组&#xff1b; 3.将map中的值存…

VSGitHub项目联动(上传和克隆),创建你的第一个仓库,小白配置

目录&#xff1a; 前言一&#xff0c;基本说明1.1名词概念1.2必配条件 二&#xff0c;配置方法2.1本地生成密钥2.2云端代码托管平台SSH配置添加&#xff08;GitHub&#xff09;2.3VS项目配置 三&#xff0c;参考四&#xff0c;一些讨论 前言 &#x1f308;在编写VS代码项目时&a…

containerd源代码分析: 整体架构

本文从代码的大的整体组织上来熟悉containerd项目 containerd项目总的说是一个cs模式的原生控制台程序组。containerd作为服务端来接收处理client的各种请求&#xff0c;如常用的拉取推送镜像&#xff0c;创建查询停止容器&#xff0c;生成快照&#xff0c;发送消息等。client/…

程序设计语言+嵌入式系统设计师备考笔记

0、前言 本专栏为个人备考软考嵌入式系统设计师的复习笔记&#xff0c;未经本人许可&#xff0c;请勿转载&#xff0c;如发现本笔记内容的错误还望各位不吝赐教&#xff08;笔记内容可能有误怕产生错误引导&#xff09;。 1、嵌入式系统开发与设计 1.1嵌入式应用程序的生成与加…

在线获取文本列表并集计算器

具体请前往&#xff1a;在线文本并集计算工具

rabbitmq 3.9.29 docker mac 管理员页面无法打开

SyntaxError: Unexpected token ‘catch’ SyntaxError: Unexpected token ‘catch’ at EJS.Compiler.compile (http://127.0.0.1:15672/js/ejs-1.0.min.js:1:6659) at new EJS (http://127.0.0.1:15672/js/ejs-1.0.min.js:1:1625) at format (http://127.0.0.1:15672/js/main…

【Flask】Flask数据迁移操作

Flask数据迁移操作 前提条件 安装第三方包&#xff1a; # ORM pip install flask-sqlalchemy # 数据迁移 pip install flask-migrate # MySQL驱动 pip install pymysql # 安装失败&#xff0c;指定如下镜像源即可 # pip install flask-sqlalchemy https://pypi.tuna.tsinghu…

【Docker】golang操作容器使用rename动态更新容器的名字

【Docker】golang操作容器使用rename动态更新容器的名字 大家好 我是寸铁&#x1f44a; 总结了一篇golang操作容器使用rename动态更新容器的名字✨ 喜欢的小伙伴可以点点关注 &#x1f49d; 前言 今天遇到一个新的需求&#xff0c;要动态改变运行中的容器名字。 可以考虑先把…