生产者延迟消息和重试机制

news2024/12/27 15:30:05
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
延迟消息级别

在这里插入图片描述

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {

    //事务消息处理
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
            || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
        // 如果是延迟消息
        if (msg.getDelayTimeLevel() > 0) {
            // 如果设置的值过大,则设置为最大延迟级别
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            // 修改Topic
            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            // 根据延迟级别,决定要将其投递到那个队列中
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // 记录原始的 topic 和 队列信息
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            // 修改topic和队列信息
            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    public static final String RMQ_SYS_SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";
    
RocketMQBroker端在存储生产者写入消息时,首先将其写入CommitLog里,为了不让用户立刻就能消费到这条消息,
这里先将Topic的名称修改为SCHEDULE_TOPIC_XXXX,并且根据设置的延迟级别选择将消息投放到哪一个队列里。

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
整个流程;

  1. 生产者发送延迟消息到Broker里
  2. 把消息转发到SCHEDULE_TOPIC_XXXX主题下的队列中
  3. 延迟服务定期消费SCHEDULE_TOPIC_XXXX主题下的消息,到时间了就把它拿到CommitLog中
  4. 消息重新被投放到目标Topic里
  5. 消费者消费延迟消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1712117.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于高光谱数据集的创新点实现-高斯核函数卷积神经网络

一、高光谱数据集简介 1.1 数据集简介 数据集链接在这:高光谱数据集(.mat.csv)-科研学术 数据集包含下面三个文件: 文件中包含.mat与.csv,145x145x220, 其实主要使用avirissub.csv文件,在代码上只是将mat文件转成了csv文件。具体avirissub.csv如下&am…

算法学习笔记(7.1)-贪心算法(分数背包问题)

##问题描述 给定 𝑛 个物品,第 𝑖 个物品的重量为 𝑤𝑔𝑡[𝑖−1]、价值为 𝑣𝑎𝑙[𝑖−1] ,和一个容量为 𝑐𝑎&…

【kubernetes】关于k8s集群如何将pod调度到指定node节点(亲和与反亲和等)

目录 一、调度约束 1.1K8S的 List-Watch 机制 ⭐⭐⭐⭐⭐ 1.1.1Pod 启动典型创建过程 二、调度过程 2.1Predicate(预选策略) 常见的算法 2.2priorities(优选策略)常见的算法 三、k8s将pod调度到指定node的方法 3.1指定…

白酒:传统产区的创新之路与品牌重塑

云仓酒庄豪迈白酒作为传统产区的品牌,面临着市场需求的不断变化和消费者口味的多样化。为了保持品牌竞争力和市场地位,传统产区需要不断创新和重塑品牌形象,以满足消费者的需求和期望。 首先,传统产区需要注重产品的品质和口感。品…

Owinps静态IP代理:跨境电商的优选解决方案

在快速发展的电子商务领域,尤其是跨境电商行业,网络的稳定性和安全性是成功经营的关键因素之一。在这背后,少不得一个重要的跨境电商工具——代理IP,而这其中,静态IP因其独特的稳定性和安全性,正逐渐成为众…

IC开发——Ubuntu安装VCS2018

1. 简介 VCS是一种常用的Verilog仿真和综合工具,由Synopsys公司开发。它提供了一个完整的设计验证环境,用于验证硬件设计的正确性和性能。以下是VCS工具的一些主要特点和功能: 仿真功能:VCS支持基于事件驱动的数字电路级仿真&am…

MySQL学习——连接服务器和输入查询

MySQL是一个流行的关系型数据库管理系统(RDBMS),由瑞典的MySQL AB公司开发,后来被Oracle公司收购。它使用SQL(结构化查询语言)作为访问和操作数据库的标准语言。 要查看 mysql 客户端程序提供的选项列表&a…

合约的值类型

基本数据类型:整数、枚举、布尔(类似java的数据类型)Address、Contract(这两种是solidity特有的数据类型)Fixed byte array(定长字节数组) Integer(int/uint) int/uint 以8位字节递增&#xf…

硬盘有EFI分区格式化不了,也删不了怎么办,不能读取磁盘

问题:EFI为系统引导分区表明这是一块系统盘,常规操作无法格式化也无法删除,也不能读取 解决: 1.管理员运行cmd 2.输入diskpart 3.输入list disk 查看系统磁盘,并找到你格式化不了的那块磁盘 4.select disk 编号 选择…

【Go专家编程——并发控制——三剑客】

并发控制 我们考虑这么一种场景,协程在A执行过程中需要创建子协程A1、A2、A3…An,协程创建完子协程后就等待子协程退出。 针对这种场景,Go提供了三种解决方案: Channel:使用channel控制子协程 优点:实现…

【稳定检索】2024年核能科学与材料、物理应用国际会议(NESMPA 2024)

2024年核能科学与材料、物理应用国际会议 2024 International Conference on Nuclear Energy Science and Materials, Physical Applications 【1】会议简介 2024年核能科学与材料、物理应用国际会议即将拉开帷幕,这是一场汇聚全球核能科学、材料研究及物理应用领域…

全志T527 适配双目tp2815_mipi

一、硬件信息 TP2815: 确认硬件信息: 1、通信接口:TWI2总线,引脚组为PE1 、PE2 2、RESET脚: 二、软件配置 1、设备树 t527 dtsi: bsp/configs/linux-5.15/sun55iw3p1.dtsi t527 uboot-board.dts device/config/chi…

必看丨SSL证书是什么?怎么免费申请一张?

SSL证书是一种网络安全证书,它能帮助网站实现数据加密传输,保障用户信息在浏览器和网站服务器之间的安全交流。想象一下SSL证书就像一封密信的封蜡,确保信件内容在途中不会被他人偷看或篡改。 SSL证书的作用主要有两点: 1. 身份验…

gin框架精通篇(二)

原生数据库使用 导入模块:go get -u github.com/go-sql-driver/mysql 安装 mysql 数据库 安装数据库可能遇到的问题:(网上的方法基本可以解决) ERROR 1045 (28000): Access denied for user ‘-root’‘localhost’ (using passwo…

【leetcode2765--最长交替子数组】

要求:给定一个数组,找出符合【x, x1,x,x-1】这样循环的最大交替数组长度。 思路:用两层while循环,第一个while用来找到符合这个循环的开头位置,第二个用来找到该循环的结束位置,并比较一下max进行记录。 …

LLVM技术在GaussDB等数据库中的应用

目录 LLVM和数据库 LLVM适用场景 LLVM对所有类型的SQL都会有收益吗? LLVM在OLTP中就一定没有收益吗? GaussDB中的LLVM 1. LLVM在华为应用于数据库的时间线 2. GaussDB LLVM实现简析 3. GaussDB LLVM支持加速的场景 支持LLVM的表达式&#xff1a…

河南道路与桥梁乙级资质升级门槛条件解读

河南道路与桥梁乙级资质升级门槛条件解读如下: 一、企业基本条件 法人资格: 企业需具备独立企业法人资格,能够独立承担民事责任。注册资金: 企业的注册资金应不少于100万元人民币,这一数字直接体现了企业的经济实力和…

Yann LeCun 和 Elon Musk 就 AI 监管激烈交锋

🦉 AI新闻 🚀 Yann LeCun 和 Elon Musk 就 AI 监管激烈交锋 摘要:昨天,Yann LeCun 和Elon Musk 在社交媒体就人工智能的安全性和监管问题展开激烈辩论。LeCun 认为目前对 AI 的担忧和监管为时过早,主张开放和共享。而…

【linux:基础IO】

目录 系统调用的文件接口: open read: write: lseek: close: 系统调用的文件接口: open 当文件存在时:int open (const char*pathname,int flags)当文件不存在时:int open (const char* pathname,int flags,mode_t mode) 返…

拉格朗日插值法的推导

1、插值的基本定义   设函数 y f ( x ) yf(x) yf(x)在区间 [ a , b ] [a,b] [a,b]上有定义&#xff0c;且已知它在 n 1 n1 n1个互异点 a ≤ x 0 < x 1 < . . . < x n ≤ b a\leq x_0<x_1<...<x_n\leq b a≤x0​<x1​<...<xn​≤b上的函数值 y 0 …