springboot rabbitmq 非阻塞重试机制实现

news2025/1/11 23:53:32

重试的应用场景

比如,系统之间同步数据,A系统发送数据给B系统,因为网络原因或者B系统正在重启,可能收不到信息,为了确保B能收到消息就得重试几次;经典的比如,微信支付回调

对后台通知交互时,如果微信收到商户的应答不符合规范或超时,微信认为通知失败,微信会通过一定的策略定期重新发起通知,尽可能提高通知的成功率,但微信不保证通知最终能成功。(通知频率为15s/15s/30s/3m/10m/20m/30m/30m/30m/60m/3h/3h/3h/6h/6h

  • 总计 24h4m)

支付宝支付回调

程序执行完后必须打印输出 success。如果商家反馈给支付宝的字符不是 success 这 7
个字符,支付宝服务器会不断重发通知,直到超过 24 小时 22 分钟。一般情况下,25 小时以内完成 8
次通知(通知的间隔频率一般是:4m,10m,10m,1h,2h,6h,15h)。

问题背景

spring-retry的@Retryable方式,是阻塞式的,rabbitmq使用这种方式,如果重试次数过多,后边的消息会阻塞一直得不到处理,重试次数过少则不能保证对方收到回调;那提高消费者数量可以吗?也是不行的,最终会耗尽所有消费者。这就相当于你去银行办业务,轮到你时,你要办的业务正好办不了,窗口就一直等着,后边的人无法办业务;如果增加窗口的数量,同样的原因,最终导致全部窗口阻塞;

解决思路

解决思路是,每个消息只分配给一次机会,失败后,放入延迟队列,然后处理下一个消息,到达延迟时间,消息再次入列,这样消息不会阻塞。这就相当于,轮到你时,要办的业务办不了,那么就让你等一会,先让后边的人办,x时间后再把你放到窗口的队列中;
用一张图概括就是RabbitMQ Non-Blocking Retry Solutions in SpringBoot Solution B — Delay Plugin
在这里插入图片描述

具体步骤

安装延迟插件

首先rabbitmq要安装rabbitmq-delayed-message-exchange插件,我的rabbitmq用的是3.8.27所以使用对应的插件3.8.17

找到放插件的目录

root@xx:/# rabbitmq-plugins directories -s
Plugin archives directory: /opt/rabbitmq/plugins
Plugin expansion directory: /var/lib/rabbitmq/mnesia/rabbit@3f09bf7c586f-plugins-expand
Enabled plugins file: /etc/rabbitmq/enabled_plugins

将插件,放到/opt/rabbitmq/plugins里,启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

springboot配置

application.yml配置

#RabbitMQ配置
  rabbitmq:
    host: localhost
    port: 5672
    username: username
    password: password
    listener:
      simple:
       # consumer收到消息,不回复ack
        acknowledge-mode: NONE
        # 抛出异常时,不再入队列
        default-requeue-rejected: false

#自定义的mq errorhandler,处理失败时的重试次数
retry:
  # 最大重试次数
  max_retry: 5
  # 延迟(毫秒)
  delay: 1000

创建延迟exchange

搜了很多博客,都说要先通过management ui手动创建exchange,其实是不需要的,照下边这样就可以用代码创建exchange。

@Slf4j
@Configuration
public class MqConfig {
    @Autowired
    private AmqpTemplate amqpTemplate;
 /**
     * 延迟交换机名称
     */
    public static final String DELAY_EXCHANGE_NAME="delay-exchange";
    /**
     * 延迟交换机的类型
     */
    public static final String DELAY_EXCHANGE_TYPE = "x-delayed-message";
      /**
     * 错误重试延迟
     */
    private static  Integer DELAY;
    /**
     * 最大重试次数,2880次*30秒=24小时,也就是说,默认重试24小时
     */
    private static  Integer MAX_RETRY;
    @Value("${retry.max_retry}")
    public void setMaxRetry(Integer maxRetry){
        MAX_RETRY = maxRetry;
    }
    @Value("${retry.delay}")
    public void setDelay(Integer delay){
        DELAY = delay;
    }
/**
     * 创建延迟交换机,必须先创建才能监听
     * @return
     */
    @Bean
    public CustomExchange delayExchange() {
        CustomExchange customExchange = new CustomExchange(MqConfig.DELAY_EXCHANGE_NAME,
                MqConfig.DELAY_EXCHANGE_TYPE);
        customExchange.getArguments().put("x-delayed-type",
                ExchangeTypes.DIRECT);
        return customExchange;
    }
// mq的异常处理器
@Bean
    public RabbitListenerErrorHandler retryErrorHandler() {
        RabbitListenerErrorHandler errorHandler = (amqpMessage, message, exception) -> {
            log.error("message监听器出错了",exception);
            MessageProperties messageProperties = amqpMessage.getMessageProperties();
            Map<String, Object> headers = messageProperties.getHeaders();
            Integer xRetryCount = ((Integer) headers.get("retry-count"));
            //根据失败次数,决定是否继续发送到延迟队列
            if (xRetryCount == null) {
                xRetryCount = MAX_RETRY;
            }
            Integer retriedCount = (Integer) headers.get("retried-count");
            if (retriedCount == null) {
                retriedCount = 1;
            }
            log.info("已执行次数:{},最大重试次数:{}",retriedCount,xRetryCount);
            if(retriedCount < xRetryCount){
                log.info("已执行次数小于最大重试次数");
                retriedCount++;
                headers.put("retried-count",retriedCount);
                String routingKey = messageProperties.getConsumerQueue();
                messageProperties.setDelay(DELAY);
                log.info("延迟:{}毫秒", DELAY);
                log.info("路由key:{}",routingKey);
                amqpTemplate.send(DELAY_EXCHANGE_NAME, routingKey, amqpMessage);
                return null;
            }
            log.info("已执行次数达到最大重试次数了,不再进行重试");
            return null;
        };
        return errorHandler;
    }
    }

监听器

 /**
     * 接收到消息后,将消息传给目的地
     * @param message
     */
    @RabbitListener(bindings =@QueueBinding(value = @Queue("队列名称"),
            exchange = @Exchange(value = MqConfig.DELAY_EXCHANGE_NAME,
                    type = MqConfig.DELAY_EXCHANGE_TYPE),
            key = "队列名称"),errorHandler = "retryErrorHandler")
    public void doSynDataToThirdApp(String message){
        //http请求发送给第三方,如果出错,则会执行errorHandler  ,从而实现重试
    }

简单来说,就是mq监听器处理消息失败,errorHandler会捕获到,如果未达到重试次数,则放入延迟队列,延迟时间后,再次处理;如果已经达到重试次数,则结束,不再入队列;

参考

  • Message Listener Container Configuration
  • Scheduling Messages with RabbitMQ
  • RabbitMQ Delayed Message Plugin
  • RabbitMQ Non-Blocking Retry Solutions in SpringBoot

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/172119.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

VScode远程连接Linux

文章目录一、下载安装二、使用三、连接四、基本操作五、VScode内置命令行六、推荐插件一、下载安装 下载的问题就不用多说了把&#xff0c;可能存在的问题就是下载的速度比较慢 前往官网进行下载&#xff1a;前往官网找到适合自己的版本&#xff1a; 但是由于官网是国外的&am…

DT-6_TTL-WiFi透传模块介绍

DT-6_TTL-WiFi透传模块简介TTL-WiFi模块基于ESP-M2WiFi 模块研发&#xff0c;引出串口TTL、EN、STATE等引脚。产品内置我司最新版本的串口透传固件可完成设备TTL 端口到WiFi/云的数据实时透传&#xff0c;具备低功耗控制&#xff0c;状态指示等功能。本模块可直接取代原有的有线…

【手写 Vue2.x 源码】第三十二篇 - diff算法-乱序比对

一&#xff0c;前言 上篇&#xff0c;diff算法-比对优化&#xff08;下&#xff09;&#xff0c;主要涉及以下几个点&#xff1a; 介绍了儿子节点比较的流程介绍并实现了头头、尾尾、头尾、尾头4种特殊情况比对 本篇&#xff0c;继续介绍 diff算法-乱序比对 二&#xff0c;乱…

MATLAB | 全网最全边际图绘制模板(直方图、小提琴图、箱线图、雨云图、散点图... ...)

如标题所言&#xff0c;这应该是全网最全的边际图绘制模板&#xff0c;中心图有8种格式&#xff0c;边际图有11种格式&#xff0c;共计88种组合&#xff0c;另外模板中给了8款配色&#xff0c;我愿称其为888组合&#xff0c;只需要更换一下数据就能绘制出各种类型的边际图: 甚至…

中国机器视觉市场研究报告

目录 机器视觉行业概述机器视觉行业发展现状机器视觉行业典型企业分析机器视觉行业未来发展趋势 机器视觉行业概述 机器视觉定义 机器视觉&#xff08;Machine Vision&#xff0c;MV&#xff09;是人工智能正在快速发展的一个分支。根据美国制造工程师协会&#xff08;SME&…

数字孪生虚拟电厂负荷控制系统可视化

随着国家“双碳”及“构建以新能源为主体的新型电力系统”等目标的提出&#xff0c;清洁化、数字化越来越成为电力系统面临的迫切需求&#xff0c;负控系统的发展对电力营销现代化建设具有重要的意义。负控管理系统是一个着眼于全面加强电力信息管理的&#xff0c;集负荷控制、…

Tips for Confluence Administrators: Part 2

Part 1中&#xff0c;我们谈到了 Confluence 自定义配置的案例&#xff0c;例如&#xff1a;如何禁用附件下载&#xff1f;如何将iFrame放入Confluence&#xff1f;如何使我的页面完全私有&#xff1f;如何防止空间管理员删除他们的空间&#xff1f;任何软件都有bug&#xff0c…

Minecraft 1.19.2 Forge模组开发 11.Mixin

我们本次使用Mixin在1.19.2中制作一个属于自己的不死图腾。 演示效果演示效果演示效果 什么是Mixin&#xff1f; 简单来说是通过注入一些我们的代码&#xff0c;达到对MC原版内容的修改。 详细内容可以参考Minecraft 17.1 Mixin 1.首先我们需要在开发包中引入mixin的依赖&a…

深度学习——双向循环神经网络(笔记)

双向循环神经网络&#xff1a; ①对于序列来讲&#xff0c;假设的目标是&#xff1a;给定观测的情况下&#xff08;在时间序列的上下文或语言模型的上下文&#xff09;&#xff0c;对于下一个输出进行建模 ②对于序列模型来讲&#xff0c;可以从前往后看&#xff0c;也可以从…

Servlet基础

Servlet1. Servlet概述2. 快速入门3. 执行原理4. 生命周期方法5. Servlet3.06. 体系结构7. 相关配置8. HTTP8.1 概念8.2 Request8.3 Response8.4 ServletContext综合案例:文件下载&#xff1a;1. Servlet概述 Servlet是JavaEE规范(接口)之一Servlet是JavaWeb三大组件之一&…

使用sdk-npi-enablement-tool生成SVD文件和芯片头文件

使用sdk-npi-enablement-tool生成SVD文件和芯片头文件 文章目录使用sdk-npi-enablement-tool生成SVD文件和芯片头文件IntroductionOverviewOperation Steps创建芯片配置文件yaml填充外设模块的寄存器映射描述文件xlsx验证生成芯片头文件ConclusionIntroduction 芯片验证与测试…

【Linux杂篇】Linux系统终端常用配置文件更改

目录列表&#xff1a; 1.alias别名永久保存 2.解决vim文件没有颜色的问题 3.vim插件supertap插件安装&#xff08;可支持自动补全&#xff0c;非函数代码补全&#xff0c;仅支持在当前编辑文档内补全&#xff09; 4.vim插件管理 5.YCM下载 6.解决vim中使用backspace无法删…

windows安装npm和cnpm

npm: 代码的包管理器&#xff0c;但是服服器在国外&#xff0c;每一次启动项目都要下载一些依赖&#xff0c;耗时之久&#xff0c;官网下载链接戳 npm。 cnpm&#xff1a;这是淘宝团队出的npm的镜像&#xff0c;可用此代替官方的只读版本&#xff0c;官网链接 cnpm。 先安装np…

Redis6学习笔记【part3】配置文件与订阅/发布

一.Redis配置文件 1.Units单位 配置大小单位,开头定义了一些基本的度量单位&#xff0c;只支持bytes&#xff0c;不支持bit。其中 GB、Gb 大小写不敏感。 2.Include包含 类似 jsp 中的 include 引入公共页面 &#xff0c;redis 在多实例的情况也可以把公用的配置文件提取出来…

9 大指标分析 Solana 的熊市危机

Daniel, 2023 年 1 月Solana 是一个去中心化的区块链网络&#xff0c;由 Solana 实验室设计并在2020年推出&#xff0c;具有快速、可扩展和安全的特点。由于其快速的交易速度和低交易费用&#xff0c;Solana 在 2020 年和 2021 年获得了极大的关注&#xff0c;这使得它对去中心…

远程仓库操作

添加远程仓库 新建一个文件夹&#xff1a; 文件夹右键打开git bash: 初始化为git仓库&#xff1a; 在码云上新建一个git仓库&#xff1a; 复制链接&#xff1a; 在文件夹里使用git bash&#xff1a; git remote add<shortname><url> 添加一个新的远程仓库&…

制作tomcat的docker镜像

环境信息&#xff1a;MacBook Pro&#xff08;M1&#xff09;VMware-fusion(Player 版本 13.0.0 (20802013))Ubuntu 22.10tomcat镜像&#xff1a;centos-7.9.2009jdk1.8.0_341 apache-tomcat-8.5.84制作步骤&#xff1a;&#xff08;1&#xff09;下载好tomcat/jdk(我是在macbo…

学习react

这里写自定义目录标题学习React学习React 安装react的脚手架 npm i create-react-app -g通过脚手架创建demo D:\dev_project>create-react-app react-demo You are running Node 11.1.0. Create React App requires Node 14 or higher. Please update your version of No…

找不到合适好用的redis客户端工具?试试官方的客户端工具RedisInsight

这里是weihubeats,觉得文章不错可以关注公众号小奏技术&#xff0c;文章首发。拒绝营销号&#xff0c;拒绝标题党 背景 之前使用的redis客户端工具是AnotherRedisDesktopManager AnotherRedisDesktopManager github地址: https://github.com/qishibo/AnotherRedisDesktopManag…

千锋JavaScript学习笔记

千锋JavaScript学习笔记 文章目录千锋JavaScript学习笔记写在前面1. JS基础1.1 变量1.2 数据类型1.3 数据类型转换1.4 运算符1.5 条件1.6 循环1.7 函数1.8 对象数据类型1.9 数组和排序1.10 数组常用方法&#xff1a;1.11 字符串常用方法1.12 数字常用方法1.13 时间常用方法1.14…