emqx桥接配置+常见问题解决+jmeter压测emqx

news2025/1/20 5:50:32

一,桥接资源配置及规则配置

Emqx桥接配置流程

1,配置资源并测试连接通过

规则引擎——>资源——>新建——>选择MQTT Bridge——>填写参数测试连接
参数描述详见3.1资源配置

2,配置规则

2.1根据实际业务选择合适sql
规则引擎——>规则——>新建——>规则sql
(sql见06手册/实施部署手册/empx 桥接规则配置模版.xlsx)

2.2填写规则id
“rule:当前节点_upload_目标节点”;例如“rule:yantai_upload_shandong”

2.3添加响应动作
动作:选择“桥接数据到 MQTT Broker”
关联资源:选择配置好的目标资源节点(没有目标资源点击新建资源去新建)
转发消息主题:空着即可
(转发消息时使用的主题。如果未提供,则默认为桥接消息的主题)
消息内容模板: 填写”${payload}”
(支持变量。若使用空模板(默认),消息内容为 JSON 格式的所有字段)

3,参数配置

3.1资源配置

1,资源类型:下拉选择MQTT Bridge
2,资源ID:
一对一:“resource:”+当前节点_to_目标节点;例如“resource:yantai_to_shandong”
一对多:“resource:”+当前节点_to_目标节点;例如“resource:guojia_to_provinces”
3,连接池大小:设为默认值8
4,客户端id:当前节点+client;例如“yantai_client”
5,附加GUID:设为默认值true(附加 GUID 选项,设置为 true 时,MQTT 连接使用的 clientid 增加随机后缀以保证全局唯一性。 设置为 false 时,会导致 clientid 使用同一个,连接池中线程互踢,EMQX 多个节点之间的桥接也会互踢,推荐仅在单节点 EMQX 且连接池大小为 1 时开启此选项。)
6,用户名:连接远程Broker的用户名
7,密码: 连接远程Broker的密码
8,桥接主题的挂载点:示例: 本地节点向 topic1 发消息,远程桥接节点的主题会变换为 bridge/aws/${node}/topic1,程序中应设置为空
9,磁盘缓存:设为默认值off
10,协议版本:设为默认值mqttV4
11,心跳间隔:设为默认值60s
12,重连间隔:设为默认值30s
13,重传间隔:默认值20s
14,桥接模式:false
15,开启SSL连接:false
16,服务器名称知识:指定用于对端证书验证时使用的主机名,
或者设置为 disable 以关闭此项验证。(默认不填即可)

注意:配置完毕点击测试连接,显示连接成功即可应用

二,过多的消息发布

ERROR,MQTT(32202): 正在发布过多的消息
解决方案
1,增大maxInflight(最低需要paho1.2.0版本)
2,配置多个mqtt client

由于mqttMessageHandler只会引用一个paho客户端,并且在内部对paho客户端做了封装,所以直接修改MqttPahoMessageHandler复杂度较高,我们可以重新写一个MultiMqttMessageHandler,内部初始化多个MqttPahoMessageHandler,这样通过MessageingGateway发送消息时,直接通过MultiMqttMessageHandler来处理mqtt消息,MultiMqttMessageHandler可以通过负载均衡的方式来把消息分派给各个MqttPahoMessageHandler

1,自定义MyMqttPahoMessageHandler类,继承MqttPahoMessageHandler,注意权限由protected改成public。handleMessageInternal()会由channel通过dispatcher间接调用;重写onInit()用来手动初始化MqttPahoMessageHandler。

@Override
	public void doStop() {
		super.doStop();
	}
	@Override
	public void handleMessageInternal(Message<?> message) throws Exception {
		super.handleMessageInternal(message);
	}
	@Override
	public void onInit() {
		try {
			super.onInit();
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

2,自定义MultiMqttMessageHandler类,继承AbstractMessageHandler,并implements Lifecycle,自定义一个MessageHandler,添加一个Map成员属性,用来维系多个MyMqttPahoMessageHandler;handlerCount变量可配置多个mqtt client。这里只用了radom随机数来做负载均衡

private final AtomicBoolean running = new AtomicBoolean();
private volatile Map<Integer, MessageHandler> mqttHandlerMap;

@Value("${spring.mqtt.sender.count}")
private Integer handlerCount;

@Autowired
private MqttSenderConfig senderConfig;

@Override
public void start() {
	if (!this.running.getAndSet(true)) {
		doStart();
	}
}

private void doStart(){
	mqttHandlerMap = new ConcurrentHashMap<>();
	for(int i=0;i<handlerCount;i++){
		mqttHandlerMap.put(i, senderConfig.createMqttOutbound());
	}
}

@Override
public void stop() {
	if (this.running.getAndSet(false)) {
		doStop();
	}
}

private void doStop(){
	for(Map.Entry<Integer, MessageHandler> e : mqttHandlerMap.entrySet()){
		MessageHandler handler = e.getValue();
		((MyMqttPahoMessageHandler)handler).doStop();
	}
}

@Override
public boolean isRunning() {
	return this.running.get();
}

@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
	Random random = new Random();
	MyMqttPahoMessageHandler messageHandler = (MyMqttPahoMessageHandler)mqttHandlerMap.get(random.nextInt(handlerCount));
	messageHandler.handleMessageInternal(message);
}

3,消息发布配置类

@Bean
public MqttPahoClientFactory mqttClientFactory() {
	DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
	factory.setServerURIs(hostUrl);
	factory.setUserName(username);
	factory.setPassword(password);
	return factory;
}
public MessageHandler createMqttOutbound(){
	String tempId = MqttAsyncClient.generateClientId();
	MyMqttPahoMessageHandler messageHandler = new MyMqttPahoMessageHandler(clientId + "sender" + tempId, mqttClientFactory());
	messageHandler.setAsync(true);
	messageHandler.setDefaultTopic(defaultTopic);
	messageHandler.setDefaultQos(1);
	messageHandler.onInit();
	return messageHandler;
}

@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
	return new MultiMqttMessageHandler();
}

@Bean
public MessageChannel mqttOutboundChannel() {
	return new DirectChannel();
}

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
	void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}

三,报文内容过大

调整emqx参数值
zone.external.max_packet_size
mqtt.max_packet_size

四,队列已满

调整emqx参数值
zone.external.max_mqueue_len

消息队列最大长度。当飞行窗口满,或客户端离线后,消息会被存储至该队列中。0 表示不限制。

五,发送大消息的时候,客户端会被强制kill掉

emqx升级到4.4.10版本之后

六,其他重点相关优化参数

参数介绍api链接
https://www.emqx.io/docs/zh/v4.3/configuration/configuration.html#cluster

//集群节点发现方式。可选值为:
    manual: 手动加入集群
    static: 配置静态节点。配置几个固定的节点,新节点通过连接固定节点中的某一个来加入集群。
    mcast: 使用 UDP 多播的方式发现节点。
    dns: 使用 DNS A 记录的方式发现节点。
    etcd: 使用 etcd 发现节点。
    k8s: 使用 Kubernetes 发现节点。
cluster.discovery

//指定多久之后从集群中删除离线节点。
cluster.autoclean

//当使用 static 方式集群时,指定固定的节点列表,多个节点间使用逗号分隔
cluster.static.seeds

//节点名。格式为 <name>@<host>。其中 <host> 可以是 IP 地址,也可以是 FQDN:注意格式限制
node.name

//系统调优参数,设置 Erlang 允许的最大进程数,这将影响 emqx 节点能处理的连接数
//integer 	1024 - 134217727 	默认:2097152
node.process_limit

//系统调优参数,设置 Erlang 允许的最大 Ports 数量
//integer 	1024 - 134217727 	1048576
node.max_ports

//系统调优参数,设置 Erlang 分布式通信使用的最大缓存大小 
//bytesize 	1KB - 2GB 	8MB
node.dist_buffer_size

//系统调优参数,设置 Erlang 运行时允许的最大 ETS 表数量 integer 	默认262144
node.max_ets_tables

//系统调优参数,设置 Erlang 运行多久强制进行一次全局垃圾回收。默认15m
node.global_gc_interval

//系统调优参数,设置 Erlang 运行时多少次 generational GC 之后才进行一次 fullsweep GC。
//integer 	0 - 65535 	默认:1000
node.fullsweep_after

//系统调优参数,当一个节点持续无响应多久之后,认为其已经宕机并断开连接 默认120
 node.dist_net_ticktime

//MQTT 服务器会为每个主题存储最新一条保留消息,以方便消息发布后才上线的客户端在订阅主题时仍可以接收到该消息。
mqtt.retain_available

//是否忽略自己发送的消息:默认false
mqtt.ignore_loop_deliver

//当收到一定数量的消息,或字节,就强制执行一次垃圾回收。
//16000|16MB 表示当收到 16000 条消息,或 16MB 的字节流入就强制执行一次垃圾回收
zone.external.force_gc_policy

//允许客户端订阅主题的最大层级。0 表示不限制,层级多会有性能问题
 zone.external.max_topic_levels

//飞行窗口大小。飞行窗口用于存储未被应答的 QoS 1 和 QoS 2 消息
zone.external.max_inflight

//消息重发间隔。EMQX 在每个间隔检查是否需要进行消息重发
zone.external.retry_interval

//消息队列是否存储 QoS 0 消息。
zone.external.mqueue_store_qos0

//ACL机制
MQTT 授权(authorization)是指对 MQTT 客户端的发布和订阅操作进行 权限控制。 控制的内容主要是哪些客户端可以发布或者订阅哪些 MQTT 主题。
EMQX 支持集中类型的授权。
    权限列表(亦即 ACL)。可以从例如 MongoDB, MySQL,PostgreSQL,Redis,或者 EMQX 的内置数据库中读取这个列表。
    加载一个包含全局的 ACL 的文件。
    动态访问一个 HTTP 后端服务,并通过该 HTTP 调用的返回值来客户端是否有访问的权限。
    通过提取认证过程中携带的授权数据,例如 JWT 的某个字段。
EMQX 最大文件句柄数
    done:ulimit -n 1048576
    done: /etc/security/limits.conf
    done: /etc/sysctl.conf
    done: /etc/systemd/system.conf
    done: 重启 emqx 服务:ulimit -n 1048576; ./emqx stop; ./emqx start
    done: 确认 EMQX Web 后台显示
tcp并发数
/etc/systemd/system.conf

查看默认值
	$ systemctl --user show syncthing | grep LimitNOFILE
	LimitNOFILE=4096
	LimitNOFILESoft=1024
	设置
	DefaultLimitNOFILE=1048576

7,jmeter压测emqx

1. 下载jmeter,解压

https://jmeter.apache.org/download_jmeter.cgi
以 5.4.3 为例,下载地址: https://dlcdn.apache.org//jmeter/binaries/apache-jmeter-5.4.3.zip
linux下解压: unzip apache-jmeter-5.4.3.zip

2. 下载mqtt-jmeter插件

下载地址:
https://github.com/emqx/mqtt-jmeter/releases
https://github.com/emqx/mqtt-jmeter/releases/download/v2.0.2/mqtt-xmeter-2.0.2-jar-with-dependencies.jar

3. 将插件放置于jmeter的lib/ext目录下,windows/linux同样操作

4. 本文先在windows下生成的jmx脚本,然后传至linux下使用

4.1 新建两个线程组
第一个仅包含一个 MQTT DisConnect,执行一次
第二个里面包含具体的压测,开启1000个线程,1s内将线程创建完毕,无限循环。创建两个计数器,pub_counter用来技术发布消息数,thread_counter用来线程计数

4.2 事先创建1000个设备,名称为cosmoiottest000001 - cosmoiottest000001000(可自己定义)。添加一次性控制器(mqtt连接一次,后续pub消息),写上配置信息。

4.3 添加循环控制器,循环一次。包含固定定时器,休眠1000ms,一个发布MQTT Pub Sampler,即每个线程进来执行一次发布消息然后休眠1000ms进入下一次循环。每个消息包含100个点位(根据自己需要设置),每个点位随机生成一个整数。配置详见截图

4.4 添加观察结果树、汇总报告、聚合报告等,可在windows下面查看结果

4.5 配置截图如下:

循环执行线程
![在这里插入图片描述](https://img-blog.csdnimg.cn/36d1092d3def4177a5541ad75683576c.png

pub_counter计数器
在这里插入图片描述

thread_counter计数器
在这里插入图片描述

mqtt connect设置:

在这里插入图片描述

MQTT发布消息:

在这里插入图片描述

5. linux压测命令:(需要先将bin/jmeter添加可执行权限)

chmod +x bin/jmeter
./bin/jmeter -n -t mqtt_test.jmx -l result.jtl

6. 将结果jtl生成可视化报告,放置于result目录

mkdir result
./bin/jmeter -g result.jtl -o result
将结果目录拉下来,点开即可查看图形化结果

注,可能遇到问题:

1. 执行jmeter压测后,进程不退出,编辑 jmeter.properties,打开配置
jmeterengine.force.system.exit=true

2. jmx文件传到linux后可能出错,建议英文环境下生成jmx文件,语言控制jmeter.properties
#language=en (默认英文,切换为中文为:zh_CN)

3. mqtt-jmeter 的jar包需要传至lib/ext目录,否则不可用

4. 生成报告时报错:Consumer failed with message :Begin size 0 is not equal to fixed size 5
将jdk换成8版本

5. jtl结果文件,也可拉到windows,使用jmeter直接查看,新建线程组->聚合报告,选择jtl文件

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/371716.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

洛谷P5015 [NOIP2018 普及组] 标题统计 C语言/C++

[NOIP2018 普及组] 标题统计 题目描述 凯凯刚写了一篇美妙的作文&#xff0c;请问这篇作文的标题中有多少个字符&#xff1f; 注意&#xff1a;标题中可能包含大、小写英文字母、数字字符、空格和换行符。统计标题字 符数时&#xff0c;空格和换行符不计算在内。 输入格式 …

kubernates(k8s)全解

目录说明Kubernetes介绍应用部署方式演变kubernetes简介kubernetes组件kubernetes概念kubernetes集群环境搭建安装方式kubeadm二进制包集群类型安装要求最终目标准备环境环境初始化服务部署kubeadm中的命令(一般用不着)资源管理YAML语言介绍资源管理方式命令式对象管理kubectl命…

微服务架构设计介绍

软件架构是一个包含各种组织的系统组织&#xff0c;这些组件包括 Web服务器, 应用服务器, 数据库,存储, 通讯层), 它们彼此或和环境存在关系。系统架构的目标是解决利益相关者的关注点。 image Conway’s law: Organizations which design systems[...] are constrained to pro…

Ubuntu 20.04安装SQL Server

SQL Server 目前在 Red Hat Enterprise Server、SUSE Linux Enterprise Server 和 Ubuntu 上受支持。 还支持使用 Docker 在容器中运行。 Linux上安装SQL&#xff0c;不支持的功能或者服务 以前都是在Windows上安装SQL&#xff0c;其实SQL Server 的核心数据库引擎在 Linux 上…

【图像分类】卷积神经网络之AlexNet网络模型实现花卉图像识别(附代码和数据集,PyTorch框架)

写在前面: 首先感谢兄弟们的关注和订阅,让我有创作的动力,在创作过程我会尽最大能力,保证作品的质量,如果有问题,可以私信我,让我们携手 共进,共创辉煌。 在上一篇博文中我们对AlexNet网络模型的结构进行了剖析,本篇博文,我们将使用PyTorch搭建AlexNet实现花卉识别,…

JavaScript 俄罗斯方块 - Canvas基础 画线

JavaScript 是前端核心, 掌握这门语言是步入前端高手行列必经之路, 现代前端当然别忘了还有TypeScript, 学习它需要面向对象(OOP)基础知识, 底层的浏览器原理、HTTP协议也必不可少, 本专栏从基础开始一步步带你实现俄罗斯方块小游戏, 让你从有趣的实战中学习JavaScript,事…

ubuntu安装gitlab

gitlab是什么?他有什么作用呢&#xff1f; gitlab是一个开源的git仓库管理软件&#xff0c;并提供web界面&#xff0c;方便管理git仓库。和github很相似&#xff0c;不过github暂时没有开源版本&#xff0c;项目必须托管到github官方网站&#xff0c;不能本地部署。很多公司考…

数据增强,扩充了数据集,增加了模型的泛化能力

数据增强&#xff08;Data Augmentation&#xff09;是在不实质性的增加数据的情况下&#xff0c;从原始数据加工出更多的表示&#xff0c;提高原数据的数量及质量&#xff0c;以接近于更多数据量产生的价值。 其原理是&#xff0c;通过对原始数据融入先验知识&#xff0c;加工…

【NFC音乐相册】简易制作

欢迎来到 Claffic 的博客 &#x1f49e;&#x1f49e;&#x1f49e; 前言&#xff1a; NFC音乐相册在前段时间火了一把&#xff0c;想必大家都听过了&#xff0c;最近我刷到了这个东西&#xff0c;闲来无事就弄了几个&#xff0c;这篇博客就记录下制作工序。 注&#xff1a;我所…

keepalive + nginx 来实现 对于nginx的高可用, 以及如何搭建主备模式

keepalive nginx 来实现 对于nginx的高可用, 以及如何搭建主备模式。 keeplived简介 Keepalived是用纯ANSI/ISO C编写的。该软件围绕一个中央I/O多路复用器进行连接&#xff0c;以提供实时网络设计。 1.1 Keepalived进程被分为3个不同进程 A.一个极简的父进程&#xff0c…

NLP大纲

第一章&#xff1a;概述 1. 什么是自然语言处理&#xff1f; 计算机具备人类的听、说、读、写、译、问、答、搜索、摘要、对话和聊天等能力 知识和常识进行推理和决策 支持客服、诊断、法律、教学等场景 2. 自然语言处理的主要任务有哪些&#xff1f; 分析、理解、转换、…

SpringBoot-运维实用篇

SpringBoot运维实用篇 1.SpringBoot程序的打包与运行 ​ 刚开始做开发学习的小伙伴可能在有一个知识上面有错误的认知&#xff0c;我们天天写程序是在Idea下写的&#xff0c;运行也是在Idea下运行的。 ​ 但是实际开发完成后&#xff0c;我们的项目是不可能运行在自己的电脑上…

图解LeetCode——剑指 Offer 46. 把数字翻译成字符串

一、题目 给定一个数字&#xff0c;我们按照如下规则把它翻译为字符串&#xff1a;0 翻译成 “a” &#xff0c;1 翻译成 “b”&#xff0c;……&#xff0c;11 翻译成 “l”&#xff0c;……&#xff0c;25 翻译成 “z”。一个数字可能有多个翻译。请编程实现一个函数&#x…

spring boot3 一、 spring alibaba cloud 整合 网关 nacos config

jm-apis服务jm-user-api用户服务jm-apis-common公共模块jm-apis-common-bean公共beanjm-apis-common-conf公共配置jm-apis-common-tool公共工具jm-dubbo-apisRPC服务jm-dubbo-user-api用户RPC服务 jianmu-springboot3-springalibabacloud pom.xml <?xml version"1.0&…

电容学习(1)

电解电容 是具有极性的电容&#xff1b; 电解电容的体积大&#xff0c;只有在需要较大容值的时候才需要&#xff1b; 缺点&#xff1a;电解电容容值不稳定&#xff0c;容易随着温度和其他参数变化而变化&#xff1b;因此相对来说非电解电容更稳点一些&#xff1b; 电容的公式&…

如何定制一个智能洒水装置(养狗/养花人士请进)

目录 如何用智能地教狗狗上厕所如何定制一个智能洒水装置 背景 上一篇文章中提到了&#xff0c;我实现了一个自动检测狗狗有没有进厕所的功能。现在我们家的狗狗用它那不算大的小脑瓜&#xff0c;已经百分百学会&#xff08;但是&#xff01;也不知道它是不是聪明过头了&…

非常好看的html网页个人简历

一. 前言 文末获取gitee链接 在前几天逛b站的时候&#xff0c;发现了个比较实用的东西-----个人简介网页版&#xff0c;相当于网页版的个人简历&#xff0c;相较于PDF形式的&#xff0c;网页版所能呈现内容更加丰富&#xff0c;而且更加美观&#xff0c;在BOOS上被HR小姐姐要…

FL Studio21MAC电脑中文升级版安装图文教程

FL Studio版本有很多,每个版本各有优点。除了最新版本外,还有历史经典版本,用户可以根据自己的需求进行下载,FL Studio21是一款功能十分丰富和强大的音乐编辑软件&#xff0c;能够帮助用户进行编曲、剪辑、录音、混音等操作&#xff0c;让用户能够全面地调整音频&#xff0c;软…

2288hv5超融合服务器 数码管报888

【问题现象】 2288hv5超融合服务器&#xff0c;前面板数码管报888&#xff0c;电源灯黄灯闪烁&#xff0c;开不了机&#xff0c;ibmc网络是通的&#xff0c;但是web网页打不开 【问题原因】 iBMC的版本过低&#xff0c;iBMC在智能诊断数据库保护机制存在异常&#xff0c;导…

【算法笔记】前缀和与差分

第一课前缀和与差分 算法是解决问题的方法与步骤。 在看一个算法是否优秀时&#xff0c;我们一般都要考虑一个算法的时间复杂度和空间复杂度。 现在随着空间越来越大&#xff0c;时间复杂度成为了一个算法的重要指标&#xff0c;那么如何估计一个算法的时间复杂度呢&#xf…