rocketmq延时消息自定义配置;topic下tag使用

news2024/12/24 20:40:50

概述

使用的是开源版本的rocketmq4.9.4

rocketmq也是支持延时消息的。
rocketmq一般是4个部分:

  • nameserver:保存路由信息
  • broker:保存消息
  • 生产者:生产消息
  • 消费者:消费消息

延时消息的处理是在其中的broker中。
但是rocketmq不支持自定义延时消息,rabbitmq倒是可以,但也有延时时间上限.

rocketmq支持18个等级的延时时间

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

Broker在启动时,内部会创建一个内部主题:SCHEDULE_TOPIC_XXXX,根据延迟level的个数,创建对应数量的队列,也就是说18level对应了18个队列。注意,这并不是说这个内部主题只会有18个队列,因为Broker通常是集群模式部署的,因此每个节点都有18个队列。

延迟级别的值可以进行修改,以满足自己的业务需求,可以修改/添加新的level。例如:你想支持2天的延迟,修改最后一个level的值为2d,这个时候依然是18level;也可以增加一个2d,这个时候总共就有19level

在检查某一个延时队列中的消息过期时,只会检查第一个队列元素,第一个没过期后面的元素就不会再去检测.

延时消息的流转过程

这边捞一张网图
在这里插入图片描述

增加一个延时队列等级

按照原理,broker中根据18个延时等级创建了18个队列来监控,那么只需要再增加延时等级个数,那么broker自然就会再新增一个队列来监控。

比如在broker的配置文件中增加一个延时等级为19的延时15秒的配置

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
namesrvAddr = localhost:9876
brokerIP1 = 192.168.0.89
brokerIP2 = 192.168.0.89
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h 15s

测试结果成功
在这里插入图片描述

topic下tag使用

有时候同一个topic下还想继续分组,那么此时可以使用tag来进一步的区分。

坑点:同一个consumeGroup并且同一个topic的订阅者,如果2个实例订阅的不同的tag,那么可能会发生消息丢失。

因为往topic队列中存数据时是时按照全部队列去分配的,但是队列1队列5分属不同的tag,那么实例a只订阅了tag_a,因此被实例a订阅的队列中只有tag_a被消费了,tag_b就还在队列中没有被消费。造成消息丢失的假象。

一般情况下都是同一个消费者启动多个实例,所以tag_atag_b都是有订阅的。

捞一张网图
在这里插入图片描述

tag的使用demo
使用的时候要注意,springboot下默认的消费者监听了所有的tag,所以如果没有具体的tag消费者,那么就会被默认监听所有tag的当前topic所消费。同理,如果同时存在监听所有tag和具体tag的消费者,那么就会产生广播的效果。

举例说明
实例下名为Qtopic下有2个监听着,第一个监听tag=“*”,第二个监听tag=“666”。那么给Q topic发送tag666的消息时,这两者都会收到消息。

发送demo
就是在topic后面拼接“:”即可,发送的时候只能指定一个tag,但是监听可以监听多个tag
监听多个tag的则用“||”分隔

发送消息

public void sendMsgTag(@RequestBody Map<String,Object> map){
    String topic = "efg";

    String topicTag = topic.concat(":").concat(((String) map.get("tag")));

    //异步发送
    org.apache.rocketmq.spring.core.RocketMQTemplate.asyncSend(topicTag, map, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            log.info("发送:{},成功", topicTag);
        }

        @Override
        public void onException(Throwable e) {
            log.info("发送:{},失败", topicTag);
        }
    });

}

监听

mq:
  consumerGroup: mq
  tag: 666||777
package com.fchan.mq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.annotation.SelectorType;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Component
@RocketMQMessageListener(
        consumerGroup = "${mq.consumerGroup}",
        topic = "efg",
        //selectorType 默认就是tag
        selectorType = SelectorType.TAG,
        selectorExpression = "${mq.tag}"
)
@Slf4j
public class MqListenerTag implements RocketMQListener<String> {


    @Override
    public void onMessage(String s) {
        log.info("时间收到了mq消息:{}", LocalDateTime.now(), s);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/360199.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

我为什么放弃WinUI3

基于WinUI3开发HiNote已经有一个多月的时间了&#xff0c;算是做出来一个简单能用的C端软件。 基于个人的经历&#xff0c;说说其中的开发体验。 UI设计语言 无论是否抄袭苹果&#xff0c;WinUI3给人的感觉都是眼前一亮的。简洁美观&#xff0c;现代化&#xff0c;毛玻璃的美…

rk3568网口CAN串口通信速率性能

通信接口性能参数外设接口性能参数测试结果为实验室实测值&#xff0c;可作为设计参考&#xff0c;但因测试环境和器件批次差异&#xff0c;可能会存在一定的误差&#xff0c;且测试结果依赖评估板性能&#xff0c;核心板搭配不同底板性能也可能存在差异&#xff0c;请结合实际…

Redis之分布式锁

随着业务发展的需要&#xff0c;原单体单机部署的系统被演化成分布式集群系统后&#xff0c;由于分布式系统多线程、多进程并且分布在不同机器上&#xff0c;这将使原单机部署情况下的并发控制锁策略失效&#xff0c;单纯的 Java API并不能提供分布式锁的能力。为了解决这个问题…

Java:顶级Java应用程序服务器 — Tomcat、Jetty、GlassFish、WildFly

如果你想编写Java web应用程序&#xff0c;首先需要做出一个艰难的决定&#xff1a;选择运行应用程序的Java应用程序服务器。什么是应用服务器?一般来说&#xff0c;应用程序服务器执行Java应用程序。在操作系统中启动它们&#xff0c;然后将应用程序部署到其中。将应用程序服…

盘点2023年大企业都在用的优秀项目管理软件

行内有句话&#xff1a;每个成功的项目背后肯定有一个成功的项目经理&#xff0c;而每个项目经理背后都少不了一些专业的项目管理工具。要在任何项目中取得成功&#xff0c;对项目进行全面的管理非常关键&#xff0c;包括项目的执行、计划、推进、监控、结果等&#xff0c;有了…

谈谈我对ai发展的看法

最近难得有时间&#xff0c;通过白话&#xff0c;聊聊我对AI的看法&#xff0c;仅代表个人观点首先表明我的观点&#xff1a;人类当前的人工智能成果&#xff0c;仍然停留在一知半解程度。技术的发展是需要长期的积累和进步&#xff0c;目前AI的发展仍处于入门阶段人类的发展必…

ar远程协助可视化云平台提高患者的医疗体验

“专家的指导意见很科学&#xff0c;患者恢复很快&#xff0c;情况稳定就可以转入普通病房了!这种云急救对基层医院来说太及时太必要了!大专家不用奔波&#xff0c;我们又提高了救治能力和效率!”这是来自某市基层主任的心里话。 传统远程协助的局限性 传统远程带来了便捷的线上…

【Java基础】输入与输出

输入与输出 输入 获取用键盘输入常用的两种方法 方法 1&#xff1a;通过 Scanner Scanner input new Scanner(System.in); String s input.nextLine(); input.close();方法 2&#xff1a;通过 BufferedReader BufferedReader input new BufferedReader(new InputStrea…

实验室设计|实验室设计要点SICOLAB

一、实验室设计规划要素1、实验室布局&#xff1a;实验室的布局要符合实验室工作流程&#xff0c;可以将实验室划分为干净区和污染区&#xff0c;以确保实验室的卫生和实验的准确性。2、设备选购&#xff1a;根据实验需要选择适当的设备&#xff0c;并确保设备的质量和性能符合…

LA@ML特征分解@奇异值分解@伪逆

文章目录特征分解几何示意图二次型和生成子空间奇异值分解理论数学风格的描述奇异值分解和特征分解的联系&#x1f60a;&#x1f388;机器学习风格的描述对角矩阵的记法酉矩阵unitary matrix性质Moore-Penrose 伪逆矩阵的逆和线性方程组的解(review)伪逆应用迹运算方阵行列式和…

代谢组+转录组分析为腰果树果实发育成熟过程中代谢网络提供见解

文章标题&#xff1a;Metabolomic and transcriptomic analyses provide insights into metabolic networks during cashew fruit development and ripening 发表期刊&#xff1a;Food Chemistry 影响因子&#xff1a;9.231 作者单位&#xff1a;海南大学 百趣生物提供服务…

推荐系统[八]算法实践总结V0:腾讯音乐全民K歌推荐系统架构及粗排设计

1.前言:召回排序流程策略算法简介 推荐可分为以下四个流程,分别是召回、粗排、精排以及重排: 召回是源头,在某种意义上决定着整个推荐的天花板;粗排是初筛,一般不会上复杂模型;精排是整个推荐环节的重中之重,在特征和模型上都会做的比较复杂;重排,一般是做打散或满足…

Docker--------Day1

1.简介 您要如何确保应用能够在这些环境中运行和通过质量检测&#xff1f;并且在部署过程中不出现令人头疼的版本、配置问题&#xff0c;也无需重新编写代码和进行故障修复&#xff1f; Docker之所以发展如此迅速&#xff0c;也是因为它对此给出了一个标准化的解决方案-----…

【牛客刷题专栏】0x0B:JZ3 数组中重复的数字(C语言编程题)

前言 个人推荐在牛客网刷题(点击可以跳转)&#xff0c;它登陆后会保存刷题记录进度&#xff0c;重新登录时写过的题目代码不会丢失。个人刷题练习系列专栏&#xff1a;个人CSDN牛客刷题专栏。 题目来自&#xff1a;牛客/题库 / 在线编程 / 剑指offer&#xff1a; 目录前言问题…

软件包被拦截、删除、无法运行,,,卸载掉自带杀毒软件Windows Defender、关闭防火墙,,,网上各种办法都试过了,不起作用。。。最后一招解决

文章目录1 问题描述2 解决办法3 下载文件被阻止或运行被阻止4 防火墙关闭方法&#xff08;补充&#xff09;5 卸载windows自带的杀毒软件Windows Defender&#xff08;最简单方法&#xff09;1 问题描述 我的win10没有安装任何杀毒软件&#xff0c;只有系统自带的Windows Defe…

不同投票需要的不同上传方式outlook 投票功能怎么设置投票 html5

“艺空间手造坊”网络评选投_投票方式的选择_免费图文教学投票教学关于微信投票&#xff0c;我们现在用的最多的就是小程序投票&#xff0c;今天的网络投票&#xff0c;在这里会教大家如何用“活动星投票”小程序来进行投票。我们现在要以“艺空间手造坊”为主题进行一次投票活…

光谱仪工作过程及重要参数定义

标题光谱仪工作过程CCD、PDA薄型背照式BT-CCD狭缝Slit暗电流Dark Current分辨率Resolution色散Dispersion光栅和闪耀波长Grating波长精度、重复性和准确度Precision带宽Band widthF数F/#光谱仪工作过程 CCD、PDA 电荷耦合器件&#xff08;Charger Coupled Device&#xff0c;缩…

Java线程的6 种状态

Java 线程的状态 Java线程有六种状态&#xff1a; 初始&#xff08;NEW&#xff09;、运行&#xff08;RUNNABLE&#xff09;、阻塞&#xff08;BLOCKED&#xff09;、 等待&#xff08;WAITING&#xff09;、超时等待&#xff08;TIMED_WAITING&#xff09;、终止&#xff08…

Win11的两个实用技巧系列之蓝屏自动重启的解决办法

Win11蓝屏收集错误信息重启怎么修复? Win11蓝屏自动重启的解决办法Win11蓝屏收集错误信息重启怎么修复&#xff1f;Win11蓝屏了&#xff0c;该怎么解决呢&#xff1f;下面我们就来看看Win11蓝屏自动重启的解决办法电脑蓝屏收集错误信息重启&#xff0c;这是有使用Win11系统用户…

前端手写面试题总结

异步并发数限制 /*** 关键点* 1. new promise 一经创建&#xff0c;立即执行* 2. 使用 Promise.resolve().then 可以把任务加到微任务队列&#xff0c;防止立即执行迭代方法* 3. 微任务处理过程中&#xff0c;产生的新的微任务&#xff0c;会在同一事件循环内&#xff0c;追加…