RabbitMQ死信队列延迟交换机

news2025/1/11 7:07:57

RabbitMQ死信队列&延迟交换机

1.什么是死信

死信&死信队列
1644476424544.png

死信队列的应用:

  • 基于死信队列在队列消息已满的情况下,消息也不会丢失
  • 实现延迟消费的效果。比如:下订单时,有15分钟的付款时间

2. 实现死信队列

2.1 准备Exchange&Queue
package com.llp.rabbitmq.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 死信队列配置
 */
@Configuration
public class DeadLetterConfig {

    public static final String NORMAL_EXCHANGE = "normal-exchange";
    public static final String NORMAL_QUEUE = "normal-queue";
    public static final String NORMAL_ROUTING_KEY = "normal.#";

    public static final String DEAD_EXCHANGE = "dead-exchange";
    public static final String DEAD_QUEUE = "dead-queue";
    public static final String DEAD_ROUTING_KEY = "dead.#";

    @Bean
    public Exchange normalExchange(){
        return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE).build();
    }

    @Bean
    public Queue normalQueue(){
        //普通队列,绑定死信队列
        return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.abc").build();
    }

    @Bean
    public Binding normalBinding(Queue normalQueue,Exchange normalExchange){
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
    }

    @Bean
    public Exchange deadExchange(){
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();
    }

    @Bean
    public Queue deadQueue(){
        return QueueBuilder.durable(DEAD_QUEUE).build();
    }

    @Bean
    public Binding deadBinding(Queue deadQueue,Exchange deadExchange){
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    }
}

2.2 实现效果
  • 基于消费者进行reject或者nack实现死信效果

    package com.llp.rabbitmq.topic;
    
    import com.llp.rabbitmq.config.DeadLetterConfig;
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    @Component
    public class DeadListener {
    
        @RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE)
        public void consume(String msg, Channel channel, Message message) throws IOException {
            System.out.println("接收到normal队列的消息:" + msg);
            //设置消息决绝消费,不需要重新放入到队列中
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
            //或者
            //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
        }
    }
    
  • 消息的生存时间

    • 给消息设置生存时间

      @Test
      public void publishExpire(){
          String msg = "dead letter expire";
          rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE, "normal.abc", msg, new MessagePostProcessor() {
              @Override
              public Message postProcessMessage(Message message) throws AmqpException {
                  message.getMessageProperties().setExpiration("5000");
                  return message;
              }
          });
      }
      
    • 给队列设置消息的生存时间

      @Bean
      public Queue normalQueue(){
          return QueueBuilder.durable(NORMAL_QUEUE)
                  .deadLetterExchange(DEAD_EXCHANGE)
                  .deadLetterRoutingKey("dead.abc")
                  .ttl(10000)
                  .build();
      }
      
  • 设置Queue中的消息最大长度

    @Bean
    public Queue normalQueue(){
        return QueueBuilder.durable(NORMAL_QUEUE)
                .deadLetterExchange(DEAD_EXCHANGE)
                .deadLetterRoutingKey("dead.abc")
                .maxLength(1)
                .build();
    }
    

    只要Queue中已经有一个消息,如果再次发送一个消息,这个消息会变为死信!

3.延迟交换机

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/tag/3.8.9

死信队列实现延迟消费时,如果延迟时间比较复杂,比较多,直接使用死信队列时,需要创建大量的队列还对应不同的时间,可以采用延迟交换机来解决这个问题。

image-20230501154020297

将下载的文件上传到linux服务器并使用如下指令,将文件方到rabbitmq容器的plugins目录下

docker cp rabbitmq_delayed_message_exchange-3.8.9-0199d11c.ez 9da57c5038ba:/opt/rabbitmq/plugins

image-20230501154420697

在rabbitmq容器的/opt/rabbitmq/sbin目录下执行

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

重启容器生效

docker restart 9da57c5038ba

可以看到添加插件后多了一个延迟交换机的选项

image-20230501154955764

  • 构建延迟交换机

    package com.llp.rabbitmq.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * 延迟队列
     */
    @Configuration
    public class DelayedConfig {
    
        public static final String DELAYED_EXCHANGE = "delayed-exchange";
        public static final String DELAYED_QUEUE = "delayed-queue";
        public static final String DELAYED_ROUTING_KEY = "delayed.#";
    
        @Bean
        public Exchange delayedExchange(){
            Map<String, Object> arguments = new HashMap<>();
            arguments.put("x-delayed-type","topic");
            Exchange exchange = new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments);
            return exchange;
        }
    
        @Bean
        public Queue delayedQueue(){
            return QueueBuilder.durable(DELAYED_QUEUE).build();
        }
    
        @Bean
        public Binding delayedBinding(Queue delayedQueue,Exchange delayedExchange){
            return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
        }
    }
    
    
  • 发送消息

    package com.llp.rabbitmq;
    
    import com.llp.rabbitmq.config.DelayedConfig;
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    public class DelayedPublisherTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void publish(){
            rabbitTemplate.convertAndSend(DelayedConfig.DELAYED_EXCHANGE, "delayed.abc", "xxxx", new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //设置消息指定多少时间被消费,单位毫秒
                    message.getMessageProperties().setDelay(30000);
                    return message;
                }
            });
        }
    }
    

**延迟交换机存在的问题:**在延迟推送消息的过程中rabbitmq重启了、或者说服务器宕机了就会导致消息丢失

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/480337.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网络编程代码实例:IO复用版

文章目录 前言代码仓库内容代码&#xff08;有详细注释&#xff09;server.cclient_select.cclient_poll.cclient_epoll.c 结果总结参考资料作者的话 前言 网络编程代码实例&#xff1a;IO复用版。 代码仓库 yezhening/Environment-and-network-programming-examples: 环境和…

[Linux]网络连接、资源共享

​⭐作者介绍&#xff1a;大二本科网络工程专业在读&#xff0c;持续学习Java&#xff0c;输出优质文章 ⭐作者主页&#xff1a;逐梦苍穹 ⭐所属专栏&#xff1a;Linux基础操作。本文主要是分享一些Linux系统常用操作&#xff0c;内容主要来源是学校作业&#xff0c;分享出来的…

详解c++---vector模拟实现

目录标题 准备工作构造函数迭代器的完善性质相关的函数实现reservepush_back[ ]emptyresizeinserteraseerase后迭代器失效问题swapclear~vector老式拷贝构造迭代器构造新式拷贝构造老式赋值重载新式赋值重载N个数据的构造vector的浅拷贝问题 准备工作 首先我们知道vector是一个…

HTB靶机06-Beep-WP

beep 靶机IP&#xff1a;10.10.10.7 攻击机IP&#xff1a;10.10.14.6 web RCE漏洞利用、nmap提权 扫描 nmap 常规扫描&#xff1a; ┌──(xavier㉿xavier)-[~/HTB/005-Beep] └─$ sudo nmap -sSV -sC 10.10.10.7 -oN nmap1.out Starting Nmap 7.91 ( https://nmap.org …

《道德经》

《道德经》是春秋时期老子&#xff08;李耳&#xff09;的哲学作品&#xff0c;又称《道德真经》、《老子》、《五千言》、《老子五千文》&#xff0c;是中国古代先秦诸子分家前的一部著作&#xff0c;是道家哲学思想的重要来源。 道德经分上下两篇&#xff0c;原文上篇《德经…

网络安全: CIDR无类别路由

网络安全&#xff1a; CIDR无类别路由 CIDR是无类别路由&#xff0c;出现CIDR的原因是因为ipv4的地址被使用完客&#xff0c;CIDR的出现暂缓了ipv4用完的速度。 原本的ipv4很刻板&#xff0c;网络号分成8位&#xff0c;16位&#xff0c;24位作为掩码&#xff0c;也就是 xxx.0…

DRY编码原则

基本情况 DRY&#xff0c;Don’t repeat yourself&#xff0c;就是不要重复你自己的意思。 不要重复&#xff0c;是多么简单的意思了&#xff0c;重复就是多了一个一样的东西&#xff0c;为什么多一个呢&#xff0c;一个就可以了&#xff0c;这样才简单&#xff0c;这是一个常…

【报错】arXiv上传文章出现XXX.sty not found

笔者在overleaf上编译文章一切正常&#xff0c;但上传文章到arxiv时出现类似于如下报错&#xff1a; 一般情况下观察arxiv的编译log&#xff0c;不通过的原因&#xff0c;很多时候都是由于某一行导入了啥package&#xff0c;引起的报错&#xff1b;但是如果没有任何一个具体的…

AppSmith(安装与练习4套)

AppSmith官网文档&#xff1a; https://docs.appsmith.com/getting-started/setup/installation-guides/docker安装前需要已经安装好docker&#xff0c;需要版本如下&#xff1a; Docker ( 20.10.7或者更高) Docker-Compose ( 1.29.2或者更高) 安装Appsmith&#xff1a; 准备…

【Linux】第二站:Linux基本指令(一)

文章目录 一、操作系统OS概念1.OS是什么&#xff1f;2.为什么要有OS?1.一个好的操作系统&#xff0c;他的衡量指标是什么&#xff1f;2.操作系统的核心工作 3.理解我们在计算机上的操作4.Linux和Windows的特点 二、Linux基本指令1. 指令概述2.ls指令1> ls -l2> ls -a3&g…

ChatGPT其实并不想让开发人员做这5件事情

前言 ChatGPT已经火爆了快半年了吧&#xff0c;紧接着国内也开始推出了各种仿制品&#xff0c;我甚至一度怀疑&#xff0c;如果人家没有推出ChatGPT&#xff0c;这些仿制品会不会出现。而很多人也嗨皮得不行&#xff0c;利用各种方法开始科学上网&#xff0c;用ChatGPT做各种觉…

不得不说的行为型模式-解释器模式

解释器模式&#xff1a; 解释器模式&#xff08;Interpreter Pattern&#xff09;是一种行为型设计模式&#xff0c;它定义了一种语言&#xff0c;用于解释执行特定的操作&#xff0c;例如正则表达式、查询语言、数学表达式等。该模式通过定义一个解释器来解释语言中的表达式…

分治与减治算法实验:题目6 淘汰赛冠军问题

目录 前言 实验内容 实验流程 实验分析 实验过程 流程演示 写出伪代码 实验代码 运行结果 改进算法 总结 前言 淘汰赛冠军问题是一个经典的算法设计与分析的问题&#xff0c;它要求我们在给定的n个参赛者中&#xff0c;通过一系列的比赛&#xff0c;找出最终的冠军…

nginx负载均衡+RabbitMq集群及镜像队列(2)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、nginx是什么&#xff1f;二、搭建步骤1.软件和环境2.安装nginx3.负载均衡配置nginx.conf4.应用程序配置 总结 前言 提示&#xff1a;这里可以添加本文要记…

Linux套接字编程-3

在之前的套接字编程内容中&#xff0c;我们讲述完了UDP和TCP的主要内容&#xff0c;但是对于TCP通信中具体的实现还存在一些问题没有解决&#xff0c;所以我们本篇博客将对进行分析和解决。 目录 1.引入 2.多进程 3.多线程 1.引入 在上一篇博客中&#xff0c;当我们使用T…

sed进阶之模式替换

shell脚本编程系列 &符号可以代表替换命令中的匹配模式&#xff0c;不管模式匹配到了什么样的文本&#xff0c;都可以使用&符号代表这些内容。这样就能处理匹配模式的任何单词了。 echo "The cat sleeps in his hat." | sed s/.at/"&"/g&…

告别低效繁琐的Prometheus告警管理,Nightingale助你快速响应故障!

Prometheus的告警规则、记录规则都是采用配置文件管理&#xff0c;适合奉行Infrastructure as Code的公司或团队内部使用。但如果要把监控能力开放给全公司&#xff0c;就要支持协同操作的 UI&#xff0c;让各个团队互不干扰的同时共享成果。 开源方案&#xff1a; Grafana 擅…

No.053<软考>《(高项)备考大全》【冲刺7】《软考之 119个工具 (5)》

《软考之 119个工具 &#xff08;5&#xff09;》 84.文档审查:85.信息收集技术:86.核对表分析:87.假设分析:88.图解技术:89.SWOT 分析:90.风险概率和影响评估:91.概率和影响矩阵(包含在风险管理计划中):92.风险数据质量评估:93.风险分类(包含在风险管理计划中):94.风险紧迫性评…

快速多关键字统计

实例需求&#xff1a;在每个章节中统计关键字&#xff08;“√”, “”, “〇”, “空缺”&#xff09;的个数&#xff0c;B列中的章节编号作为章节划分的标识&#xff0c;例如1.1.1 ~ 1.1.5为第1.1章节&#xff0c;对应工作表的12 ~ 16行&#xff0c;其中黄色列为需要统计的数…

【软考数据库】第五章 计算机网络

目录 5.1 网络功能和分类 5.2 OSI七层模型 5.3 TCP/IP协议 5.4 传输介质 5.5 通信方式和交换方式 5.6 IP地址 5.7 IPv6 5.8 网络规划和设计 5.9 其他考点补充 5.10 网络安全技术 5.11 网络安全协议 前言&#xff1a; 笔记来自《文老师软考数据库》教材精讲&#xff…