RabbitMQ——死信队列和延迟队列

news2025/1/4 17:22:34

文章目录

  • RabbitMQ——死信队列和延迟队列
    • 1、死信队列
    • 2、基于插件的延迟队列
      • 2.1、安装延迟队列插件
      • 2.2、代码实例

RabbitMQ——死信队列和延迟队列

1、死信队列

死信队列(Dead Letter Queue,DLQ)是 RabbitMQ 中的一种重要特性,用于处理无法被消费的消息,防止消息丢失。

死信的来源

在消息队列中,当消息满足一定条件而无法被正常消费时,这些消息会被发送到死信队列。满足条件的情况包括但不限于:

  • 消息被拒绝(basic.rejectbasic.nack)且不重新入队(requeue 参数为 false)。
  • 消息过期(TTL,Time-To-Live)。
  • 队列长度超过限制,无法再添加数据到mq中。

生产者

package com.weipch.rabbitmq.dlq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.GetResponse;
import com.weipch.util.RabbitMqUtils;

/**
 * @Author 方唐镜
 * @Create 2024-03-03 14:08
 * @Description
 */
public class Produce {


	private static final String NORMAL_EXCHANGE = "normal_exchange";


	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
        //模拟消息过期 10s
		//AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
		for (int i = 0; i < 10; i++) {
			String message = "hello world" + i;
			channel.basicPublish(NORMAL_EXCHANGE, "normal-routing-key", null, message.getBytes());
		}
	}
}

消费者

正常队列:

package com.weipch.rabbitmq.dlq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.weipch.util.RabbitMqUtils;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * @Author 方唐镜
 * @Create 2024-03-03 13:50
 * @Description
 */
public class Consumer01 {


	private static final String NORMAL_EXCHANGE = "normal_exchange";
	private static final String DEAD_EXCHANGE = "dead_exchange";

	private static final String NORMAL_QUEUE = "normal_queue";
	private static final String DEAD_QUEUE = "dead_queue";


	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		//声明死信交换机和队列
		channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
		channel.queueDeclare(DEAD_QUEUE, false, false, false, null);
		//绑定
		channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "dead-routing-key");


		//声明普通交换机和队列
		channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
		//死信配制 指定死信交换机和死信路由键
		Map<String, Object> map = new HashMap<>();
		map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
		map.put("x-dead-letter-routing-key", "dead-routing-key");
		//最大长度
		//map.put("x-max-length", 6);
		channel.queueDeclare(NORMAL_QUEUE, false, false, false, map);
		channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "normal-routing-key");
		DeliverCallback deliverCallback = (consumerTag, delivery) -> {
			String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
			if (message.contains("5")){
				System.out.println("Consumer01接收消息:" + message + ",此消息被拒绝");
				//拒绝消息并把消息丢入死信队列
				channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false);
			}else {
				System.out.println("Consumer01接收消息:" + message);
				channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
			}
		};
		channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, (consumerTag, e) -> {});
	}
}

死信队列:

package com.weipch.rabbitmq.dlq;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.weipch.util.RabbitMqUtils;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * @Author 方唐镜
 * @Create 2024-03-03 13:50
 * @Description
 */
public class Consumer02 {
	
	private static final String DEAD_QUEUE = "dead_queue";

	public static void main(String[] args) throws Exception {
		Channel channel = RabbitMqUtils.getChannel();
		channel.basicConsume(DEAD_QUEUE, true,
			(consumerTag, delivery) -> System.out.println("Consumer02:" + new String(delivery.getBody(), StandardCharsets.UTF_8)),
			(consumerTag, e) -> {});
	}
}

生产者发送消息到正常队列,而消费者负责消费正常队列的消息。当消息被消费者拒绝并不再重新投递时,消息会被发送到死信队列。

2、基于插件的延迟队列

延迟队列是一种消息队列中的一种特殊类型,它允许消息在一定的延迟时间后再被消费。延迟队列的元素是希望在指定时间到了以后或之前取出处理。在实际应用中,延迟队列通常用于处理需要延时执行的任务或事件。

使用场景

  1. 定时任务执行: 在需要定时执行任务的应用中,可以使用延迟队列来实现。将任务消息发送到延迟队列,设置消息的过期时间为任务执行的时间,当消息过期时,消费者即可执行相应的任务。
  2. 消息重试机制: 当某个操作失败时,可以将操作消息发送到延迟队列,并设置合适的重试时间。在消息重试的过程中,如果操作成功,消息将正常被消费;如果一直失败,可以选择在一定时间后放弃重试,将消息发送到死信队列或进行其他处理。
  3. 订单超时处理: 在电商等场景中,对于长时间未支付的订单,可以将订单消息发送到延迟队列,并设置订单的过期时间。当订单过期时,系统可以取消订单、释放库存等操作。
  4. 限流与流控: 通过使用延迟队列,可以实现消息的有序处理和限流,确保系统在高峰期不会因为瞬时大量请求而过载。
  5. 系统通知与提醒: 在需要发送系统通知或提醒的场景中,可以使用延迟队列来实现消息的定时推送。
  6. 缓解数据库压力: 对于一些需要定期清理的数据,可以使用延迟队列来触发数据清理操作,减轻数据库压力。

2.1、安装延迟队列插件

下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

以docker方式安装

1、把下载好的插件从服务器拷贝到 RabbitMQ 容器内plugins目录

docker cp rabbitmq_delayed_message_exchange-3.13.0.ez 7c8726620871:/plugins

插件版本和rabbitmq版本一致

2、进入容器查看插件

在这里插入图片描述

3、启动插件

root@my-rabbit:/plugins# rabbitmq-plugins enable rabbitmq_delayed_message_exchange

4、重启容器

docker restart 7c8726620871

5、安装成功

在这里插入图片描述

2.2、代码实例

配置类

package springbootrabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;


@Configuration
public class DelayedQueueConfig {
    //    队列
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    //    交换机
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    //    routingKey
    public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";

    //    声明队列
    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    //    声明交换机 基于插件的交换机
    @Bean
    public CustomExchange delayedExchange() {

        HashMap<String, Object> arguments = new HashMap<>();
        arguments.put("x-delayed-type", "direct");
        /*
         * 1.交换机名称
         * 2.交换机类型
         * 3.是否需要持久化
         * 4.是否需要自动删除
         * 5.其他参数
         * */
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, arguments);
    }

    //    绑定
    @Bean
    public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue delayedQueue, @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

生产者

@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {
    log.info("当前时间:{},发送一条时长{}毫秒消息给延迟队列delayed.queue:{}", new Date(), delayTime, message);
    rabbitTemplate.convertAndSend(DelayedQueueConfig.DELAYED_EXCHANGE_NAME, DelayedQueueConfig.DELAYED_ROUTING_KEY, message, msg -> {
        //            发送消息的时候 延迟时长
        msg.getMessageProperties().setDelay(delayTime);
        return msg;
    });
}

消费者

@Slf4j
@Component
public class DelayedQueueConsumer {
    //监听消息
    @RabbitListener(queues = DelayedQueueConfig.DELAYED_QUEUE_NAME)
    public void receiveDelayedQueue(Message message) {
        String msg = new String(message.getBody());
        log.info("当前时间:{},收到延迟队列的消息:{}", new Date(), msg);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1524885.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

实用crontab教程-一文读懂crontab

文章目录 Crontab是什么类似的工具有哪些Systemd (systemctl)Upstart (initctl)SysVinit (/etc/init.d scripts) 作用用途&#xff1a; crontab的配置文件格式crontab表达式检查工具Crontab Guru:Cron Maker: 运行身份原理&#xff1a;指定以特定用户身份运行&#xff1a;使用用…

如何使用人工智能打造超用户预期的个性化购物体验

回看我的营销职业生涯&#xff0c;我见证了数字时代如何重塑客户期望。从一刀切的方法过渡到创造高度个性化的购物体验已成为企业的关键。在这个客户期望不断变化的新时代&#xff0c;创造个性化的购物体验不再是奢侈品&#xff0c;而是企业的必需品。人工智能 &#xff08;AI&…

使用IDEA进行Scala编程相关安装步骤

一、相关安装包&#xff08;jdk最好用1.8版本&#xff0c;其他不做要求&#xff09; IDEA安装包 jdk-8u101-windows-x64.exe scala-2.12.19 二、安装顺序 在安装IDEA之前&#xff0c;首先要安装好java和scala环境&#xff0c;以便后续配置 三、jdk和scala安装要求 1.jdk安…

【爬虫逆向】Python逆向采集猫眼电影票房数据

进行数据抓包&#xff0c;因为这个网站有数据加密 !pip install jsonpathCollecting jsonpathDownloading jsonpath-0.82.2.tar.gz (10 kB)Preparing metadata (setup.py) ... done Building wheels for collected packages: jsonpathBuilding wheel for jsonpath (setup.py) .…

KKVIEW远程:远程工具软件有哪些

远程工具软件有哪些 随着科技的发展和全球化进程的加快&#xff0c;远程工作、在线协作已成为我们日常生活和工作中不可或缺的一部分。为了更有效地进行远程操作和管理&#xff0c;我们需要借助各种远程工具软件。接下来&#xff0c;我将为大家介绍几种市场上广受欢迎的远程工…

JSON 的了解和使用

目录 1. JSON 2. JSONcpp 的安装 3. JSONcpp 相关API的使用 3.1. 将 Json::Value 对象转化为 std::string 3.1.1. Json::Value 类 3.1.2. Json::Value::toStyledString 接口 3.1.3. Json::StyledWriter 类 3.1.4. Json::StyledWriter::write 接口 3.1.5. Json::Fas…

【计算机网络】基本概念

基本概念 IP 地址端口号协议协议分层封装分用客户端服务器请求和响应两台主机之间的网络通信流程 IP 地址 概念&#xff1a;IP 地址主要是用于唯一标识网络主机、其他网络设备&#xff08;如路由器&#xff09;的网络地址。简单来说&#xff0c;IP地址用来唯一定位主机。格式&…

【GPT-SOVITS-02】GPT模块解析

说明&#xff1a;该系列文章从本人知乎账号迁入&#xff0c;主要原因是知乎图片附件过于模糊。 知乎专栏地址&#xff1a; 语音生成专栏 系列文章地址&#xff1a; 【GPT-SOVITS-01】源码梳理 【GPT-SOVITS-02】GPT模块解析 【GPT-SOVITS-03】SOVITS 模块-生成模型解析 【G…

【开源鸿蒙】模拟运行OpenHarmony轻量系统QEMU RISC-V版

文章目录 一、准备工作1.1 编译输出目录简介 二、QEMU安装2.1 安装依赖2.2 获取源码2.3 编译安装2.4 问题解决 三、用QEMU运行OpenHarmony轻量系统3.1 qemu-run脚本简介3.2 qemu-run脚本参数3.3 qemu-run运行效果3.4 退出QEMU交互模式 四、问题解决五、参考链接 开源鸿蒙坚果派…

AntV L7深圳智慧城市

本案例使用L7库和Mapbox GL JS构建深圳智慧城市。 文章目录 1. 引入 CDN 链接2. 引入组件3. 创建地图4. 创建场景5. 获取数据6. 创建面图层7. 演示效果8. 代码实现 1. 引入 CDN 链接 <!-- 1.引入CDN链接 --> <script src"https://unpkg.com/antv/l7"><…

Ubuntu 22.04 Nvidia Audio2Face Error:Failed to build TensorRT engine

背景 1.在Ubuntu22.04上安装Audio2Face后启动&#xff0c;嘴形不会实时同步。控制台显示如【图一】&#xff1a; 【图一】 2.log日志如下: Error: Error during running command: [‘/home/admin/omniverse/libs/deps/321b626abba810c3f8d1dd4d247d2967/exts/omni.audio2fac…

uniapp修改头像,选择图片

一、页面效果 二、手机上的效果 使用过的实例&#xff1a; 手机上就会显示类似如下&#xff1a; 三、代码 <view class"cleaner-top" click"chooseImg"><view class"cleaner-avatar"><image :src"imgArr" mode"…

K8S POD 启动探针 startupProbe 的使用

当我们启动一个POD 时&#xff0c; 当k8s detect 里面的容器启动成功时&#xff0c; 就会认为这个POD 启动完成了&#xff0c; 通常就会在状态里表示 ready 1/1 … 例如 rootk8s-master:~# kubectl get pods NAME READY STATUS RESTARTS AGE bq-api-demo 1…

基于STM32G4的0.96寸OLED显示屏驱动程序(HAL库),支持硬件/软件I2C

基于STM32G474的0.96寸OLED(SSD1306)显示屏驱动程序&#xff08;4针脚I2C接口&#xff09;&#xff0c;支持硬件IIC/软件IIC&#xff0c;HAL库版。 这款驱动程序比较完善&#xff0c;可以实现 英文、整数、浮点数、汉字、图像、二进制数、十六进制数 等内容显示&#xff0c;可…

Vue 3响应式系统详解:ref、toRefs、reactive及更多

&#x1f31f; 前言 欢迎来到我的技术小宇宙&#xff01;&#x1f30c; 这里不仅是我记录技术点滴的后花园&#xff0c;也是我分享学习心得和项目经验的乐园。&#x1f4da; 无论你是技术小白还是资深大牛&#xff0c;这里总有一些内容能触动你的好奇心。&#x1f50d; &#x…

195基于matlab的凸轮机构GUI界面

基于matlab的凸轮机构GUI界面 &#xff0c; 凸轮设计与仿真 绘制不同的凸轮轮廓曲线 &#xff0c;凸轮机构运动参数包括推程运动角&#xff0c;回程运动角&#xff0c;远休止角&#xff0c;近休止角。运动方式&#xff0c;运动规律。运动仿真过程可视化。内容齐全详尽。用GUI打…

ARM Cortex R52内核 01 概述

ARM Cortex R52内核 01 Introduction 1.1 Cortex-R52介绍 Cortex-R52处理器是一种中等性能、有序、超标量处理器&#xff0c;主要用于汽车和工业应用。它还适用于各种其他嵌入式应用&#xff0c;如通信和存储设备。 Cortex-R52处理器具有一到四个核心&#xff0c;每个核心实…

redis 常见的异常

目录 一、缓存穿透 1、概念 解决方案 &#xff08;1&#xff09;布隆过滤器 (2)、缓存空对象 二、缓存雪崩 1、概念 解决方案 &#xff08;1&#xff09;redis高可用 &#xff08;2&#xff09;限流降级 &#xff08;3&#xff09;数据预热 一、缓存穿透 1、概念 缓…

java----网络编程(一)

一.什么是网络编程 用户在浏览器中&#xff0c;打开在线视频网站&#xff0c;如优酷看视频&#xff0c;实质是通过网络&#xff0c;获取到网络上的一个视频资源。 与本地打开视频文件类似&#xff0c;只是视频文件这个资源的来源是网络。所谓网络资源就是网络中获取数据。而所…

sqlite 常见命令 表结构

在 SQLite 中&#xff0c;将表结构保存为 SQL 具有一定的便捷性和重要性&#xff0c;原因如下 便捷性&#xff1a; 备份和恢复&#xff1a;将表结构保存为 SQL 可以方便地进行备份。如果需要还原或迁移数据库&#xff0c;只需执行保存的 SQL 脚本&#xff0c;就可以重新创建表…