EMQX 实践

news2024/12/23 4:31:35

MQTT 核心概念

发布订阅

MQTT 基于发布订阅模式,它解耦了消息的发送方(发布者)和接收方(订阅者),引入了一个中间代理的角色来完成消息的路由和分发。发布者和订阅者不需要知道彼此的存在,他们之间唯一的联系就是对消息的一致约定,例如消息将使用什么主题、消息将包含哪些字段等等。这让 MQTT 的通信更加灵活,因为我们可以随时动态地增加或减少订阅者和发布者。通过发布订阅,我们可以轻易地实现消息的广播、组播和单播。

服务端

在发布消息的客户端和订阅的客户端之间充当中介,将所有接收到的消息转发到匹配的订阅客户端。所以有时我们也会直接将服务端称为 Broker。

客户端

使用 MQTT 协议连接到 MQTT 服务端的设备或应用程序。它既可以是发布者,也可以是订阅者,也可以具备这两种身份。

主题

主题被用来标识和区分不同的消息,它是 MQTT 消息路由的基础。发布者可以在发布时指定消息的主题,订阅者则可以选择订阅自己感兴趣的主题来接收相关的消息。

通配符

订阅者可以在订阅的主题中使用通配符来达到一次订阅多个主题的目的。MQTT 提供了单层通配符和多层通配符两种主题通配符,以满足不同的订阅需要。

QoS

MQTT 定义了三种 QoS 等级,来分别提供不同的消息可靠性保证。每条消息都可以在发布时独立设置自己的 QoS。QoS 0 最多交付一次,消息可能丢失;QoS 1 至少交付一次,消息可以保证到达,但是可能重复;QoS 2 只交付一次,消息保证到达,并且不会重复。QoS 越大,消息的传输复杂程度也越高,我们需要根据实际场景来选择合适的 QoS。

会话

QoS 只是设计了消息可靠到达的理论机制,而会话则确保了 QoS 1、2 的协议流程得以真正实现。会话是客户端与服务端之间的有状态交互,它可以仅持续和网络连接一样长的时间,也可以跨越多个网络连接存在,我们通常将后者称为持久会话。我们可以选择让连接从已存在的会话中恢复,也可以选择从一个全新的会话开始。

保留消息

与普通消息不同,保留消息可以保留在 MQTT 服务器中。任何新的订阅者订阅与该保留消息中的主题匹配的主题时,都会立即接收到该消息,即使这个消息是在它们订阅主题之前发布的。这使订阅者在上线后可以立即获得数据更新,而不必等待发布者再次发布消息。在某种程度上,我们可以把保留消息当作是一个消息 “云盘” 来使用:随时上传消息到 “云盘”,然后在任意时刻从 “云盘” 获取消息。当然,这个 “云盘” 还有一个主题下只能存储一条最新的保留消息的限制。

遗嘱消息

发布订阅模式的特性决定了,除了服务器以外没有客户端能够感知到某个客户端从通信网络中离开。而遗嘱消息则为连接意外断开的客户端提供了向其他客户端发出通知的能力。客户端可以在连接时向服务器设置自己的遗嘱消息,服务器将在客户端异常断开后立即或延迟一段时间后发布这个遗嘱消息。而订阅了对应遗嘱主题的客户端,将收到这个遗嘱消息,并且采取相应的措施,例如更新该客户端的在线状态等等。

共享订阅

默认情况下,消息会被转发给所有匹配的订阅者。但有时,我们可能希望多个客户端协同处理接收到的消息,以便以水平扩展的方式来提高负载能力。又或者,我们希望为客户端增加一个备份客户端,当主客户端离线时,能够无缝切换到备份客户端继续接收消息,以确保高可用性。而 MQTT 的共享订阅特性,则提供了这一能力。我们可以将客户端划分为多个订阅组,消息仍然会被转发给所有订阅组,但每个订阅组内每次只会有一个客户端收到消息。


MQTT选型

MQTT BROKER 技术选型


EMQX安装

本地开发环境

可以选择安装Windows版本
Windows安装EMQ X
官方Windows部署
安装成功后,直接访问http://localhost:18083/
账号/密码:admin/public
image.png

生产环境

EMQX 本身支持分布式集群架构,能够在保证高可用性、容错性和可扩展性的同时,处理大量的客户端和消息。通过使用 EMQX 集群,您可以在一个或多个节点发生故障时仍然保持集群运行,从而享受到容错和高可用性的好处。
相比与之前版本,EMQX 5.0 集群采用了新的 Mria 集群架构,单节点能支持 500 万 MQTT 设备连接,集群可扩展至 1 亿并发 MQTT 连接。官方集群部署
image.png


安全指南

网络与 TLS
介绍了 EMQX 如何支持端对端加密通信,包括如何启用 SSL/TLS 连接和获取 SSL/TLS 证书。
认证
身份认证是物联网应用的重要组成部分,可以帮助有效阻止非法客户端的连接。为了提供更好的安全保障,EMQX 支持多种认证机制,如 X.509 证书认证、密码认证、JWT 认证、基于 MQTT 5.0 协议的增强认证以及 PSK 认证。本节介绍了这些认证机制的工作方式和配置方法。
授权
在 EMQX 中,授权是指对 MQTT 客户端的发布和订阅操作进行权限控制。本节将介绍如何通过内置数据库、文件、或通过集成 MySQL、PostgreSQL、MongoDB 和 Redis 进行授权相关操作。
黑名单
EMQX 为用户提供了黑名单功能,用户可以通过 Dashboard 和 HTTP API 将指定客户端加入黑名单以拒绝该客户端访问,除了客户端标识符以外,还支持直接封禁用户名甚至 IP 地址。
连接抖动检测
EMQX 支持自动封禁那些被检测到短时间内频繁登录的客户端,并且在一段时间内拒绝这些客户端的登录,以避免此类客户端过多占用服务器资源而影响其他客户端的正常使用。

认证

EMQX Dashboard 提供了开箱即用的认证与权限管理功能,用户仅通过用户界面,就可以快速实现客户端认证授权机制的配置,无需编写代码或手动编辑配置文件,即可对接各类数据源与认证服务,实现各个级别与各类场景下的安全配置,以更高的开发效率获得更安全的保障。

创建认证

在认证页面下的右上角,点击 创建 按钮,即可进入到创建认证的页面。创建一个认证需要选择一种认证方式,选择完成后需要选择一个存储或获取认证信息的数据源(JWT 认证方式除外),认证数据可以从这些数据源包括数据库或 HTTP 服务中获取,最后再配置连接到该数据源的连接信息即可。
认证方式:Password-Based,使用客户端 ID 或用户名加密码的认证方式;
image.png
数据源选择:redis
image.png
选择加密方式及加盐方式:加密方式md5 ,加盐方式prefix
image.png
初始化数据到redis:
HMSET “mqtt_user:username” “password_hash” “66ace8890090c2a50e729318d45fe53b” “salt” “abc”

验证

image.png

http签名配置

创建API秘钥

image.png
image.png

记录秘钥

appId: *************
appSecret: *************

MQTT通用组件开发

源码地址

目录

├─component-mqtt-client
└─component-mqtt-client-starter

component-mqtt-client

mqtt上下文

image.png

建立连接
public MqttClientApp connect() {
    countDownLatch = new CountDownLatch(1);
    Vertx.vertx().deployVerticle(this);
    return this;
}
接收消息
 @Override
    public void start() {
        if (Objects.isNull(this.mqttClient)) {
            this.mqttClient = MqttClient.create(vertx, createMqttClientOptions());
        }
        //接收服务端消息处理handler
        mqttClient.publishHandler(pub -> {
            Buffer buffer = pub.payload();
            String topicName = pub.topicName();
            String[] split = topicName.split("/");
            String string = buffer.toString(StandardCharsets.UTF_8);
            UpMessage upRawMessage = new UpMessage();
            HashMap<String, Object> headers = Maps.newHashMap();
            headers.put("topic",topicName);
            headers.put("qos",pub.qosLevel().value());
            upRawMessage.setHeaders(headers);
            upRawMessage.setMessageContent(string);
            upRawMessage.setProductKey(split[0]);
            upRawMessage.setDeviceId(split[1]);
            mqttListenerList.forEach(f -> {
                String topic = f.getTopic();
                String[] listenerTopic = topic.split("/");
                boolean flag = true;
                for (int i = 0; i < split.length; i++) {
                    if (allWildcard.equals(listenerTopic[i])) {
                        break;
                    }
                    if (singleWildcard.equals(listenerTopic[i])) {
                        continue;
                    }
                    if (!split[i].equals(listenerTopic[i])) {
                        flag = false;
                        break;
                    }
                }
                if (flag){
                    f.onMessage(upRawMessage);
                }

            });

        });
        mqttClient.closeHandler(unused -> getVertx().setTimer(RECONNECT_INTERVAL, h -> start()));
        mqttClient.connect(mqttConfig.getListenerInfos().getPort(), mqttConfig.getListenerInfos().getHost(),
                s -> {
                    if (s.succeeded()) {
                        log.info("MqttClient connect success.");
                        subscribe();
                        countDownLatch.countDown();
                    } else {
                        log.error("MqttClient connect fail: ", s.cause());
                        if (s.cause() != null) {
                            vertx.setTimer(RECONNECT_INTERVAL, handler -> this.start());
                        }
                    }
                });
    }
长连接推送消息
public MqttResp publish(MqttReq request) {
MqttResp response = new MqttResp();
Buffer payload = Buffer.buffer(request.getMessageContent());
mqttClient.publish(request.getTopic(), payload, MqttQoS.valueOf(request.getQos()), false, false, s -> {
    if (s.succeeded()) {
        log.info("===>MqttClient publish success[{}]", s.result());
    } else {
        log.error("===>MqttClient publish fail.", s.cause());
    }
});
response.setCode(200);
return response;
}
http推送消息
public Map<String, ?> callHttp(MqttReq params) {
String path = "";
String url = config().getAddress() + path;
log.debug("http url[{}] requestBodyStr[{}]", url, params.getMessageContent());

Dict dict = Dict.create();
dict.set("topic", params.getTopic());              //订阅主题
dict.set("payload", params.getMessageContent());   //内容
dict.set("qos", 0);                                //质量
dict.set("retain",false);                          //是否保存
String requestBodyStr = JSON.toJSONString(dict);


RequestBody requestBody = RequestBody.create(HTTP_MEDIA_TYPE_JSON_UTF8, requestBodyStr);
Request request = new Request.Builder()
.url(url)
.post(requestBody)
.header("Content-Type", "application/json")
.header("Authorization", Credentials.basic(config().getAppId(), config().getAppSecret()))
.build();

try (Response response = getHttpClientInstance().newCall(request).execute()) {
    log.debug("Call http success. url[{}] response[{}]", url, response);

    if (response.code() == 404) {
        return ImmutableMap.of("code", 404, "Message", "404 Not Found");
    } else if (!response.isSuccessful()) {
        return ImmutableMap.of("code", response.code(), "Message", "Server Error");
    }

    // 输出响应内容
    assert response.body() != null;
    String string = response.body().string();
    return JSON.parseObject(string);
} catch (IOException e) {
    log.warn("Call http failed, {}. url[{}] requestBodyStr[{}]", e.getMessage(), url, requestBodyStr);
}

return Collections.emptyMap();
}
Mqtt配置信息

image.png

public class MqttConfig {

    private String appId;

    private String appSecret;

    private String address;

    private String username;

    private String password;

    private ListenerInfo listenerInfos;

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public static class ListenerInfo {

        private String host;

        private int port;

        private boolean ssl;

        //订阅的topic
        private List<String> subscribeTopics;
    }

}
监听信息接口

image.png

public interface MqttListener {

    void setTopic(String topic);

    String getTopic();

    void onMessage(Message message);
}

component-mqtt-client-starter

MqttClientAutoConfiguration

image.png

META-INF

image.png

com.gitee.xmhzzz.component.mqtt.client.MqttClientAutoConfiguration

Mqtt服务实战demo

通过component-mqtt-client-starter快速构建mqtt-service服务
image.png

发生消息

public class MqttController {

    @Autowired
    private IMqttApi IMqttApi;

    @PostMapping("/pub/tcp")
    public void pubTcp(){
        MqttReq mqttReq = new MqttReq();
        mqttReq.setTopic("topicA/001/in");
        Map<String, Object> map = Maps.newHashMap();
        map.put("1","o");
        mqttReq.setData(map);
        IMqttApi.tcpPub(mqttReq);
    }
}

监听消息

@Slf4j
@Component
public class AMqttListener implements MqttListener {


    private String topic;

    public AMqttListener() {
        this.topic = "topicA/+/msg";
    }

    @Override
    public void setTopic(String topic) {

    }

    @Override
    public String getTopic() {
        return this.topic;
    }

    @Override
    public void onMessage(Message message) {
        log.info("a message[{}]", JSONObject.toJSONString(message));
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1525427.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

IIS上部署.netcore WebApi项目及swagger

.netcore项目一般是直接双击exe文件&#xff0c;运行服务&#xff0c;今天有个需求&#xff0c;需要把.netcore项目运行在IIS上&#xff0c;遇到了一个小坑&#xff0c;在这里记录一下。 安装IIS&#xff0c;怎么部署站点&#xff0c;这些过于简单就不细说了&#xff0c;不知道…

java学习之路-方法讲解

目录 1.方法概念及使用 1.1什么是方法 1.2方法定义 1.3 方法调用的执行过程 1.4 实参和形参的关系(重要) 1.5 没有返回值的方法 2.方法重载 3.方法递归 3.1递归概念 3.2递归执行过程分析 3.3递归练习 代码示例1 代码示例2 1.方法概念及使用 1.1什么是方法 方法就是…

ipad电容笔有必要买吗?怎么选?四大缺陷弊端要严防!

电容笔有没有必要买还是得看我们个人的使用需求&#xff0c;如果平时做笔记、画画比较多的话&#xff0c;还是值得入手的&#xff0c;原装笔是好&#xff0c;但对于一些预算不多的朋友来说&#xff0c;价格还是过于高了&#xff0c;不是很划算。而且我们国内市场的平替电容笔也…

【Linux】基础 IO(文件系统 inode 软硬链接)-- 详解

一、理解文件系统 1、前言 我们一直都在说打开的文件&#xff0c;磁盘中包含了上百万个文件&#xff0c;肯定不可能都是以打开的方式存在。其实文件包含打开的文件和普通的未打开的文件&#xff0c;下面重点谈谈未打开的文件。 我们知道打开的文件是通过操作系统被进程打开&am…

在线BLOG网|基于springboot框架+ Mysql+Java+JSP技术的在线BLOG网设计与实现(可运行源码+数据库+设计文档)

推荐阅读100套最新项目 最新ssmjava项目文档视频演示可运行源码分享 最新jspjava项目文档视频演示可运行源码分享 最新Spring Boot项目文档视频演示可运行源码分享 目录 前台功能效果图 管理员功能登录前台功能效果图 系统功能设计 数据库E-R图设计 lunwen参考 摘要 研究…

【Oracle篇】一文搞清exp/imp逻辑迁移工具的用法(第一篇,总共四篇)

☘️博主介绍☘️&#xff1a; ✨又是一天没白过&#xff0c;我是奈斯&#xff0c;DBA一名✨ ✌✌️擅长Oracle、MySQL、SQLserver、Linux&#xff0c;也在积极的扩展IT方向的其他知识面✌✌️ ❣️❣️❣️大佬们都喜欢静静的看文章&#xff0c;并且也会默默的点赞收藏加关注❣…

{“message“:“Expecting value (near 1:1)“,“status“:400}

按照网页请求数据的方式无法获取数据 {“message”:“Expecting value (near 1:1)”,“status”:400} 将content-type改为以下请求数据方式 content-type: application-json,参考:https://stackoverflow.com/questions/72333040/why-400-response-status-code-when-send-post…

组建对等网

一、概念 对等网络&#xff08;Peer-to-Peer, P2P&#xff09;是一种分布式网络架构&#xff0c;其中每个参与节点&#xff08;称为"对等体"或"节点"&#xff09;既可以作为客户端也可以作为服务器&#xff0c;直接与网络中的其他节点分享资源&#xff08…

【Ubuntu20.04】Clion 配置 Libtorch + OpenCV

首先根据自己的CUDA版本安装正确对应的cuda和cudnn并进行配置。 这里安装的是cuda-11.3版本&#xff0c;以下基于这个版本进行安装。 1. 安装 Clion 因为Clion更容易直接编写CMakelists.txt&#xff0c;所以使用Clion作为IDE。 需要在File -> Setting -> CMake的CMake…

汽车电子零部件(6):DMS/OMS、CMS

前言: 有一个部件过去不曾有,而如今有可能要标准化标配化,那就是Driver Monitoring System (DMS)驾驶员监控系统、Occupant Monitoring System (OMS)乘客监控系统和Camera Monitor System(CMS)摄像头监控系统。 汽车视觉技术的创新推动先进驾驶辅助系统的变革(ADAS),并…

ssh 下连接Mysql 查看数据库数据表的内容的方法及步骤

要通过SSH连接到MySQL数据库&#xff0c;可以按照以下步骤进行操作&#xff1a; 在本地计算机上打开终端或命令提示符。 使用SSH命令连接到远程服务器。命令的格式如下&#xff1a; ssh usernameserver_ip其中&#xff0c;username是指在远程服务器上的用户名&#xff0c;serv…

Testng框架集成新业务

1. 向公司开发人员要setting.xml 修改 <localRepository>/Users/qa/.m2/repository</localRepository> 为自己的仓库地址 2. 如果有开发人员给的下载好的Maven仓库 可以直接解压缩用

FPGA 学习需要哪些东西?

FPGA 学习需要哪些东西&#xff1f; 三样东西&#xff1a;第一就是完整的理论&#xff0c;第二一套开发板&#xff0c;第三可练手的项目 第一&#xff0c;一套完整的课程&#xff0c; 这个课程必须是紧跟技术发展的&#xff0c;适应市场的&#xff0c;这样不至于学完后发现太…

Python从0到100(六):Python分支和循环结构的应用

分支和循环结构的重要性不言而喻&#xff0c;它是构造程序逻辑的基础。 一、程序的结构控制 单分支结构&#xff1a; 单分支结构是分支结构中最简单的一种方式&#xff0c;单分支结构只需要判断一个条件&#xff0c;根据这个条件是否成立来决定是否执行一段语句。 二分支结…

Python递归函数画五角星

递归函数是在函数内部调用自己的函数。递归函数通常有两个部分&#xff1a;基本情况和递归情况。 基本情况是递归函数停止递归的条件&#xff0c;当满足基本情况时&#xff0c;递归函数将不再递归调用自己&#xff0c;而是返回结果。 递归情况是递归函数继续调用自己的条件&a…

RuiYi-Vue开源项目1-下载并实现运行RuiYi-Vue项目

下载并实现运行RuoYi项目 RuiYi-Vue介绍环境需要下载项目项目配置后端项目配置前端项目配置 启动后前端登录页面截图 RuiYi-Vue介绍 RuoYi-Vue 是一个 Java EE 企业级快速开发平台&#xff0c;基于经典技术组合&#xff08;Spring Boot、Spring Security、MyBatis、Jwt、Vue&a…

力扣-1351 统计有序矩阵中的负数

给你一个 m * n 的矩阵 grid&#xff0c;矩阵中的元素无论是按行还是按列&#xff0c;都以非严格递减顺序排列。 请你统计并返回 grid 中 负数 的数目。 示例 1&#xff1a; 输入&#xff1a;grid [[4,3,2,-1],[3,2,1,-1],[1,1,-1,-2],[-1,-1,-2,-3]] 输出&#xff1a;8 解释&…

迁移学习怎么用

如果想实现一个计算机视觉应用&#xff0c;而不想从零开始训练权重&#xff0c;比方从随机初始化开始训练&#xff0c;更快的方式是下载已经训练好权重的网络结构&#xff0c;把这个作为预训练&#xff0c;迁移到你感兴趣的新任务上。ImageNet、PASCAL等等数据库已经公开在线。…

力扣L16--- 189.轮转数组-2024年3月18日

1.题目描述 2.知识点 注1&#xff1a; System.arraycopy() 方法是Java中用于数组复制的一个静态方法。它允许将一个数组的部分或全部内容复制到另一个数组中的指定位置。其语法如下&#xff1a; public static void arraycopy(Object src, int srcPos, Object dest, int dest…

Linux磁盘配额

磁盘配额 概述 Linux系统作为一个多用户的操作系统&#xff0c;在生产环境中&#xff0c;会发生多个用户共同使用一个磁盘的情况&#xff0c;会造成Linux根分区的磁盘空间耗尽&#xff0c;导致Linux系统无法建立新的文件&#xff0c;从而出现服务程序崩溃、系统无法启动等故障…