RabbitMQ手动ACK与死信队列

news2024/11/29 4:26:47

为了保证消息从队列可靠的达到消费者,RabbitMQ 提供了消息确认机制(Message Acknowledgement)。

默认情况下RabbitMQ在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时RabbitMQ自己又没有这条消息了。所以在实际项目中会使用手动Ack。

1、手动应答

  1. Channel.basicAck (用于肯定确认):RabbitMQ 已知道该消息成功被处理,可以将其丢弃了。
  2. Channel.basicNack (用于否定确认)
  3. Channel.basicReject (用于否定确认):与 Channel.basicNack 相比少一个参数,不处理该消息了直接拒绝,可以将其丢弃了。

消费者端的配置,相关属性值改为自己的:

server.port=8082
#rabbitmq服务器ip
spring.rabbitmq.host=localhost
#rabbitmq的端口
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=lonewalker
#密码
spring.rabbitmq.password=XX
#配置虚拟机
spring.rabbitmq.virtual-host=demo
#设置消费端手动 ack   none不确认  auto自动确认  manual手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual

修改消费代码:请勿复制使用,会卡死

package com.example.consumer.service;

import com.alibaba.fastjson.JSONObject;
import com.example.consumer.entity.User;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @description:
 * @author: LoneWalker
 * @create: 2022-04-04
 **/
@Service
@Slf4j
public class ConsumerService {


    @RabbitListener(queues ="publisher.addUser")
    public void addUser(String userStr,Channel channel,Message message){
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("我一直在重试");
            int a = 1/0;
            User user = JSONObject.parseObject(userStr,User.class);
            log.info(user.toString());
            //手动ack  第二个参数为false是表示仅仅确认当前消息 true表示确认之前所有的消息
            channel.basicAck(deliveryTag,false);
        } catch (Exception e) {
            //手动nack 告诉rabbitmq该消息消费失败  第三个参数:如果被拒绝的消息应该被重新请求,而不是被丢弃或变成死信,则为true
            try {
                channel.basicNack(deliveryTag,false,true);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }
}

先启动发布者发送消息,查看控制台:有一条消息待消费·

启动消费端,因为代码中有除0,所以会报错,这里就会出现一条unacked消息:

因为设置的是将消息重新请求,所以它会陷入死循环

防止出现这种情况,可以将basicNack最后一个参数改为false,让消息进去死信队列

2、什么是死信队列

说简单点就是备胎队列,而死信的来源有以下几种:

  1. 消息被否定确认,使用 channel.basicNackchannel.basicReject ,并且此时requeue 属性被设置为false
  2. 消息在队列的存活时间超过设置的TTL时间。
  3. 消息队列的消息数量已经超过最大队列长度。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

3、配置死信队列

一般会为每个重要的业务队列配置一个死信队列。可以分为以下步骤:

  1. 配置业务队列,绑定到业务交换机上
  2. 为业务队列配置死信交换机和路由key
  3. 为死信交换机配置死信队列

从控制台将之前的交换机都删除,然后修改代码。

首先看一下发布者的配置代码:

package com.example.publisher.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

/**
 * @author LoneWalker
 * @date 2023/4/8
 * @description
 */
@Slf4j
@Configuration
public class RabbitMqConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        //设置给rabbitTemplate
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
    /************ 正常配置 ******************/
    /**
     * 正常交换机,开启持久化
     */
    @Bean
    DirectExchange normalExchange() {
        return new DirectExchange("normalExchange", true, false);
    }

    @Bean
    public Queue normalQueue() {
        // durable: 是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
        // exclusive: 默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。
        // autoDelete: 是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
        Map<String, Object> args = deadQueueArgs();
        // 队列设置最大长度
        args.put("x-max-length", 5);
        return new Queue("normalQueue", true, false, false, args);
    }

    @Bean
    public Queue ttlQueue() {
        Map<String, Object> args = deadQueueArgs();
        // 队列设置消息过期时间 60 秒
        args.put("x-message-ttl", 60 * 1000);
        return new Queue("ttlQueue", true, false, false, args);
    }

    @Bean
    Binding normalRouteBinding() {
        return BindingBuilder.bind(normalQueue())
                .to(normalExchange())
                .with("normalRouting");
    }

    @Bean
    Binding ttlRouteBinding() {
        return BindingBuilder.bind(ttlQueue())
                .to(normalExchange())
                .with("ttlRouting");
    }


    /**************** 死信配置 *****************/
    /**
     * 死信交换机
     */
    @Bean
    DirectExchange deadExchange() {
        return new DirectExchange("deadExchange", true, false);
    }

    /**
     * 死信队列
     */
    @Bean
    public Queue deadQueue() {
        return new Queue("deadQueue", true, false, false);
    }

    @Bean
    Binding deadRouteBinding() {
        return BindingBuilder.bind(deadQueue())
                .to(deadExchange())
                .with("deadRouting");
    }

    /**
     * 转发到 死信队列,配置参数
     */
    private Map<String, Object> deadQueueArgs() {
        Map<String, Object> map = new HashMap<>();
        // 绑定该队列到死信交换机
        map.put("x-dead-letter-exchange", "deadExchange");
        map.put("x-dead-letter-routing-key", "deadRouting");
        return map;
    }


    /**
     * 消息成功到达交换机会触发
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("交换机收到消息成功:" + correlationData.getId());
        }else {
            log.error("交换机收到消息失败:" + correlationData.getId() + "原因:" + cause);
        }
    }

    /**
     * 消息未成功到达队列会触发
     * @param returnedMessage
     */
    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.error("{}--消息未成功到达队列",returnedMessage.getMessage().getMessageProperties().getMessageId());
    }
}

properties

server.port=8081
#rabbitmq服务ip
spring.rabbitmq.host=localhost
#rabbitmq端口号
spring.rabbitmq.port=5672
#用户名
spring.rabbitmq.username=用户名改为自己的
#密码
spring.rabbitmq.password=密码改为自己的
#虚拟机
spring.rabbitmq.virtual-host=demo

spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true

发送消息:

@RequiredArgsConstructor
@Service
public class PublisherServiceImpl implements PublisherService{

    private final RabbitTemplate rabbitTemplate;

    @Override
    public void addUser(User user) {

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(UUID.randomUUID().toString());

        rabbitTemplate.convertAndSend("normalExchange","normalRouting",user,correlationData);
    }
}

4、模拟场景

4.1消息处理异常

文章开篇说到的消息手动ack,一旦出现异常会陷入死循环,那么不把消息放回原队列,而是放入死信队列,然后抛异常由人工处理:

package com.example.consumer.service;

import com.alibaba.fastjson.JSONObject;
import com.example.consumer.entity.User;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

import java.io.IOException;

/**
 * @description:
 * @author: LoneWalker
 * @create: 2022-04-04
 **/
@Service
@Slf4j
public class ConsumerService {


    @RabbitListener(queues ="normalQueue")
    public void addUser(String userStr,Channel channel,Message message){
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            int a = 1/0;
            User user = JSONObject.parseObject(userStr,User.class);
            log.info(user.toString());
            //手动ack  第二个参数为false是表示仅仅确认当前消息 true表示确认之前所有的消息
            channel.basicAck(deliveryTag,false);
        } catch (Exception e) {
            //手动nack 告诉rabbitmq该消息消费失败  第三个参数:如果被拒绝的消息应该被重新请求,而不是被丢弃或变成死信,则为true
            try {
                channel.basicNack(deliveryTag,false,false);
            } catch (IOException ex) {
                throw new RuntimeException("消息处理失败");
            }
        }
    }
}

注意basicNack的第三个参数,设置为false后就不会重新请求。

4.2队列达到最大长度

配置上面的代码已经有过了:

测试的话我们发6条消息,加上4.1测试产生的死信,预期死信队列中应该会有两条:

4.3消息TTL过期

过期时间TTL表示可以对消息设置预期的时间,超过这个时间就删除或者放入死信队列。修改routingKey为ttlRouting。上述代码中配置过期时间为60s

死信队列中的消息处理和正常的队列没什么区别,就不赘述了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/576528.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

spring-Bean管理-springboot原理-Maven高级

spring-Bean管理-springboot原理-Maven高级 配置优先级Bean管理1.获取bean2.bean作用域3.第三方bean SpringBoot原理Maven高级1.分模块设计与开发2.继承与聚合3.私服1.介绍2.资源上传与下载 配置优先级 优先级(低→高) application.yaml&#xff08;忽略) application.yml appl…

利用Servlet编写第一个“hello world“(续)

利用Servlet编写第一个“hello world“ &#x1f50e;通过插件 Smart Tomcat 简化 打包代码 与 部署 操作下载Smart Tomcat配置Smart Tomcat &#x1f50e;Servlet 中的常见错误404(Not Found)&#x1f36d;请求路径出错&#x1f36d;war 包未被正确加载 405(Method Not Allowe…

【Android-JetpackCompose】13、实战在线课程 App

文章目录 一、BottomNavigation 底部导航1.1 底部导航栏的布局、点击1.2 设置 bottomBar 的颜色1.3 设置顶部 actionBar 的颜色 二、主页 StudyScreen2.1 顶部状态栏2.2 一、BottomNavigation 底部导航 1.1 底部导航栏的布局、点击 首先&#xff0c;构造 NavigationItem 的 d…

安装stable-diffusion

安装流程&#xff1a; 下载stable-diffusion源码 <https://github.com/AUTOMATIC1111/stable-diffusion-webui/releases/tag/v1.2.1>安装python <https://www.python.org/ftp/python/3.10.6/python-3.10.6-amd64.exe>添加host 打开C:\Windows\System32\drivers\etc…

django基于scrapy的音乐歌曲分析及推荐系统

而在线音乐网站作为一个网络载体&#xff0c;在音乐的传播&#xff0c;创作&#xff0c;欣赏等方面对音乐的发展产生了前所未有的影响—。 &#xff08;1&#xff09;电脑网络技术的发展使人们通过音乐网站接触到了多的音乐模式。 &#xff08;2&#xff09;网民数量的激增使更…

两台群晖NAS之间使用FTP或SFTP进行数据高速拷贝问题

两台群晖NAS之间使用FTP或SFTP进行数据高速拷贝问题 为了更好的浏览体验&#xff0c;欢迎光顾勤奋的凯尔森同学个人博客http://www.huerpu.cc:7000 在有些时候&#xff0c;我们新买了一台全新群晖NAS需要把旧群晖NAS里的数据拷贝到新设备里&#xff0c;特别像电影、电视剧、小…

Python实战基础13-装饰器

1、先明白这段代码 第一波 def foo():print(foo)foo # 表示是函数 foo() # 表示执行foo函数第二波 def foo():print(foo)foo lambda x: x 1foo() # 执行lambda表达式&#xff0c;而不再是原来的foo函数&#xff0c;因为foo这个名字被重新指向了另外一个匿名函数函数名仅仅是…

攻不下dfs不参加比赛(九)

标题 为什么练dfs题目为什么练dfs 相信学过数据结构的朋友都知道dfs(深度优先搜索)是里面相当重要的一种搜索算法,可能直接说大家感受不到有条件的大家可以去看看一些算法比赛。这些比赛中每一届或多或少都会牵扯到dfs,可能提到dfs大家都知道但是我们为了避免眼高手低有的东…

Python入门(十三)函数(一)

函数&#xff08;一&#xff09; 1.函数概述2.函数定义2.1向函数传递信息2.2实参和形参 作者&#xff1a;xiou 1.函数概述 函数是带名字的代码块&#xff0c;用于完成具体的工作。要执行函数定义的特定任务&#xff0c;可调用该函数。需要在程序中多次执行同一项任务时&#…

win10微软Edge浏览器通过WeTab新标签页免费无限制使用ChatGPT的方法,操作简单,使用方便

目录 一、使用效果 二、注册使用教程 1.打开Edge浏览器扩展 2.选择Edge浏览器外接程序 3.搜索WeTab 4.进入管理扩展 5.启用扩展 ​编辑 6.进入WeTab新标签页 7.打开Chat AI 8.注册 9.使用 ChatGPT是OpenAI推出的人工智能语言模型&#xff0c;能够通过理解和学习人类…

opencv_c++学习(二十五)

一、Harris角点介绍 1、海瑞斯角点不可能出现在图像平滑的区域&#xff08;上图1&#xff09;&#xff1b; 2、图像边缘的支线出不可能出现海瑞斯角点&#xff08;上图2&#xff09;&#xff1b; 3、海瑞斯角点会出现在顶点处。&#xff08;上图3&#xff09;&#xff1b; 上图…

一文带你了解MySQL之redo日志

前言 本文以及接下来的几篇文章将会频繁的使用到我们前边唠叨的InnoDB记录行格式、页面格式、索引原理、表空间的组成等各种基础知识&#xff0c;如果大家对这些东西理解的不透彻&#xff0c;那么阅读下边的文字可能会特别的些费力&#xff0c;为保证您能正常的理解&#xff0…

Android 12系统源码_WindowInsets (一)WindowInsets相关类和功能介绍

一、什么是WindowInsets? WindowInsets源码解释为Window Content的一系列插值集合,可以理解为可以将其理解为不同的窗口装饰区域类型,比如一个Activity相对于手机屏幕需要空出的地方以腾给StatusBar、Ime、NavigationBar等系统窗口,具体表现为该区域需要的上下左右的宽高。…

Oracle Linux 8.8 发布 - Oracle 提供支持 RHEL 兼容发行版

Oracle Linux 8.8 发布 - Oracle 提供支持 RHEL 兼容发行版 Oracle Linux with Unbreakable Enterprise Kernel (UEK) & Red Hat compatible kernel (RHCK) 请访问原文链接&#xff1a;https://sysin.org/blog/oracle-linux-8/&#xff0c;查看最新版。原创作品&#xff…

opencv实践项目-图像卡通化

目录 1.如何使图像卡通画2.铅笔素描滤波器3. 细节增强滤波器4. 双边过滤器5. 铅笔边缘滤波器 1.如何使图像卡通画 我们通常需要执行两个主要步骤将图像转换为卡通图像&#xff1a;边缘检测和区域平滑。 边缘检测的主要目的显然是为了强调图像的边缘&#xff0c;因为卡通图像通…

银行从业——法律法规

第一章、经济基础知识 第一节、宏观经济分析 【 知识点1】 宏观经济发展目标 宏观经济发展的总体目标一般包括四个&#xff1a; 宏观经济发展的总体目标 衡量指标1、经济增长国内生产总值&#xff08;GDP&#xff09;2、充分就业 失业率3、物价稳定通货膨胀率4、国际…

Sangria:类似Nova folding scheme的relaxed PLONK for PLONK

1. 引言 前序博客有&#xff1a; Nova: Recursive Zero-Knowledge Arguments from Folding Schemes学习笔记SuperNova&#xff1a;为多指令虚拟机执行提供递归证明基于Nova/SuperNova的zkVMSangria&#xff1a;PLONK Folding 主要见2023年4月 Geometry团队Nicolas Mohnblat…

多线程编程(1)

本篇重点 了解进程和线程的区别和联系使用Java&#xff0c;实现创建线程的五种写法 目录 使用Java进行多线程编程方法一&#xff1a;继承 Thread, 重写 run方法二: 实现 Runnable, 重写 run方法三: 继承 Thread, 重写 run, 使用匿名内部类方法四: 实现 Runnable, 重写 run, 使用…

【剑指offer】数据结构——数

目录 数据结构——数直接解【剑指offer】43. 1&#xff5e;n 整数中 1 出现的次数【剑指offer】44. 数字序列中某一位的数字【剑指offer】49. 丑数【剑指offer】60. n个骰子的点数【剑指offer】62. 圆圈中最后剩下的数字【剑指offer】64. 求12…n 特殊解——分治法 &#xff1a…

【网络】- TCP/IP四层(五层)协议 - 网际层(网络层) - 划分子网、构造超网

目录 一、概述二、分类IP地址不合理的地方三、划分子网四、无分类编址方法 一、概述 前面的文章介绍了网络层的网际协议IP&#xff0c;介绍了IP地址的定义&#xff0c;知道了IP地址分为网络标识(网络地址)、主机标识(主机地址)两部分&#xff0c;也清楚了最初IP地址是按照分类被…