RabbitMq手动ack的超简单案例+Confirm和Return机制的配置和使用

news2024/11/20 10:31:44

最简单的例子

先简单介绍一下这三个方法

basicAck

表示确认成功,使用此方法后,消息会被rabbitmq broker删除

 

basicNack

表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列

basicReject

拒绝消息,与basickNack区别在于不能进行批量操作,其他用法很相似

形参

multiple表示是否批量处理

requeue表示是否重复入队


deliverTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliverTag都会增加。手动消息确认模式下,我们可以对指定deliverTag的消息进行ack、nack、reject等操作。

mutiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 delivertag 的消息


依赖

  <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.51</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
        </dependency>
    </dependencies>

springboot配置

spring:
  rabbitmq:
    host: 192.168.88.130
    port: 5672
    username: itcast
    password: 123321
    virtual-host: /   #虚拟主机,默认是/,RabbitMQ 使用虚拟主机来隔离不同的消息环境,虚拟主机用于将消息、交换器、队列和绑定隔离开来
    publisher-confirm-type: correlated #发布者消息确认功能(异步)
    listener:
      simple:
        retry:
          enabled: true  #开启消费者失败重试
          max-attempts: 5 #最大重试次数
          initial-interval: 1000ms #初始失败等待时长为1秒
          multiplier: 1 #下次失败的等待时长倍数,下次等待时长=multipiler*last-interval
        acknowledge-mode: manual #开启手动ack机制 auto是自动 none是发送后直接ack(这个不会用上的)
    publisher-returns: true #发布者返回消息功能

logging:
  level:
    com.atguigu.mq.config.MQProducerAckConfig: info

交换机和队列配置

package com.example.rabbitmq.Configuration;


import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.ImmediateRequeueMessageRecoverer;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueueConfig {

    @Bean(name = "confirmTestQueue")
    public Queue confirmTestQueue() {
        return new Queue("confirm_test_queue", true, false, false);
    }

    @Bean(name = "confirmTestExchange")
    public FanoutExchange confirmTestExchange() {
        return new FanoutExchange("confirmTestExchange");
    }

    @Bean
    public Binding confirmTestFanoutExchangeAndQueue(
            @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
            @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
        return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
    }



}

消费者

我设置了最多重试三次

有一个小细节

就是他重试的时候是在队头重试的

所以他重试的时候会阻塞一段时间,此时后面的消息是不能消费的

package com.example.rabbitmq.Listener;


import com.alibaba.fastjson2.JSON;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import com.rabbitmq.tools.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

@Slf4j
@Component
public class ReceiverMessage1 {

    //用来存放消息唯一标识的map,设置一定的重试次数
    public static final ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();

    @RabbitListener(queues = "confirm_test_queue")
    public void getMessage3(String msg, Channel channel, Message message) throws IOException {


        //得到当前信息的唯一标识
        String messageId=message.getMessageProperties().getMessageId();

        try {
            System.out.println("成功接收到消息:" + msg);

       int i=1/0;
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
log.info("确认成功");
        } catch (Exception e) {



            map.put(messageId,map.getOrDefault(messageId, 0)+1);

            log.error("接收消息失败");
          log.info("开始重试");
          log.info(messageId);

          //重复处理失败
          if(map.get(messageId)<=3) {
              log.info("确认失败,重新入队");
             channel.basicNack(message.getMessageProperties().getDeliveryTag(),false, true);
   map.put(messageId, map.getOrDefault(messageId, 0)+1);

          }
else {

    log.info("重试仍然失败,所以我们决定丢弃这个消息");

              channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);

          }


        }


    }



    }

生产者(测试类)

package com.example.rabbitmq;

import org.apache.catalina.Executor;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.doThrow;

@SpringBootTest
class RabbitmqApplicationTests {

    @Test
    void contextLoads() {
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;



    @Test
    void sendMessage() {
       rabbitTemplate.convertAndSend("confirmTestExchange",
               "confirm_test_queue", "这是一个测试消息",message -> {
           message.getMessageProperties().setMessageId(UUID.randomUUID().toString());  //把消息的唯一标识设置为UUID
           return message;
               });
    }



}

修改了messageId

因为messageId是基于交换机,内容,队列来生成的

相同的消息可能messageId是一样的

所以我发送消息的时候把底层改成了UUID


Return和Confirm机制

Confirm机制是消息发送到交换机成功或失败时的处理机制

Retrun机制是消息发送到队列失败时的处理机制

配置(加了日志输出)

package com.example.rabbitmq.Configuration;

import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class RabbitConfig implements RabbitTemplate.ReturnsCallback,RabbitTemplate.ConfirmCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

//在Bean注入前就要执行的方法
    @PostConstruct
    private  void initRabbitTemplate(){

        rabbitTemplate.setReturnsCallback(this);
        rabbitTemplate.setConfirmCallback(this);

    }


    //消息发送到交换机成功或失败时调用这个方法
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("confirm()回调函数大打印correlationData:"+correlationData);
log.info("confirm()回调函数大打印ack:"+ack);
log.info("confirm()回调函数大打印cause:"+cause);

    }

    //发送到队列失败的时候调用这个方法
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("消息主体: " + new String(returned.getMessage().getBody()));
        log.info("应答码: " + returned.getReplyCode());
        log.info("描述:" + returned.getReplyText());
        log.info("消息使用的交换器 exchange : " + returned.getExchange());
        log.info("消息使用的路由键 routing : " + returned.getRoutingKey());

    }
}

遇到的小问题

它会有个报错

2024-07-28T14:00:10.866+08:00 ERROR 45104 --- [168.88.130:5672] o.s.a.r.c.CachingConnectionFactory : Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)

这个是因为我们的多次ack,导致的错误

例如我们可能多次ac或者reject了,就会发生这种错误

这是因为我们的配置默认是自动ack

如果我们不开启手动ack,我们的自己写的手动ack代码就算是再次ack了

所以会出现这个通道错误

这个会让我们的mq通道断连,然后再重连,这样子会导致部分消息丢失,所以记得配置开启手动ack


发送消息回队尾

这个我处理失败了,我也不知道为什么,可能我自己修改了messageId吧

反正这个重新发送消息,我得到的和之前的不同,有部分属性缺失了

所以我就没弄这个

如果想详细了解建议看参考文章


参考文章

Springboot + RabbitMQ 用了消息确认机制,感觉掉坑里了!-腾讯云开发者社区-腾讯云 (tencent.com)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1954850.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Chainlit一个快速构建成式AI应用的Python框架,无缝集成与多平台部署

概述 Chainlit 是一个开源 Python 包&#xff0c;用于构建和部署生成式 AI 应用的开源框架。它提供了一种简单的方法来创建交互式的用户界面&#xff0c;这些界面可以与 LLM&#xff08;大型语言模型&#xff09;驱动的应用程序进行通信。Chainlit 旨在帮助开发者快速构建基于…

全网最适合入门的面向对象编程教程:25 类和对象的 Python 实现-Python 判断输入数据类型

全网最适合入门的面向对象编程教程&#xff1a;25 类和对象的 Python 实现-Python 判断输入数据类型 摘要&#xff1a; 本文主要介绍了在使用 Python 面向对象编程时&#xff0c;如何使用 type 函数、isinstance 函数和正则表达式三种方法判断用户输入数据类型&#xff0c;并对…

PWA(渐进式网页应用)方式实现TodoList桌面应用

参考&#xff1a; https://cloud.tencent.com/developer/article/2322236 todlist网页参考&#xff1a; https://blog.csdn.net/weixin_42357472/article/details/140657576 实现在线网页当成app应用&#xff1a; 一个 PWA 应用首先是一个网页, 是通过 Web 技术编写出的一个网…

如何全面提升架构设计的质量?

当我们从可扩展、高可用、高性能等角度设计出来架构的时候&#xff0c;我们如何优化架构呢&#xff1f;就需要从成本、安全、测试等角度进行优化。 如何设计更好的架构 - 步骤 成本 低成本复杂度本质 低成本手段和应用 低成本的主要应用场景 安全 安全性复杂度本质 架构安全…

大语言模型系列-Transformer:深入探索与未来展望

大家好&#xff0c;我是一名测试开发工程师&#xff0c;已经开源一套【自动化测试框架】和【测试管理平台】&#xff0c;欢迎大家联系我&#xff0c;一起【分享测试知识&#xff0c;交流测试技术】 Transformer模型自其问世以来&#xff0c;便迅速在自然语言处理领域崭露头角&a…

2024年【危险化学品生产单位安全生产管理人员】最新解析及危险化学品生产单位安全生产管理人员考试总结

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 危险化学品生产单位安全生产管理人员最新解析参考答案及危险化学品生产单位安全生产管理人员考试试题解析是安全生产模拟考试一点通题库老师及危险化学品生产单位安全生产管理人员操作证已考过的学员汇总&#xff0c;…

mysql基本数据类型(整型)

一、 常见面试题 整型都有哪些基础类型&#xff0c;各占几个字节 tinyint, smallint, mediumint, int, bigint: 1 2 3 4 8 int(n) 是什么意思&#xff0c;什么时候用到 指定显示位宽&#xff0c;需配合 zerofill 使用&#xff08;不够位宽则在前面补0&#xff09;&#xff0c;…

Could not find a version that satisfies the requirement

Could not find a version that satisfies the requirement 目录 Could not find a version that satisfies the requirement 【常见模块错误】 【解决方案】 欢迎来到英杰社区https://bbs.csdn.net/topics/617804998 欢迎来到我的主页&#xff0c;我是博主英杰&#xff0c;2…

MATLAB被360误杀的解决方案

前面被误杀&#xff0c;今天又被误杀。 前面误杀结果是缺少文件&#xff0c;重装MATLAB也不行。 结果重装了操作系统。 这次&#xff0c;看到了提示额外小心。 当时备份了“病毒”文件&#xff0c;结果备份的也被杀了。 解铃还须系铃人 在360安全卫士里面恢复&#xff0c;步骤…

线性代数|机器学习-P27用于深度学习的神经网络结构

文章目录 1. 概述2. 参数定义3. CNN 网络 1. 概述 – 1. 卷积神经网络 CNNs – 2. 连续型线性分段函数 F – 3. 损失函数 – 4. 链式法则计算反向传播算法梯度 ∇ F g r a d F \nabla F \mathrm{grad}\; F ∇FgradF 2. 参数定义 我们定义每个样本有m维度特征&#xff0c;有…

java找不到符号解决办法

一、java找不到符号 如果你的代码里没有报错&#xff0c;明明是存在的。但是java报错找不到符号。如下所示&#xff0c; 二、解决步骤 1.清除编码工具缓存 本人用的idea&#xff0c; eclipse清除缓存方式有需要的可以百度一下&#xff01; 2.如果是mavne项目的 先clean 再…

流媒体服务器一:使用成熟的流媒体SRS 搭建 RTMP流媒体服务器

1 安装和测试srs流媒体服务器 服务器&#xff1a;SRS(Simple RTMP Server&#xff0c;⽀持RTMP、HTTP-FLV&#xff0c;HLS) 推流端&#xff1a;ffmpeg OBS 拉流端&#xff1a;ffplay VLC srs播放器 1.1 安装srs流媒体服务器 官网 SRS (Simple Realtime Server) | SRS 码…

大模型算法面试题(十四)

本系列收纳各种大模型面试题及答案。 1、微调后的模型出现能力劣化&#xff0c;灾难性遗忘是怎么回事 微调后的模型出现能力劣化&#xff0c;灾难性遗忘&#xff08;Catastrophic Forgetting&#xff09;是一个在机器学习领域&#xff0c;尤其是在深度学习和大模型应用中频繁出…

【SpringBoot】6 全局异常捕获

介绍 在项目开发中&#xff0c;如果每个 Controller 都增加 try catch 方法去捕获异常及处理&#xff0c;就会导致代码变得很繁琐、效率低下&#xff0c;而大部分异常是不能直接向外抛出&#xff0c;需要有个统一的显示处理方法&#xff0c;因此需要加上全局异常捕获统一获取并…

深度学习中常用损失函数介绍

选择正确的损失函数对于训练机器学习模型非常重要。不同的损失函数适用于不同类型的问题。本文将总结一些常见的损失函数&#xff0c;并附有易于理解的解释、用法和示例 均方误差损失(MSE) loss_fn nn.MSELoss()py均方误差&#xff08;Mean Squared Error&#xff0c;简称 MSE…

Navidrome音乐服务器 + 音流APP = 释放你的手机空间

20240727 By wdhuag 目录 前言&#xff1a; 参考&#xff1a; Navidrome音乐服务器 Demo试用&#xff1a; 支持多平台&#xff1a; 下载&#xff1a; 修改配置&#xff1a; 设置用NSSM成服务启动&#xff1a; 服务器本地访问网址&#xff1a; 音流 歌词封面API&am…

Golang | Leetcode Golang题解之第292题Nim游戏

题目&#xff1a; 题解&#xff1a; func canWinNim(n int) bool {return n%4 ! 0 }

网站打包封装成app,提高用户体验和商业价值

网站打包封装成app的优势 随着移动互联网的普及&#xff0c;用户对移动应用的需求越来越高。网站打包封装成app可以满足用户的需求&#xff0c;提高用户体验和商业价值。 我的朋友是一名电商平台的运营负责人&#xff0c;他曾经告诉我&#xff0c;他们的网站流量主要来自移动…

vite + xlsx + xlsx-style 导出 Excel

如下 npm i 依赖 npm i xlsxnpm i xlsx-style-vite1、简单的使用&#xff1a;.vue文件中使用 const dataSource ref([]) // 数据源const columns [{title: 用户名,key: userName,width: 120,},{title: 用户组,key: userGroup,width: 120,},{title: 状态,key: enable,width: …

MySQL 视图与事务

文章目录 视图事务事物的四大特性&#xff08;ACID)事务的开启和结束事物隔离级别现象脏读不可重复度幻读 隔离级别读未提交&#xff08;READ UNCOMMITTED)读提交 &#xff08;READ COMMITTED)可重复读 (REPECTABLE READ)串行化 (SERIALIZABLE) 查看与设置事务隔离级别重复读的…