RabbitMQ实现延迟消息发送——实战篇

news2025/3/1 16:04:35

在项目中,我们经常需要使用消息队列来实现延迟任务,本篇文章就向各位介绍使用RabbitMQ如何实现延迟消息发送,由于是实战篇,所以不会讲太多理论的知识,还不太理解的可以先看看MQ的延迟消息的一个实现原理再来看这篇文章跟着练手哦~

需求背景

我这儿的一个需求背景大概是干部添加完活动后,由管理员进行审批,审批通过后,该活动id连同设置的过期时间会被放入消息队列中,等到活动结束时间到的时候,自动将活动的状态设置为已完成,这里华丽一个活动图,各位参考一下。

ce9bd03466514d6294a1a1de81f7772d.png

需求了解完之后我们就可以开始的写代码啦~(手动微笑)

相关知识点拓展

这里还是简单提一下MQ实现延迟队列的一个方法,一种是用插件,还有一种是使用死信队列,当然本文我们使用的就是通过死信队列来实现的。

当我们的一个正常消息因为设置了过期时间或者被消费者拒绝消费的时候,这条消息就会被放入死信队列中,然后死信队列再进行消费。

然后啰嗦一下,说一下MQ的交换机类型,以及死信交换机一般选用哪种:

1. Direct Exchange(直连交换机)

  • 特点
    • 根据消息的 Routing Key 精确匹配队列的 Binding Key
    • 完全匹配时,消息才会被路由到对应的队列。
  • 适用场景
    • 点对点消息传递,消息需要精确路由到特定队列。
  • 示例
    • 消息的 Routing Key 为 order.created,队列的 Binding Key 也为 order.created,则消息会被路由到该队列。

2. Fanout Exchange(扇出交换机)

  • 特点
    • 将消息广播到所有绑定到该交换机的队列,忽略 Routing Key。
  • 适用场景
    • 广播消息,消息需要发送到多个队列。
  • 示例
    • 消息发送到 Fanout Exchange,所有绑定到该交换机的队列都会收到消息。

3. Topic Exchange(主题交换机)

  • 特点
    • 根据消息的 Routing Key 和队列的 Binding Key 进行模式匹配。
    • Binding Key 支持通配符:
      • *:匹配一个单词。
      • #:匹配零个或多个单词。
  • 适用场景
    • 消息需要根据模式路由到多个队列。
  • 示例
    • 消息的 Routing Key 为 order.created.us,队列的 Binding Key 为 order.created.*,则消息会被路由到该队列。

4. Headers Exchange(头交换机)

  • 特点
    • 根据消息的 Headers(键值对)匹配队列的 Binding Arguments。
    • 忽略 Routing Key。
  • 适用场景
    • 消息需要根据复杂的条件路由到队列。
  • 示例
    • 消息的 Headers 包含 type=order 和 region=us,队列的 Binding Arguments 要求 x-match=all 且 type=order,则消息会被路由到该队列。

5. Default Exchange(默认交换机)

  • 特点
    • RabbitMQ 默认创建的交换机,类型为 Direct Exchange。
    • 每个队列都会自动绑定到默认交换机,Binding Key 为队列名称。
  • 适用场景
    • 默认情况下,消息可以直接发送到队列。

死信交换机适合使用哪种类型?

死信交换机(DLX, Dead Letter Exchange)的类型选择取决于你的业务需求。以下是常见的选择:

1. Direct Exchange

  • 适用场景
    • 死信消息需要精确路由到特定的死信队列。
  • 示例
    • 将死信消息路由到 dlx-queue,用于统一处理所有死信消息。

2. Topic Exchange

  • 适用场景
    • 死信消息需要根据不同的 Routing Key 路由到不同的死信队列。
  • 示例
    • 将死信消息根据业务类型(如 order.deadpayment.dead)路由到不同的死信队列。

3. Fanout Exchange

  • 适用场景
    • 死信消息需要广播到多个死信队列。
  • 示例
    • 将死信消息同时发送到日志队列和报警队列。

推荐选择

  • 大多数情况下,死信交换机使用 Direct Exchange,因为死信消息通常需要精确路由到一个死信队列,用于统一处理。
  • 如果死信消息需要根据不同的条件路由到多个队列,可以使用 Topic Exchange

代码部分

首先,我们需要定义一个死信交换机和死信队列,用来接收来自普通队列的消息。

//    创建死信交换机,处理延迟消息通知
    @Bean("dead_letter_exchange")
    public DirectExchange delayExchange(){
        return new DirectExchange("dead_letter_exchange",true,false);
    }
//    创建死信队列
    public Queue deadLetterQueue(){
        Queue queue = new Queue("dead_letter_queue", true);
        rabbitAdmin.declareQueue(queue);
        log.info("死信队列声明成功:" + queue.getName());
        return queue;    }

然后,我们需要配置一个普通的消息队列和一个普通的交换机,这个消息队列需要设置对应的死信交换机和死信路由,同时我们这个普通队列需要接收一个过期时间,保证一到过期时间消息就会被发送到死信队列当中。

//    创建一个普通队列,接受一个过期时间,出列活动结束后,发送到死信队列
    public Queue normalQueue(Long expireTime){
        Map<String,Object> args = new HashMap<>();
        if (expireTime != null && expireTime > 0) {  // 确保 TTL 是正数
            args.put("x-message-ttl", expireTime);
        }
        // 设置死信交换机
        args.put("x-dead-letter-exchange",deadLetterExchange);
        // 设置死信路由键
        args.put("x-dead-letter-routing-key","dead_letter_routing_key");
        Queue queue = new Queue("normal_queue", true, false, false, args);
        log.info("普通队列声明成功:" + queue.getName());
        return queue;    }
//    创建一个普通交换机,处理活动结束自动设置活动状态为结束
    @Bean("activity_end_exchange")
    public DirectExchange activityEndExchange(){
        return new DirectExchange("activity_end_exchange");
    }

然后我们需要分别将死信交换机和死信队列,普通交换机和普通队列分别进行绑定。

//    将死信队列和死信交换机进行绑定
    public void bindDeadLetterRouting(){
        Queue queue=queueDeclareConfig.deadLetterQueue();
        Binding binding = BindingBuilder.bind(queue)
                .to(deadLetterExchange)
                .with("dead_letter_routing_key");
        rabbitAdmin.declareBinding(binding);
        log.info("死信队列绑定成功,死信队列名称----》" + queue.getName() + ",死信交换机名称----》" + deadLetterExchange.getName());
    }

//    绑定活动结束交换机和普通队列
    public void bindActivityEndRouting(Long expireTime) {
        Queue queue = queueDeclareConfig.normalQueue(expireTime);
        Binding binding = BindingBuilder.bind(queue)
                .to(activityEndExchange)
                .with("activity_end_routing_key");
        rabbitAdmin.declareBinding(binding);
    }

当然,我们还需要配置生产者来发送消息到交换机里面

//活动结束后,发送消息到死信队列,自动设置活动结束状态
    public void sendActivityEndMessage(Long expireTime, Integer activityId) {
        rabbitMQBindRoutingConfig.bindDeadLetterRouting();
        rabbitMQBindRoutingConfig.bindActivityEndRouting(expireTime);
        try {
            // 将消息发送到普通队列,等待消息过期发送到死信交换机
            rabbitTemplate.convertAndSend("activity_end_exchange", "activity_end_routing_key"
                    , activityId
                    , msg -> {
                        msg.getMessageProperties().setExpiration(expireTime.toString());
                        return msg;
                    }
            );
        } catch (Exception e) {
            log.error("发送消息失败------->" + activityId);
            throw new RuntimeException("发送消息失败---->" + activityId);
        }
    }

这里生产者的代码可以根据你的业务逻辑具体进行更改~

消费者逻辑也需要进行编写一下

//    使用MQ延迟队列,活动结束,修改活动状态
    @RabbitListener(queues = "dead_letter_queue")
    public void updatePlaceOccupyStatus(Message message, Channel channel){
        try {
            String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
            Integer activityId = Integer.parseInt(messageBody);
            ActivityInfo activityInfo = baseMapper.selectById(activityId);
            LambdaUpdateWrapper<ActivityInfo> wrapper = new LambdaUpdateWrapper<>();
            wrapper.eq(ActivityInfo::getActivityId,activityId)
                    .set(ActivityInfo::getProgress,StatusConstant.FINISH);
            if(baseMapper.update(activityInfo,wrapper)>0){
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        } catch (Exception e) {
            log.error("处理消息时发生错误:" + e.getMessage());
            try {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            } catch (IOException ioException) {
                ioException.printStackTrace();
            }
        }

消费者这边需要注意的是如果你选择的提交类型不是自动提交的话,在处理完消息之后需要手动ack一下消息,不然消费的消息不会被认为已经消费,从而导致消息积压,也会在之后的消费中重复进行消费,因此你需要告诉生产者这条消息已经被消费了。

当然,如果在消费的过程中出现了什么问题,可以设置以下这行代码:

419e93610db14c019fcfb49ad0d23703.png

basicNack方法接收三个参数:
deliveryTag: 消息的标识符。
multiple: 是否对多个消息进行否定确认。
requeue: 是否将消息重新放入队列。 

可以根据你的需求进行设定~

然后的然后,我们需要再application.yml当中进行配置相关信息:

rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
#    确认消息发送到交换机上
    publisher-confirm-type: correlated
    #    消息发送到队列确认,失败回调
    publisher-returns: true
    listener:
      direct:
        acknowledge-mode: manual
        retry:
          enabled: true
#          重试的时间间隔为1s
          initial-interval: 1000ms
#          最大重试3次
          max-attempts: 3
#          最大的重试时间间隔为2s
          max-interval: 2000ms
#          每次重试时间间隔为1s,每次重试时间间隔倍数
          multiplier: 1.0
          #重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)
        default-requeue-rejected: false
      simple:
        default-requeue-rejected: false
        acknowledge-mode: manual
#        最小消费者数量
        concurrency: 1
#        最大消费者数量
        max-concurrency: 10
        retry:
          enabled: true
          initial-interval: 1000ms
          max-attempts: 3
          max-interval: 2000ms
          multiplier: 1.0

上面给出了一个比较全的配置,你可以根据你的需求进行选择,但是需要注意的是default-requeue-rejected: false这一行配置一定要先配置,不然你的消息在普通队列中过期了,是不会发送到死信队列当中进行消费的~

到这儿,基本上所有的代码都写的差不多了,当然我们还需要再rabbitmq控制平台上分别建一个普通交换机和一个死信交换机,一个普通队列和一个私信队列,然后分别绑定就可以了。

注意的是,普通交换机也需要在平台上配置一次死信队列和死信路由:

5f1c7325f13a4910bd2d28e8b62a7f60.png

1d9202ed97554cc4ab1601d6f839b0ef.png

到这儿,如果没有什么问题的话基本上已经可以直接运行了,所以我的这篇文章到这儿基本上也已经结束了,如果你有什么问题,可以评论区留言,我们相互学习~

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2278456.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《Keras 3 在 TPU 上的肺炎分类》

Keras 3 在 TPU 上的肺炎分类 作者&#xff1a;Amy MiHyun Jang创建日期&#xff1a;2020/07/28最后修改时间&#xff1a;2024/02/12描述&#xff1a;TPU 上的医学图像分类。 &#xff08;i&#xff09; 此示例使用 Keras 3 在 Colab 中查看 GitHub 源 简介 设置 本教程将介…

1.17组会汇报

STRUC-BENCH: Are Large Language Models Good at Generating Complex Structured Tabular Data? STRUC-BENCH&#xff1a;大型语言模型擅长生成复杂的结构化表格数据吗&#xff1f;23年arXiv.org 1概括 这篇论文旨在评估大型语言模型&#xff08;LLMs&#xff09;在生成结构…

PyTorch使用教程(2)-torch包

1、简介 torch包是PyTorch框架最外层的包&#xff0c;主要是包含了张量的创建和基本操作、随机数生成器、序列化、局部梯度操作的上下文管理器等等&#xff0c;内容很多。我们基础学习的时候&#xff0c;只有关注张量的创建、序列化&#xff0c;随机数、张量的数学数学计算等常…

idea gradle compiler error: package xxx does not exist

idea 编译运行task时报项目内的包不存在&#xff0c;如果你试了网上的其它方法还不能解决&#xff0c;应该是你更新了新版idea&#xff0c;项目用的是旧版jdk&#xff0c;请在以下编译器设置中把项目JDK字节码版本设为8&#xff08;jdk1.8&#xff0c;我这里是17请自行选择&…

Nmap之企业漏洞扫描(Enterprise Vulnerability Scanning for Nmap)

简介 Namp是一个开源的网络连接端扫描软件&#xff0c;主要用于网络发现和安全审核。‌它可以帮助用户识别网络上的设备、分析它们的服务、检测操作系统类型&#xff0c;甚至发现潜在的安全漏洞。Nmap由Fyodor开发&#xff0c;最初是为了满足网络管理员的需求&#xff0c;但随…

RabbitMQ前置概念

文章目录 1.AMQP协议是什么&#xff1f;2.rabbitmq端口介绍3.消息队列的作用和使用场景4.rabbitmq工作原理5.整体架构核心概念6.使用7.消费者消息推送限制&#xff08;work模型&#xff09;8.fanout交换机9.Direct交换机10.Topic交换机&#xff08;推荐&#xff09;11.声明队列…

RabbitMQ---TTL与死信

&#xff08;一&#xff09;TTL 1.TTL概念 TTL又叫过期时间 RabbitMQ可以对队列和消息设置TTL&#xff0c;当消息到达过期时间还没有被消费时就会自动删除 注&#xff1a;这里我们说的对队列设置TTL,是对队列上的消息设置TTL并不是对队列本身&#xff0c;不是说队列过期时间…

MySQL8数据库全攻略:版本特性、下载、安装、卸载与管理工具详解

大家好&#xff0c;我是袁庭新。 MySQL作为企业项目中的主流数据库&#xff0c;其5.x和8.x版本尤为常用。本文将详细介绍MySQL 8.x的特性、下载、安装、服务管理、卸载及管理工具&#xff0c;旨在帮助用户更好地掌握和使用MySQL数据库。 1.MySQL版本及下载 企业项目中使用的…

хорошо哈拉少wordpress俄语主题

хорошо哈拉少wordpress俄语主题 wordpress俄文网站模板&#xff0c;推荐做俄罗斯市场的外贸公司建俄语独立站使用。 演示 https://www.jianzhanpress.com/?p7360

【STM32-学习笔记-10-】BKP备份寄存器+时间戳

文章目录 BKP备份寄存器Ⅰ、BKP简介1. BKP的基本功能2. BKP的存储容量3. BKP的访问和操作4. BKP的应用场景5. BKP的控制寄存器 Ⅱ、BKP基本结构Ⅲ、BKP函数Ⅳ、BKP使用示例 时间戳一、Unix时间戳二、时间戳的转换&#xff08;time.h函数介绍&#xff09;Ⅰ、time()Ⅱ、mktime()…

Python毕业设计选题:基于python的酒店推荐系统_django+hadoop

开发语言&#xff1a;Python框架&#xff1a;djangoPython版本&#xff1a;python3.7.7数据库&#xff1a;mysql 5.7数据库工具&#xff1a;Navicat11开发软件&#xff1a;PyCharm 系统展示 管理员登录 管理员功能界面 用户管理 酒店客房管理 客房类型管理 客房预定管理 用户…

【c++继承篇】--继承之道:在C++的世界中编织血脉与传承

目录 引言 一、定义二、继承定义格式2.1定义格式2.2继承关系和访问限定符2.3继承后子类访问权限 三、基类和派生类赋值转换四、继承的作用域4.1同名变量4.2同名函数 五、派生类的默认成员构造函数5.1**构造函数调用顺序&#xff1a;**5.2**析构函数调用顺序&#xff1a;**5.3调…

Elasticsearch:Jira 连接器教程第二部分 - 6 个优化技巧

作者&#xff1a;来自 Elastic Gustavo Llermaly 将 Jira 连接到 Elasticsearch 后&#xff0c;我们现在将回顾最佳实践以升级此部署。 在本系列的第一部分中&#xff0c;我们配置了 Jira 连接器并将对象索引到 Elasticsearch 中。在第二部分中&#xff0c;我们将回顾一些最佳实…

【狂热算法篇】探秘图论之 Floyd 算法:解锁最短路径的神秘密码(通俗易懂版)

&#xff1a; 羑悻的小杀马特.-CSDN博客羑悻的小杀马特.擅长C/C题海汇总,AI学习,c的不归之路,等方面的知识,羑悻的小杀马特.关注算法,c,c语言,青少年编程领域.https://blog.csdn.net/2401_82648291?spm1010.2135.3001.5343 在本篇文章中&#xff0c;博主将带大家去学习所谓的…

npm的包管理

从哪里下载包 国外有一家 IT 公司&#xff0c;叫做 npm,Inc.这家公司旗下有一个非常著名的网站: https://www.npmjs.com/&#xff0c;它是全球最大的包共享平台&#xff0c;你可以从这个网站上搜索到任何你需要的包&#xff0c;只要你有足够的耐心!到目前位置&#xff0c;全球约…

GitLab:添加SSH密钥之前,您不能通过SSH来拉取或推送项目代码

1、查看服务器是否配置过 [rootkingbal-ecs-7612 ~]# cd .ssh/ [rootkingbal-ecs-7612 .ssh]# ls authorized_keys id_ed25519 id_ed25519.pub id_rsa id_rsa.pub2、创建密钥 $ ssh-keygen -t rsa -C kingbalkingbal.com # -C 后写你的邮箱 一路回车 3、复制密钥 [rootk…

为ARM64架构移植Ubuntu20.04换源的发现

在为ARM64架构(RK3566)移植ubuntu20.04的时候发现在更换为国内源之后&#xff0c;无法正常完成apt update,报错为: Ign:25 http://mirrors.aliyun.com/ubuntu focal-updates/main arm64 Packages …

LARGE LANGUAGE MODELS ARE HUMAN-LEVEL PROMPT ENGINEERS

题目 大型语言模型是人类级别的提示工程师 论文地址&#xff1a;https://arxiv.org/abs/2211.01910 项目地址&#xff1a;https://github.com/keirp/automatic_prompt_engineer 摘要 通过对自然语言指令进行调节&#xff0c;大语言模型 (LLM) 显示了作为通用计算机的令人印象深…

Redisson发布订阅学习

介绍 Redisson 的消息订阅功能遵循 Redis 的发布/订阅模式&#xff0c;该模式包括以下几个核心概念&#xff1a; 发布者&#xff08;Publisher&#xff09;&#xff1a;发送消息到特定频道的客户端。在 Redis 中&#xff0c;这通过 PUBLISH 命令实现。 订阅者&#xff08;Sub…

git操作(Windows中GitHub)

使用git控制GitHub中的仓库版本&#xff0c;并在Windows桌面中创建与修改代码&#xff0c;与GitHub仓库进行同步。 创建自己的GitHub仓库 创建一个gen_code实验性仓库用来学习和验证git在Windows下的使用方法&#xff1a; gen_code仓库 注意&#xff0c;创建仓库时不要设置…