一,桥接资源配置及规则配置
Emqx桥接配置流程
1,配置资源并测试连接通过
规则引擎——>资源——>新建——>选择MQTT Bridge——>填写参数测试连接
参数描述详见3.1资源配置
2,配置规则
2.1根据实际业务选择合适sql
规则引擎——>规则——>新建——>规则sql
(sql见06手册/实施部署手册/empx 桥接规则配置模版.xlsx)
2.2填写规则id
“rule:当前节点_upload_目标节点”;例如“rule:yantai_upload_shandong”
2.3添加响应动作
动作:选择“桥接数据到 MQTT Broker”
关联资源:选择配置好的目标资源节点(没有目标资源点击新建资源去新建)
转发消息主题:空着即可
(转发消息时使用的主题。如果未提供,则默认为桥接消息的主题)
消息内容模板: 填写”${payload}”
(支持变量。若使用空模板(默认),消息内容为 JSON 格式的所有字段)
3,参数配置
3.1资源配置
1,资源类型:下拉选择MQTT Bridge
2,资源ID:
一对一:“resource:”+当前节点_to_目标节点;例如“resource:yantai_to_shandong”
一对多:“resource:”+当前节点_to_目标节点;例如“resource:guojia_to_provinces”
3,连接池大小:设为默认值8
4,客户端id:当前节点+client;例如“yantai_client”
5,附加GUID:设为默认值true(附加 GUID 选项,设置为 true 时,MQTT 连接使用的 clientid 增加随机后缀以保证全局唯一性。 设置为 false 时,会导致 clientid 使用同一个,连接池中线程互踢,EMQX 多个节点之间的桥接也会互踢,推荐仅在单节点 EMQX 且连接池大小为 1 时开启此选项。)
6,用户名:连接远程Broker的用户名
7,密码: 连接远程Broker的密码
8,桥接主题的挂载点:示例: 本地节点向 topic1
发消息,远程桥接节点的主题会变换为 bridge/aws/${node}/topic1
,程序中应设置为空
9,磁盘缓存:设为默认值off
10,协议版本:设为默认值mqttV4
11,心跳间隔:设为默认值60s
12,重连间隔:设为默认值30s
13,重传间隔:默认值20s
14,桥接模式:false
15,开启SSL连接:false
16,服务器名称知识:指定用于对端证书验证时使用的主机名,
或者设置为 disable 以关闭此项验证。(默认不填即可)
注意:配置完毕点击测试连接,显示连接成功即可应用
二,过多的消息发布
ERROR,MQTT(32202): 正在发布过多的消息
解决方案
1,增大maxInflight(最低需要paho1.2.0版本)
2,配置多个mqtt client
由于mqttMessageHandler只会引用一个paho客户端,并且在内部对paho客户端做了封装,所以直接修改MqttPahoMessageHandler复杂度较高,我们可以重新写一个MultiMqttMessageHandler,内部初始化多个MqttPahoMessageHandler,这样通过MessageingGateway发送消息时,直接通过MultiMqttMessageHandler来处理mqtt消息,MultiMqttMessageHandler可以通过负载均衡的方式来把消息分派给各个MqttPahoMessageHandler
1,自定义MyMqttPahoMessageHandler类,继承MqttPahoMessageHandler,注意权限由protected改成public。handleMessageInternal()会由channel通过dispatcher间接调用;重写onInit()用来手动初始化MqttPahoMessageHandler。
@Override
public void doStop() {
super.doStop();
}
@Override
public void handleMessageInternal(Message<?> message) throws Exception {
super.handleMessageInternal(message);
}
@Override
public void onInit() {
try {
super.onInit();
} catch (Exception e) {
e.printStackTrace();
}
}
2,自定义MultiMqttMessageHandler类,继承AbstractMessageHandler,并implements Lifecycle,自定义一个MessageHandler,添加一个Map成员属性,用来维系多个MyMqttPahoMessageHandler;handlerCount变量可配置多个mqtt client。这里只用了radom随机数来做负载均衡
private final AtomicBoolean running = new AtomicBoolean();
private volatile Map<Integer, MessageHandler> mqttHandlerMap;
@Value("${spring.mqtt.sender.count}")
private Integer handlerCount;
@Autowired
private MqttSenderConfig senderConfig;
@Override
public void start() {
if (!this.running.getAndSet(true)) {
doStart();
}
}
private void doStart(){
mqttHandlerMap = new ConcurrentHashMap<>();
for(int i=0;i<handlerCount;i++){
mqttHandlerMap.put(i, senderConfig.createMqttOutbound());
}
}
@Override
public void stop() {
if (this.running.getAndSet(false)) {
doStop();
}
}
private void doStop(){
for(Map.Entry<Integer, MessageHandler> e : mqttHandlerMap.entrySet()){
MessageHandler handler = e.getValue();
((MyMqttPahoMessageHandler)handler).doStop();
}
}
@Override
public boolean isRunning() {
return this.running.get();
}
@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
Random random = new Random();
MyMqttPahoMessageHandler messageHandler = (MyMqttPahoMessageHandler)mqttHandlerMap.get(random.nextInt(handlerCount));
messageHandler.handleMessageInternal(message);
}
3,消息发布配置类
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs(hostUrl);
factory.setUserName(username);
factory.setPassword(password);
return factory;
}
public MessageHandler createMqttOutbound(){
String tempId = MqttAsyncClient.generateClientId();
MyMqttPahoMessageHandler messageHandler = new MyMqttPahoMessageHandler(clientId + "sender" + tempId, mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
messageHandler.setDefaultQos(1);
messageHandler.onInit();
return messageHandler;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
return new MultiMqttMessageHandler();
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}
三,报文内容过大
调整emqx参数值
zone.external.max_packet_size
mqtt.max_packet_size
四,队列已满
调整emqx参数值
zone.external.max_mqueue_len
消息队列最大长度。当飞行窗口满,或客户端离线后,消息会被存储至该队列中。0 表示不限制。
五,发送大消息的时候,客户端会被强制kill掉
emqx升级到4.4.10版本之后
六,其他重点相关优化参数
参数介绍api链接
https://www.emqx.io/docs/zh/v4.3/configuration/configuration.html#cluster
//集群节点发现方式。可选值为:
manual: 手动加入集群
static: 配置静态节点。配置几个固定的节点,新节点通过连接固定节点中的某一个来加入集群。
mcast: 使用 UDP 多播的方式发现节点。
dns: 使用 DNS A 记录的方式发现节点。
etcd: 使用 etcd 发现节点。
k8s: 使用 Kubernetes 发现节点。
cluster.discovery
//指定多久之后从集群中删除离线节点。
cluster.autoclean
//当使用 static 方式集群时,指定固定的节点列表,多个节点间使用逗号分隔
cluster.static.seeds
//节点名。格式为 <name>@<host>。其中 <host> 可以是 IP 地址,也可以是 FQDN:注意格式限制
node.name
//系统调优参数,设置 Erlang 允许的最大进程数,这将影响 emqx 节点能处理的连接数
//integer 1024 - 134217727 默认:2097152
node.process_limit
//系统调优参数,设置 Erlang 允许的最大 Ports 数量
//integer 1024 - 134217727 1048576
node.max_ports
//系统调优参数,设置 Erlang 分布式通信使用的最大缓存大小
//bytesize 1KB - 2GB 8MB
node.dist_buffer_size
//系统调优参数,设置 Erlang 运行时允许的最大 ETS 表数量 integer 默认262144
node.max_ets_tables
//系统调优参数,设置 Erlang 运行多久强制进行一次全局垃圾回收。默认15m
node.global_gc_interval
//系统调优参数,设置 Erlang 运行时多少次 generational GC 之后才进行一次 fullsweep GC。
//integer 0 - 65535 默认:1000
node.fullsweep_after
//系统调优参数,当一个节点持续无响应多久之后,认为其已经宕机并断开连接 默认120
node.dist_net_ticktime
//MQTT 服务器会为每个主题存储最新一条保留消息,以方便消息发布后才上线的客户端在订阅主题时仍可以接收到该消息。
mqtt.retain_available
//是否忽略自己发送的消息:默认false
mqtt.ignore_loop_deliver
//当收到一定数量的消息,或字节,就强制执行一次垃圾回收。
//16000|16MB 表示当收到 16000 条消息,或 16MB 的字节流入就强制执行一次垃圾回收
zone.external.force_gc_policy
//允许客户端订阅主题的最大层级。0 表示不限制,层级多会有性能问题
zone.external.max_topic_levels
//飞行窗口大小。飞行窗口用于存储未被应答的 QoS 1 和 QoS 2 消息
zone.external.max_inflight
//消息重发间隔。EMQX 在每个间隔检查是否需要进行消息重发
zone.external.retry_interval
//消息队列是否存储 QoS 0 消息。
zone.external.mqueue_store_qos0
//ACL机制
MQTT 授权(authorization)是指对 MQTT 客户端的发布和订阅操作进行 权限控制。 控制的内容主要是哪些客户端可以发布或者订阅哪些 MQTT 主题。
EMQX 支持集中类型的授权。
权限列表(亦即 ACL)。可以从例如 MongoDB, MySQL,PostgreSQL,Redis,或者 EMQX 的内置数据库中读取这个列表。
加载一个包含全局的 ACL 的文件。
动态访问一个 HTTP 后端服务,并通过该 HTTP 调用的返回值来客户端是否有访问的权限。
通过提取认证过程中携带的授权数据,例如 JWT 的某个字段。
EMQX 最大文件句柄数
done:ulimit -n 1048576
done: /etc/security/limits.conf
done: /etc/sysctl.conf
done: /etc/systemd/system.conf
done: 重启 emqx 服务:ulimit -n 1048576; ./emqx stop; ./emqx start
done: 确认 EMQX Web 后台显示
tcp并发数
/etc/systemd/system.conf
查看默认值
$ systemctl --user show syncthing | grep LimitNOFILE
LimitNOFILE=4096
LimitNOFILESoft=1024
设置
DefaultLimitNOFILE=1048576
7,jmeter压测emqx
1. 下载jmeter,解压
https://jmeter.apache.org/download_jmeter.cgi
以 5.4.3 为例,下载地址: https://dlcdn.apache.org//jmeter/binaries/apache-jmeter-5.4.3.zip
linux下解压: unzip apache-jmeter-5.4.3.zip
2. 下载mqtt-jmeter插件
下载地址:
https://github.com/emqx/mqtt-jmeter/releases
https://github.com/emqx/mqtt-jmeter/releases/download/v2.0.2/mqtt-xmeter-2.0.2-jar-with-dependencies.jar
3. 将插件放置于jmeter的lib/ext目录下,windows/linux同样操作
4. 本文先在windows下生成的jmx脚本,然后传至linux下使用
4.1 新建两个线程组
第一个仅包含一个 MQTT DisConnect,执行一次
第二个里面包含具体的压测,开启1000个线程,1s内将线程创建完毕,无限循环。创建两个计数器,pub_counter用来技术发布消息数,thread_counter用来线程计数
4.2 事先创建1000个设备,名称为cosmoiottest000001 - cosmoiottest000001000(可自己定义)。添加一次性控制器(mqtt连接一次,后续pub消息),写上配置信息。
4.3 添加循环控制器,循环一次。包含固定定时器,休眠1000ms,一个发布MQTT Pub Sampler,即每个线程进来执行一次发布消息然后休眠1000ms进入下一次循环。每个消息包含100个点位(根据自己需要设置),每个点位随机生成一个整数。配置详见截图
4.4 添加观察结果树、汇总报告、聚合报告等,可在windows下面查看结果
4.5 配置截图如下:
循环执行线程
![在这里插入图片描述](https://img-blog.csdnimg.cn/36d1092d3def4177a5541ad75683576c.png
pub_counter计数器
thread_counter计数器
mqtt connect设置:
MQTT发布消息:
5. linux压测命令:(需要先将bin/jmeter添加可执行权限)
chmod +x bin/jmeter
./bin/jmeter -n -t mqtt_test.jmx -l result.jtl
6. 将结果jtl生成可视化报告,放置于result目录
mkdir result
./bin/jmeter -g result.jtl -o result
将结果目录拉下来,点开即可查看图形化结果
注,可能遇到问题:
1. 执行jmeter压测后,进程不退出,编辑 jmeter.properties,打开配置
jmeterengine.force.system.exit=true
2. jmx文件传到linux后可能出错,建议英文环境下生成jmx文件,语言控制jmeter.properties
#language=en (默认英文,切换为中文为:zh_CN)
3. mqtt-jmeter 的jar包需要传至lib/ext目录,否则不可用
4. 生成报告时报错:Consumer failed with message :Begin size 0 is not equal to fixed size 5
将jdk换成8版本
5. jtl结果文件,也可拉到windows,使用jmeter直接查看,新建线程组->聚合报告,选择jtl文件