使用RabbitMQ实现延迟消息的完整指南

news2025/1/8 12:22:38

在分布式系统中,消息队列通常用于解耦服务,RabbitMQ是一个广泛使用的消息队列服务。延迟消息(也称为延时队列或TTL消息)是一种常见的场景应用,特别适合处理某些任务在一段时间后执行的需求,如订单超时处理、延时通知等。

本文将以具体代码为例,展示如何使用RabbitMQ来实现延迟消息处理,涵盖队列和交换机的配置、消息的发送与接收以及死信队列的处理。

什么是延迟消息?

延迟消息是指消息在发送到队列后,经过设定的时间延迟再被消费。RabbitMQ 本身没有直接支持延迟队列的功能,但可以通过 TTL(Time To Live)+ 死信队列(Dead Letter Queue, DLQ) 的组合来实现。当消息超过TTL(消息存活时间)后,不会被立即消费,而是会被转发到绑定的死信队列,从而实现延迟处理。

RabbitMQ中的延迟消息原理

在RabbitMQ中,我们可以通过以下几个概念来实现延迟消息:

  1. TTL(Time To Live):可以为队列设置TTL,消息超过该时间后会被标记为“死信”。
  2. 死信队列(Dead Letter Queue):当消息在正常队列中过期或处理失败时,RabbitMQ可以将它们路由到一个死信队列,死信队列可以用来处理这些过期或未处理的消息。
  3. x-dead-letter-exchangex-dead-letter-routing-key:可以通过配置队列的参数,将过期消息发送到一个专门的死信交换器,并根据指定的路由键转发到死信队列。

 

 消息来到ttl.queue消息队列,过期时间内无人消费,消息来到死信交换机hmall.direct,在direct.queue消息队列无需等待。

1. RabbitMQ的配置

首先,我们需要配置两个队列和两个交换机:一个用于存放延时消息,另一个用于处理超时的死信消息。

package com.heima.stroke.configuration;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class RabbitConfig {
    // 延迟时间 单位:毫秒 (这里设为30秒)
    private static final long DELAY_TIME = 1000 * 30;

    // 行程超时队列
    public static final String STROKE_OVER_QUEUE = "STROKE_OVER_QUEUE";
    // 行程死信队列
    public static final String STROKE_DEAD_QUEUE = "STROKE_DEAD_QUEUE";

    // 行程超时队列交换机
    public static final String STROKE_OVER_QUEUE_EXCHANGE = "STROKE_OVER_QUEUE_EXCHANGE";
    // 行程死信队列交换机
    public static final String STROKE_DEAD_QUEUE_EXCHANGE = "STROKE_DEAD_QUEUE_EXCHANGE";

    // 行程超时交换机 Routing Key
    public static final String STROKE_OVER_KEY = "STROKE_OVER_KEY";
    // 行程死信交换机 Routing Key
    public static final String STROKE_DEAD_KEY = "STROKE_DEAD_KEY";

    /**
     * 声明行程超时队列,并设置其参数
     * x-dead-letter-exchange:绑定的死信交换机
     * x-dead-letter-routing-key:死信路由Key
     * x-message-ttl:消息的过期时间
     */
    @Bean
    public Queue strokeOverQueue() {
        Map<String, Object> args = new HashMap<>(3);
        args.put("x-dead-letter-exchange", STROKE_DEAD_QUEUE_EXCHANGE);
        args.put("x-dead-letter-routing-key", STROKE_DEAD_KEY);
        args.put("x-message-ttl", DELAY_TIME); // 设置TTL为30秒
        return QueueBuilder.durable(STROKE_OVER_QUEUE).withArguments(args).build();
    }

    @Bean
    public DirectExchange strokeOverQueueExchange() {
        return new DirectExchange(STROKE_OVER_QUEUE_EXCHANGE);
    }

    @Bean
    public Binding bindingStrokeOverDirect() {
        return BindingBuilder.bind(strokeOverQueue()).to(strokeOverQueueExchange()).with(STROKE_OVER_KEY);
    }
}

解释:

TTL设置:我们通过x-message-ttl设置消息的过期时间为30秒。

死信队列绑定:通过x-dead-letter-exchangex-dead-letter-routing-key设置,当消息过期时,它会被转发到死信交换机,再路由到死信队列。

2. 生产者发送延迟消息

接下来,我们通过生产者向超时队列发送消息,这些消息将在TTL过期后转发到死信队列。

package com.heima.stroke.rabbitmq;

import com.alibaba.fastjson.JSON;
import com.heima.modules.vo.StrokeVO;
import com.heima.stroke.configuration.RabbitConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MQProducer {
    private final static Logger logger = LoggerFactory.getLogger(MQProducer.class);

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * 发送延时消息到行程超时队列
     *
     * @param strokeVO 消息体
     */
    public void sendOver(StrokeVO strokeVO) {
        String mqMessage = JSON.toJSONString(strokeVO);
        logger.info("send timeout msg:{}", mqMessage);

        rabbitTemplate.convertAndSend(RabbitConfig.STROKE_OVER_QUEUE_EXCHANGE, RabbitConfig.STROKE_OVER_KEY, mqMessage);
    }
}

解释:

sendOver 方法将消息发送到超时队列,消息将在超时后进入死信队列。生产者不需要额外处理TTL或死信的配置,只需发送消息即可。

3. 消费者监听死信队列

当消息超过TTL后,将会被转发到死信队列。消费者需要监听死信队列并处理这些消息。

j

package com.heima.stroke.rabbitmq;

import com.alibaba.fastjson.JSON;
import com.heima.modules.vo.StrokeVO;
import com.heima.stroke.configuration.RabbitConfig;
import com.heima.stroke.handler.StrokeHandler;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class MQConsumer {
    private final static Logger logger = LoggerFactory.getLogger(MQConsumer.class);

    @Autowired
    private StrokeHandler strokeHandler;

    /**
     * 监听死信队列
     *
     * @param message 消息体
     * @param channel RabbitMQ的Channel
     * @param tag 消息的Delivery Tag
     */
    @RabbitListener(
            bindings = {
                    @QueueBinding(
                            value = @Queue(value = RabbitConfig.STROKE_DEAD_QUEUE, durable = "true"),
                            exchange = @Exchange(value = RabbitConfig.STROKE_DEAD_QUEUE_EXCHANGE),
                            key = RabbitConfig.STROKE_DEAD_KEY)
            })
    @RabbitHandler
    public void processStroke(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
        StrokeVO strokeVO = JSON.parseObject(message.getBody(), StrokeVO.class);
        logger.info("get dead msg:{}", message.getBody());
        
        if (strokeVO == null) {
            return;
        }

        try {
            // 处理超时的行程消息
            strokeHandler.timeoutHandel(strokeVO);
            // 手动确认消息
            channel.basicAck(tag, false);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

解释:

@RabbitListener 注解绑定了死信队列的监听器。当消息被转发到死信队列时,该消费者会接收到消息。

使用 channel.basicAck(tag, false) 手动确认消息处理成功,确保消息不会重复消费。

4. 处理超时业务逻辑

在我们的业务中,当消息超时未处理时,将其状态设置为超时。

public void timeoutHandel(StrokeVO strokeVO) {
    // 获取司机行程ID和乘客行程ID
    String inviterTripId = strokeVO.getInviterTripId();
    String inviteeTripId = strokeVO.getInviteeTripId();

    // 检查邀请状态是否为未确认
    String inviteeStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId);
    String inviterStatus = redisHelper.getHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId);

    if (String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviteeStatus) &&
        String.valueOf(InviteState.UNCONFIRMED.getCode()).equals(inviterStatus)) {
        // 更新为超时状态
        redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviteeTripId, inviterTripId, String.valueOf(InviteState.TIMEOUT.getCode()));
        redisHelper.addHash(HtichConstants.STROKE_INVITE_PREFIX, inviterTripId, inviteeTripId, String.valueOf(InviteState.TIMEOUT.getCode()));
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2220390.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CISP/NISP二级练习题-第一卷

目录 另外免费为大家准备了刷题小程序和docx文档&#xff0c;有需要的可以私信获取 1&#xff0e;不同的信息安全风险评估方法可能得到不同的风险评估结果&#xff0c;所以组织 机构应当根据各自的实际情况选择适当的风险评估方法。下面的描述中错误的是 &#xff08;&#…

Cesium 实战 - 自定义纹理材质 - 立体墙(旋转材质)

Cesium 实战 - 自定义纹理材质 - 立体墙(旋转材质) 核心代码完整代码在线示例Cesium 给实体对象(Entity)提供了很多实用的样式,基本满足普通项目需求; 但是作为 WebGL 引擎,肯定不够丰富,尤其是动态效果样式。 对于实体对象(Entity),可以通过自定义材质,实现各种…

MoeCTF 2024 ---Misc方向WP

安全杂项 signin 题目描述&#xff1a; xdsec的小伙伴们和参赛者来上课&#xff0c;碰巧这一天签到系统坏了&#xff0c;作为老师的你&#xff0c;要帮他们 教师代签。 特殊提醒&#xff1a;luo同学今天好像在宿舍打游戏&#xff0c;不想来上课&#xff0c;这是严重的缺勤行为…

VideoCLIP-XL:推进视频CLIP模型对长描述的理解

摘要 对比语言-图像预训练&#xff08;CLIP&#xff09;已被广泛研究并应用于众多领域。然而&#xff0c;预训练过程中对简短摘要文本的重视阻碍了CLIP理解长描述的能力。在视频方面&#xff0c;这个问题尤为严重&#xff0c;因为视频通常包含大量详细内容。在本文中&#xff…

【JavaEE】——TCP应答报文机制,超时重传机制

阿华代码&#xff0c;不是逆风&#xff0c;就是我疯 你们的点赞收藏是我前进最大的动力&#xff01;&#xff01; 希望本文内容能够帮助到你&#xff01;&#xff01; 目录 一&#xff1a;TCP协议&#xff08;面试重点重点&#xff09; 1&#xff1a;报头长度 2&#xff1a;…

Pytest参数详解 — 基于命令行模式!

1、--collect-only 查看在给定的配置下哪些测试用例会被执行 2、-k 使用表达式来指定希望运行的测试用例。如果测试名是唯一的或者多个测试名的前缀或者后缀相同&#xff0c;可以使用表达式来快速定位&#xff0c;例如&#xff1a; 命令行-k参数.png 3、-m 标记&#xff08;…

鲸信私有化即时通信如何平衡安全性与易用性之间的关系?

即时通信已经成为我们生活中不可或缺的一部分。从日常沟通到工作协作&#xff0c;每一个信息的传递都承载着信任与效率。然而&#xff0c;随着网络安全威胁日益严峻&#xff0c;如何在享受即时通信便捷的同时&#xff0c;确保信息的私密性与安全性&#xff0c;成为了摆在我们面…

AGV电子地图之贝塞尔曲线

贝塞尔曲线在AGV系统的电子地图中的重要位置 AGV电子地图之贝塞尔曲线_哔哩哔哩_bilibili 点击关注不迷路&#xff0c;你的关注是我们最大的动力 在AGV&#xff08;自动引导车&#xff09;系统的电子地图中&#xff0c;贝塞尔曲线有着重要的作用&#xff0c;主要体现在以下几个…

如何保证Redis和数据库的数据一致性

文章目录 0. 前言1. 补充知识&#xff1a;CP和AP2. 什么情况下会出现Redis与数据库数据不一致3. 更新缓存还是删除缓存4. 先操作缓存还是先操作数据库4.1 先操作缓存4.1.1 数据不一致的问题是如何产生的4.1.2 解决方法&#xff08;延迟双删&#xff09;4.1.3 最终一致性和强一致…

【大数据算法】一文掌握大数据算法之:大数据算法分析技术。

大数据算法分析技术 1、引言2、 大数据分析技术2.1 时间/空间复杂度2.2 I/O 复杂度2.3 结果质量2.4 通信复杂度 3、总结 1、引言 小屌丝&#xff1a;鱼哥&#xff0c;最近更文有些不频繁了哈。 小鱼&#xff1a;这一个月不见&#xff0c;你这说话方式也变了。 小屌丝&#xff…

C++与C语言的排序算法对比(插入,希尔,归并)

1. 引言 排序算法是计算机科学中的基础概念&#xff0c;广泛应用于数据处理和算法设计中。本文将通过插入排序、希尔排序、归并排序和选择排序这四种常见的排序算法&#xff0c;分别用C和C语言实现&#xff0c;并对它们进行优劣对比&#xff0c;以帮助读者更好地理解这两种语言…

MATLAB支持的字体

listfonts 列出可用的系统字体 {Adobe Devanagari } {Agency FB } {Algerian } {AlienCaret } {AMS } {Arial } {Arial Black …

[LeetCode] 21. 合并两个有序链表

题目描述&#xff1a; 将两个升序链表合并为一个新的 升序 链表并返回。新链表是通过拼接给定的两个链表的所有节点组成的。 示例 1&#xff1a; 输入&#xff1a;l1 [1,2,4], l2 [1,3,4] 输出&#xff1a;[1,1,2,3,4,4]示例 2&#xff1a; 输入&#xff1a;l1 [], l2 […

数据结构(8.3_1)——冒泡排序

交换排序&#xff1a; 冒泡排序和快速排序 冒泡排序&#xff1a; 示例&#xff1a; 从行往前将A[i-1]和A[i]比较若遇到A[i-1]>A[i]则将两个元素交换 注意&#xff1a; 代码实现&#xff1a; //交换 void swap(int& a, int& b) {int temp a;a b;b temp; } //冒…

入门!Linux 常见指令及权限管理全面指南

Linux 操作系统在现代计算机应用中扮演着重要的角色&#xff0c;广泛用于服务器、桌面系统、嵌入式设备及云计算平台等领域。理解和掌握 Linux 常见指令及权限管理机制&#xff0c;是每一位系统管理员和开发人员的基础技能。本文将详细介绍 Linux 系统的基本背景、常用指令、权…

设计模式概览

设计模式是一种解决常见编程问题的经验总结&#xff0c;提供了代码的可重用性、可扩展性和可维护性。常见的设计模式有23个&#xff0c;主要分为三大类&#xff1a;创建型模式、结构型模式和行为型模式。下面是这三类设计模式的详细分类和讲解&#xff1a; 一、创建型模式 创建…

进入 Searing-66 火焰星球:第一周游戏指南

Alpha 第四季已开启&#xff0c;穿越火焰星球 Searing-66&#xff0c;带你开启火热征程。准备好勇闯炙热的沙漠&#xff0c;那里有无情的高温和无情的挑战在等待着你。从高风险的烹饪对决到炙热的冒险&#xff0c;Searing-66 将把你的耐力推向极限。带上充足的水&#xff0c;天…

Fusion创建一个简单的api脚本文件

我的Fusion版本&#xff1a;Fusion 2.0.20476 x86_64 脚本模块在实用程序->附加模型->脚本和附加模块&#xff0c;快捷键为shifts 里面有一些演示脚本&#xff0c;可以直接使用 也可以自己创建一个新的脚本 创建的脚本在此处—— 选择脚本文件&#xff0c;点击编辑&a…

小新学习Docker之Ansible 的脚本 --- playbook 剧本

一、playbook 剧本简介 playbooks 本身由以下各部分组成&#xff1a; &#xff08;1&#xff09;Tasks&#xff1a;任务&#xff0c;即通过 task 调用 ansible 的模板将多个操作组织在一个 playbook 中运行 &#xff08;2&#xff09;Variables&#xff1a;变量 &#xff08;3…

Linux系统基础-动静态库

个人主页&#xff1a;C忠实粉丝 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 C忠实粉丝 原创 Linux系统基础-动态库和静态库 收录于专栏[Linux学习] 本专栏旨在分享学习Linux的一点学习笔记&#xff0c;欢迎大家在评论区交流讨论&#x1f48c; 目录 1. 动…