第十二章 RabbitMQ之失败消息处理策略

news2024/10/12 1:32:57

目录

一、引言

二、RepublishMessageRecoverer 实现

2.1. 实现步骤

2.2. 实现代码

2.2.1. 异常交换机队列回收期配置类

2.2.2. 常规交换机队列配置类 

2.2.3. 消费者代码

2.2.4. 消费者yml配置 

2.2.5. 生产者代码 

2.2.6. 生产者yml配置 

 2.2.7. 运行效果


一、引言

Spring AMQP提供了消费者失败重试机制,在消费者出现异常时利用本地重试,而不是无限地requeue到mq。我们可以通过在application.yaml文件中添加配置来开启重试机制:

spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: Wangzhexiao
    password: Wangzhexiao
    virtual-host: /hangzhou
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: manual # none,关闭ack;manual,手动ack;auto:自动ack
        # 消费者重试机制配置
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初始的失败等待时长为1秒
          multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false




在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:

RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息(默认方式)

ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机(推荐)

二、RepublishMessageRecoverer 实现

在实际项目的生产环境中,通过 RepublishMessageRecoverer 方式我们可以定义一个异常队列和交换机,来接收其他交换机队列转发的无法处理的异常消息。然后我们可以查看其中的异常消息并进行人工处理。

2.1. 实现步骤

1. 将失败处理策略改为RepublishMessageRecoverer

2. 定义接收失败消息的交换机、队列及其绑定关系

3. 定义RepublishMessageRecoverer

2.2. 实现代码

2.2.1. 异常交换机队列回收期配置类

package com.example.consumer;

import jakarta.annotation.Resource;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 异常交换机/队列/消息回收器配置类
 * ConditionalOnProperty 通过yml中的重试配置来选择该配置类是否启用
 */
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfig {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Bean
    Queue errorQueue() {
        return new Queue("error.queue");
    }

    @Bean
    DirectExchange errorExchange() {
        return new DirectExchange("error.direct");
    }

    @Bean
    Binding errorBind(Queue errorQueue, DirectExchange errorExchange) {
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    }

    @Bean
    public MessageRecoverer messageRecoverer() {
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

2.2.2. 常规交换机队列配置类 

package com.example.consumer;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 常规的RabbitMQ 交换机/队列绑定配置类
 */
@Configuration
public class RabbitMQConfig {

    @Bean
    Queue simpleQueue() {
        // 使用 QueueBuilder 创建一个持久化队列
        return QueueBuilder.durable("simple.queue").build();
    }
}

2.2.3. 消费者代码

package com.example.consumer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * 消费者
 */
@Slf4j
@Component
public class SimpleListener {

    @RabbitListener(queues = "simple.queue")
    public void listener1(String msg) throws Exception {
//        System.out.println("消费者1:人生是个不断攀登的过程【" + msg + "】");
        throw new Exception();
    }
}

2.2.4. 消费者yml配置 

# 消费者application.yml配置
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: Wangzhexiao
    password: Wangzhexiao
    virtual-host: /hangzhou
    listener:
      simple:
        prefetch: 1
        acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack
        # 消费者重试机制配置
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初始的失败等待时长为1秒
          multiplier: 1 # 下次失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false




2.2.5. 生产者代码 

package com.example.publisher;

import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

/**
 * 生产者
 */
@Slf4j
@SpringBootTest
class PublisherApplicationTests {

    @Resource
    private RabbitTemplate rabbitTemplate;

    @Test
    void test() {
        rabbitTemplate.convertAndSend("simple.queue", "只要学不死,就往死里学!");
    }
}

2.2.6. 生产者yml配置 

# 生产者application.yml配置
spring:
  rabbitmq:
    # MQ连接配置
    host: 127.0.0.1
    port: 5672
    username: Wangzhexiao
    password: Wangzhexiao
    virtual-host: /hangzhou





 2.2.7. 运行效果

最终效果是,我们在消费者的代码逻辑中会抛出异常,消息在反复投递消费失败后被重新入列到我们定义的异常交换机队列中:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2206501.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【瑞萨RA8D1 CPK开发板】串口的使用和STDOUT输出重定向

串口 本次串口的使用关于时钟导致串口的波特率不对,坑了我很久的时间 使能时钟 串口发现一个问题就是,只能使用下边的时钟配置,修改时钟源和分频系数都会导致串口波特率不正常,这种问题出现在mdkrasc的使用场景之下&#xff1b…

bclinux安装minio和mc及从服务器上下载文件

下载MinIO服务器二进制文件 访问MinIO的官方网站或使用wget、curl等工具直接从MinIO的官方GitHub存储库下载最新版本的MinIO服务器二进制文件。例如,使用以下命令: 下载命令:wget https://dl.min.io/server/minio/release/linux-amd64/ 授…

Hadoop三大组件的工作原理

Hadoop三大组件的工作原理 一、引言 Hadoop是一个开源的分布式计算框架,在大数据处理领域具有举足轻重的地位。其核心组件包括HDFS(分布式文件系统)、MapReduce(分布式计算框架)和YARN(资源管理系统&…

Vue3 ECharts看板

获取 ECharts - 入门篇 - 使用手册 - Apache ECharts npm install echarts <template><div id"main" style"height:400px;"></div> </template><script lang"ts" setup> import { ref, onMounted } from "…

AcWing 905:区间选点 ← 贪心算法

【题目来源】https://www.acwing.com/problem/content/907/【题目描述】 给定 N 个闭区间 [ai,bi]&#xff0c;请你在数轴上选择尽量少的点&#xff0c;使得每个区间内至少包含一个选出的点。 输出选择的点的最小数量。 位于区间端点上的点也算作区间内。【输入格式】 第一行包…

【论文阅读笔记】End-to-End Object Detection with Transformers

代码地址&#xff1a;https://github.com/facebookresearch/detr 论文小结 本文是Transformer结构应用于目标检测&#xff08;OD&#xff09;任务的开山之作。方法名DETE&#xff0c;取自Detection Transformer。   作为2020年的论文&#xff0c;其表现精度在当时也不算高的…

Linux:信号保存与处理

使用kill -l命令查看信号&#xff1a; 信号量和信号确实一点关系没有 信号是操作系统发出的进程与进程之间的通知于中断&#xff0c;是进程之间时间异步通知的一种方式 先了解同步通信&#xff1a;同步通信是一种比特同步通信技术&#xff0c;要求发收双方具有同频同相的同步…

学以致用 SAP HCM 顾问excel函数实战系列

EXCEL函数&#xff1a;在上学的时候&#xff0c;对word、excel、PPT感觉都很简单&#xff0c;稀里糊涂的学&#xff0c;稀里糊涂的忘&#xff0c;然后走向工作岗位的时候&#xff0c;突然发现这三大宝剑无比锋利&#xff0c;可惜自己太菜&#xff0c;曾经努力学习&#xff0c;但…

前端 | Uncaught (in promise) undefined

前端 | Uncaught (in promise) undefined 最近开发运行前端项目时&#xff0c;经常预计控制台报错 &#xff0c;如下图&#xff1a; 这里我总结下&#xff0c;这种报错的场景和原因&#xff0c;并通过实际代码案例帮助小伙伴更好理解下 。 文章目录 前端 | Uncaught (in promi…

数据丢失的终极克星来了!EasyRecovery17数据恢复软件

数据丢失的终极克星来了&#xff01; 各位亲爱的朋友们&#xff0c;你们有没有经历过那种“哎呀妈呀&#xff0c;重要文件找不到了&#xff01;”的绝望时刻&#xff1f;别急&#xff0c;今天我要向你们安利一款神器——EasyRecovery17数据恢复软件&#xff0c;简直是我们这些“…

Javascript笔试题目(二)

1.如何使用ES6语法对函数所有参数进行求和?请写出具体代码 function sumAll(...args) { // args 是一个数组&#xff0c;包含了函数接收到的所有参数 return args.reduce((accumulator, currentValue) > accumulator currentValue, 0); } // 测试函数 console.log…

查找企业联系电话的几种方法

在商业合作和销售拓展的过程中&#xff0c;找到企业的联系电话是至关重要的一步。无论是精准营销还是客户开发&#xff0c;拥有有效的联系方式可以大大提高成功率。那么&#xff0c;如何快速有效地查找企业联系电话呢&#xff1f;下面介绍几种常见的方法&#xff0c;以及如何借…

摩托车一键启动智能钥匙提高了便捷性和安全性

摩托车一键启动无钥匙进入功能是一种便捷的智能配置 通过PKE智能感应技术实现无钥匙启动&#xff1a; 技术原理与操作 摩托车一键启动无钥匙进入系统采用了RFID无线射频技术和车辆身份编码识别系统&#xff0c;实现了双重射频系统、双重防盗保护。操作简便&#xff0c;只需携…

最后倒计时,SIGMOD 2025全球数据库盛会,你准备好了吗?

一、会议资讯&#xff1a; ACM SIGMOD/PODS International Conference on Management of Data是计算机科学领域中一个顶级的国际学术会议&#xff0c;专注于数据库管理和数据系统的前沿研究。 SIGMOD Conference 每年由 ACM 主办&#xff0c;汇集了全球顶尖的学者、研究人员和…

【使用Java循环输出菱形,空心金字塔】

使用Java循环输出图形的探索之旅 在这篇博客中&#xff0c;我们将探讨如何使用Java中的循环结构来输出各种几何图形&#xff0c;特别是金字塔和菱形。通过这一过程&#xff0c;不仅能够加深对循环的理解&#xff0c;还能提升编程能力。 1. 打印矩形 首先&#xff0c;我们从最…

初级学习:Python实现AI并搭建

随着人工智能(AI)的迅猛发展,越来越多的人希望能够学习如何通过编程实现AI应用。Python,因为其简洁易用,被广泛认为是AI开发的理想编程语言。本文将介绍Python在AI开发中的基础应用,帮助初学者入门并构建自己的AI项目。 为什么选择Python 在了解如何用Python实现AI之前,…

探索 Python 装饰器的终极利器:wrapt 库

文章目录 探索 Python 装饰器的终极利器&#xff1a;wrapt 库背景&#xff1a;装饰器的进化之旅初识 wrapt&#xff1a;它究竟是什么&#xff1f;安装 wrapt&#xff1a;简单几步&#xff0c;轻松上手函数的魔法&#xff1a;wrapt 的简单使用创建简单装饰器装饰器的高级用法&am…

浅谈导热油蒸汽发生器在电子及半导体行业中应用

导热油蒸汽发生器在电子或半导体行业制造过程中有着广泛的应用。为促进温控行业交流发展上海中壹展览等于十二月在上海举办主办首届“TCU China温控展”。以下是一些具体的应用场景和优势&#xff1a; 应用场景 1.清洗与洁净室控制: 半导体生产过程中&#xff0c;生产设备和工…

TikTok代理IP全面使用指南

对于那些希望通过社交媒体打造个人品牌的人来说&#xff0c;TikTok是现在热门的平台&#xff0c;他的流量与曝光不可小觑&#xff0c;相信很多跨境营销会选择他进行多账号营销。问题是&#xff0c;TikTok多账号很容易遇到封禁问题&#xff0c;那么如何解决&#xff1f; 一、什么…

第十三章 RabbitMQ之消息幂等性

目录 一、引言 二、消息幂等解决方案 2.1. 方案一 2.2. 方案二 一、引言 幂等是一个数学概念&#xff0c;用函数表达式来描述是这样的&#xff1a;f(x) f(f(x)) 。在程序开发中&#xff0c;则是指同一个业务&#xff0c;执行一次或多次对业务状态的影响是一致的。有些业务…