rabbitmq-amqp事务消息+消费失败重试机制+prefetch限流

news2025/4/27 1:56:07

1. 安装和配置

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

1.2 yml 配置

### 生产端的配置
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: / # 虚拟主机
    username: guest
    password: guest
    publisher-returns: true  #确认消息已经发送到队列,生产上无需开启
    # simple:同步等待confirm结果,直到超时
    #开启消息确认 :correlated:异步回调,MQ返回结果时会回调这个ComfirmCallback
    publisher-confirm-type: correlated #确认消息已发送到交换机
## 生产端的配置
spring:
  rabbitmq:
    host: localhost
    port: 5672
    virtual-host: / # 虚拟主机
    username: guest
    password: guest
    publisher-returns: true  #确认消息已经发送到队列,生产上无需开启
    # simple:同步等待confirm结果,直到超时
    #开启消息确认 :correlated:异步回调,MQ返回结果时会回调这个ComfirmCallback
    publisher-confirm-type: correlated #确认消息已发送到交换机

2.生产端的消息确认发送代码

/**
 * (1) RabbitTemplate.ConfirmCallback 这个接口是用来确定消息是否到达交换器的
 * (2) RabbitTemplate.ReturnsCallback 这个则是用来确定消息是否到达队列的,未到达队列时会被调用
 */
@Service
@Slf4j
public class RabbitMqConfirmCallback implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{

    private RabbitTemplate rabbitTemplate;


    public void queueConfirm(Map<String, String> map) {
        // 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息
        rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));
        // 故意输入一个不存在的交换机
        rabbitTemplate.convertAndSend("confirm_exchange_2222", "confirm_key1", map, new CorrelationData("22222"));
        // 故意输入一个不存在的队列
        rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1_333333", map, new CorrelationData("3333"));
        log.info("Confirm -- 消息--发送结束");
    }


    /**
     * 需要给ConfirmCallback赋值 不然不会走回调方法,默认是null
     * //将当前类的实例设置为 RabbitMQ 的确认回调处理器,跟下面的confirm方法联合使用,
     * // 还需要打开配置:spring: rabbitmq: publisher-confirm-type: correlated
     */
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Autowired
    public RabbitMqConfirmCallback(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
//        rabbitTemplate.setConfirmCallback(this);
    }

 
    /** 此方法用于监听消息是否发送到交换机
     * 回调
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("confirm -- 监听消息成功发送到交换机--回调id = {}", correlationData);

        } else {
            log.info("confirm -- 消息没有发送到交换机回调id= {},消息发送失败:{}。", correlationData, cause);
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        log.info("消息未到达队列 --- returnedMessage= " + returnedMessage);
    }
}

2.2 生产端的截图

3.消费端代码

@Component
@Slf4j
public class RabbitConfirmConsumer {

    // 交换机
    public static final String confirm_exchange_name = "confirm_exchange";
    // 队列
    public static final String confirm_queue_name="confirm_queue";
    // routingkey
    public static final String confirm_routing_key = "confirm_key1";

    // 声明交换机
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        return new DirectExchange(confirm_exchange_name);
    }
    // 声明队列
    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(confirm_queue_name).build();
    }
    // 绑定队列到交换机
    @Bean
    public Binding queueBingExchange(Queue confirmQueue,DirectExchange confirmExchange){
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(confirm_routing_key);
    }


    /**
     * ack:成功处理消息,RabbitMQ从队列中删除该消息
     * nack:消息处理失败,RabbitMQ需要再次投递消息
     * reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
     */
    @RabbitListener(queues = "confirm_queue")
    public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,
                                @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        //获取消息的唯一标记
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        log.info("接收的消息为:{},消息的唯一标记={}, 直接注入的tag= {}",message, deliveryTag, tag);
        if(message.getBody() != null){
            //获取消息的内容
            byte[] body = message.getBody();
            //basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
            channel.basicAck(deliveryTag,false);//false 表示仅确认当前消息消费成功
            log.info("接收的消息为:{}", map);
        }else{
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
            log.info("未消费数据");
        }
    }

}

3.2消费端截图

4 消费端重试机制

@Service
@Slf4j
public class RabbitRetryConsumer {

    @Bean
    public Queue retryQueue(){
        Map<String,Object> params = new HashMap<>();
        return QueueBuilder.durable("retry_queue").withArguments(params).build();

    }

    @Bean
    public TopicExchange retryTopicExchange(){
        return new TopicExchange("retry_exchange",true,false);
    }
    //队列与交换机进行绑定
    @Bean
    public Binding BindingRetryQueueAndRetryTopicExchange(Queue retryQueue, TopicExchange retryTopicExchange){
        return BindingBuilder.bind(retryQueue).to(retryTopicExchange).with("retry_key");
    }


    int count  = 0;
    //测试重试,需要在yml配置 retry
    @RabbitListener(queues = "retry_queue")
    public void retryConsumer(Map<String, String> map, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        log.info("retryConsumer 重试次数 = {},重试接收数据为:{}",count++, map);
        int i = 10 /0;
        channel.basicAck(tag,false);
    }

}

4.2 重试机制截图

5. 限流设置--消费端

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual # 开启手动确认模式
        prefetch: 5 #控制消费者从队列中预取(prefetch)消息的数量,以此来实现流控制

5.1 生产端--发送19条信息

@GetMapping("/xianliu")
    public String xianliuTest(){
        for(int i = 1; i < 20; i++){
            Map<String, String> map = new HashMap<>();
            map.put("key","限流测试--" + i);
            rabbitMqProducer.xianliuTest(map);
        }
        return "限流测试发送成功";
    }


/***
     * 限流消息的发送测试
     */
    public void xianliuTest(Map<String, String> map) {
        // 第一个参数表示交换机,第二个参数表示 routing key,第三个参数即消息
        rabbitTemplate.convertAndSend("confirm_exchange", "confirm_key1", map, new CorrelationData("111"));
    }

5.2 消费端

 /**
     * ack:成功处理消息,RabbitMQ从队列中删除该消息
     * nack:消息处理失败,RabbitMQ需要再次投递消息
     * reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息
     */
    @RabbitListener(queues = "confirm_queue")
    public void consumerConfirm(Message message, Channel channel, @Payload Map<String, Object> map,
                                @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
        //获取消息的唯一标记
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        log.info("接收的消息为:{},消息的唯一标记={}, 直接注入的tag= {}",message, deliveryTag, tag);
        if(message.getBody() != null){
            //basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。
            //channel.basicAck(deliveryTag,false);//false 表示仅确认当前消息消费成功
            log.info("接收的消息为:{}", map);
        }else{
            //否定确认
            //channel.basicNack(deliverTag,false,true);//requeue为false,则变成死信队列
            channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
            log.info("未消费数据");
        }
    }

5.3 注释掉channel.basicAck--堵塞了

5.4 注释掉了 prefetch -- 19条全部被消费,即使没有ack

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2312388.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

golang 从零单排 (一) 安装环境

1.下载安装 打开网址The Go Programming Language 直接点击下载go1.24.1.windows-amd64.msi 下载完成 直接双击下一步 下一步 安装完成 环境变量自动设置不必配置 2.验证 win r 输入cmd 打开命令行 输入go version

康谋分享 | 3DGS:革新自动驾驶仿真场景重建的关键技术

随着自动驾驶技术的迅猛发展&#xff0c;构建高保真、动态的仿真场景成为了行业的迫切需求。传统的三维重建方法在处理复杂场景时常常面临效率和精度的挑战。在此背景下&#xff0c;3D高斯点阵渲染&#xff08;3DGS&#xff09;技术应运而生&#xff0c;成为自动驾驶仿真场景重…

【够用就好008】开新坑自学esb32烧录进军物联网和嵌入式

见字如面&#xff0c;这里是AKA AIGC创意人竹相左边。 学习使用了三年的AI工具&#xff0c;现在最大的自信就是业余时间可以学习任何自己感兴趣的事&#xff0c;感觉手搓火箭也不是梦。 今天开个新坑&#xff0c;也是逐步探索想要进入的新世界。物联网&#xff08;IoT&#…

Go红队开发—格式导出

文章目录 输出功能CSV输出CSV 转 结构体结构体 转 CSV端口扫描结果使用CSV格式导出 HTML输出Sqlite输出nmap扫描 JSONmap转json结构体转jsonjson写入文件json编解码json转结构体json转mapjson转string练习&#xff1a;nmap扫描结果导出json格式 输出功能 在我们使用安全工具的…

element-plus中table组件的使用

1、table组件的基本使用 注意&#xff1a; ①对象集合&#xff0c;要从后端查询。 ②prop是集合中的对象的属性名&#xff1b;label是表格表头的名称。 2、将性别一列的71转为男&#xff0c;72转为女 问题描述&#xff1a; 解决步骤&#xff1a; ①将el-table-column变成双标签…

Go加spy++隐藏窗口

最近发现有些软件的窗口就像狗皮膏药一样&#xff0c;关也关不掉&#xff0c;一点就要登录&#xff0c;属实是有点不爽了。 窗口的进程不能杀死&#xff0c;但是窗口我不想要。思路很简单&#xff0c;用 spy 找到要隐藏的窗口的句柄&#xff0c;然后调用 Windows 的 ShowWindo…

网络安全通信架构图

&#x1f345; 点击文末小卡片 &#xff0c;免费获取网络安全全套资料&#xff0c;资料在手&#xff0c;涨薪更快 在安全通信里面我经常听到的2个东西就是SSL和TLS&#xff0c;这2个有什么区别呢&#xff1f;以及HTTPS是怎么通信的&#xff1f;包括对称加密、非对称加密、摘要、…

当中国“智算心跳”与全球共振:九章云极DataCanvas首秀MWC 2025

3月3日&#xff0c;西班牙巴塞罗那&#xff0c;全球通信与科技领域的盛会“2025世界移动通信大会&#xff08;MWC 2025&#xff09;”正式拉开帷幕。中国人工智能基础设施领军企业九章云极DataCanvas公司以全球化战略视野与硬核技术实力&#xff0c;全方位、多维度地展示了在智…

Clion快捷键、修改字体

文章目录 一、Clion快捷键1.撤销&#xff1a;crtl Z2.重做&#xff1a;crtl shift Z3.删除该行&#xff1a;crtl Y4.多行后退&#xff1a;选中多行 Tab5.多行缩进&#xff1a;选中多行 shift Tab 二、修改注释的斜体 一、Clion快捷键 1.撤销&#xff1a;crtl Z 2.重做…

基于PySide6的CATIA零件自动化着色工具开发实践

引言 在汽车及航空制造领域&#xff0c;CATIA作为核心的CAD设计软件&#xff0c;其二次开发能力对提升设计效率具有重要意义。本文介绍一种基于Python的CATIA零件着色工具开发方案&#xff0c;通过PySide6实现GUI交互&#xff0c;结合COM接口操作实现零件着色自动化。该方案成…

在Uniapp中实现特殊字符弹出框并插入输入框

在开发Uniapp项目时&#xff0c;我们经常会遇到需要用户输入特殊字符的场景。为了提升用户体验&#xff0c;我们可以封装一个特殊字符弹出框&#xff0c;用户点击键盘图标后弹出该字符集&#xff0c;选择字符后自动插入到输入框中。本文将详细介绍如何实现这一功能。 1. 功能概…

深入解析 BitBake 日志机制:任务调度、日志记录与调试方法

1. 引言&#xff1a;为什么 BitBake 的日志机制至关重要&#xff1f; BitBake 是 Yocto 项目的核心构建工具&#xff0c;用于解析配方、管理任务依赖&#xff0c;并执行编译和打包任务。在 BitBake 构建过程中&#xff0c;日志记录机制不仅用于跟踪任务执行情况&#xff0c;还…

《原型链的故事:JavaScript 对象模型的秘密》

原型链&#xff08;Prototype Chain&#xff09; 是 JavaScript 中实现继承的核心机制。每个对象都有一个内部属性 [[Prototype]]&#xff08;可以通过 __proto__ 访问&#xff09;&#xff0c;指向其原型对象。每个对象都有一个原型&#xff0c; 原型本身也是一个对象&#xf…

Linux 配置静态 IP

一、简介 在 Linux CentOS 系统中默认动态分配 IP 地址&#xff0c;每次启动虚拟机服务都是不一样的 IP&#xff0c;因此要配置静态 IP 地址避免每次都发生变化&#xff0c;下面将介绍配置静态 IP 的详细步骤。 首先先理解一下动态 IP 和静态 IP 的概念&#xff1a; 动态 IP…

【Python 数据结构 10.二叉树】

目录 一、二叉树的基本概念 1.二叉树的定义 2.二叉树的特点 3.特殊的二叉树 Ⅰ、斜树 Ⅱ、满二叉树 Ⅲ、完全二叉树 Ⅳ、完全二叉树和满二叉树的区别 4.二叉树的性质 5.二叉树的顺序存储 Ⅰ、完全二叉树 Ⅱ、非完全二叉树 Ⅲ、稀疏二叉树 6.二叉树的链式存储 7.二叉树的遍历概念…

SwanLab简明教程:从萌新到高手

目录 1. 什么是SwanLab&#xff1f; 1.1 核心特性 2. 安装SwanLab 3. 登录SwanLab账号&#xff08;云端版&#xff09; 4. 5分钟快速上手 更多案例 5. SwanLab功能组件 5.1 图表视图 5.2 表格视图 5.3 硬件监控 5.4 环境记录 5.5 组织协同 6. 训练框架集成 6.1 基…

SQLiteStudio:一款免费跨平台的SQLite管理工具

SQLiteStudio 是一款专门用于管理和操作 SQLite 数据库的免费工具。它提供直观的图形化界面&#xff0c;简化了数据库的创建、编辑、查询和维护&#xff0c;适合数据库开发者和数据分析师使用。 功能特性 SQLiteStudio 提供的主要功能包括&#xff1a; 免费开源&#xff0c;可…

贝塞尔曲线学习

1、一阶贝塞尔曲线 一阶贝塞尔曲线其实是一条直线——给定点 P0、P1&#xff0c;线性贝塞尔曲线就是一条两点之间的直线&#xff0c;公式如下&#xff1a; 一阶曲线很好理解, 就是根据t来线性插值。 void MainWindow::mousePressEvent(QMouseEvent *e) {list.append(e->pos…

机器学习(六)

一&#xff0c;决策树&#xff1a; 简介&#xff1a; 决策树是一种通过构建类似树状的结构&#xff08;颠倒的树&#xff09;&#xff0c;从根节点开始逐步对数据进行划分&#xff0c;最终在叶子节点做出预测结果的模型。 结构组成&#xff1a; 根节点&#xff1a;初始的数据集…

【江协科技STM32】ADC数模转换器-学习笔记

ADC简介 ADC&#xff08;Analog-Digital Converter&#xff09;模拟-数字转换器ADC可以将引脚上连续变化的模拟电压转换为内存中存储的数字变量&#xff0c;建立模拟电路到数字电路的桥梁&#xff0c;ADC是一种将连续的模拟信号转换为离散的数字信号的设备或模块12位逐次逼近型…