RabbitMQ消息队列实战(4)—— spring-boot-starter-amqp中消息的可靠性传输和确认机制

news2025/1/12 22:46:09

        在上一篇文章中,笔者整理了从消息生产出来到消费结束的整个生命周期过程中,为了确保消息能够可靠到达或者消费,我们需要在哪些环节进行哪些处理,同时也展示了使用Java原生代码怎么样在这些环节进行处理。本文主要介绍使用spring boot集成RabbitMQ的方式时,针对这些环节应该进行怎样的处理。

一、创建Exchange、Queue和Binding

        首先,需要创建待测试的交换机、队列和绑定。相对于原生代码,spring boot对ConnectionFactory、Channel这些对象的创建和销毁进行了封装,使得我们不再需要手动创建Connection或者Channel,也不需要手动进行释放。这样做的一个好处就是:我们不必再关心这些系统资源的生命周期,从而简化了开发,而且避免了因忘记释放资源造成的内存泄露。RabbitMQ的Exchange、Queue和Binding这些组件的创建,只需要创建相应的Bean即可,注入到IOC中。

        比如,笔者通过一个RabbitConfig的自动配置类,对这些Bean进行了注入:

@Configuration
public class RabbitConfig {
    @Value("${spring.rabbitmq.host}")
    private String host;
    @Value("${spring.rabbitmq.port}")
    private int port;
    @Value("${spring.rabbitmq.username}")
    private String username;
    @Value("${spring.rabbitmq.password}")
    private String password;
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        connectionFactory.setPublisherConfirms(true);
        return connectionFactory;
    }
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }
    @Bean
    public Queue TestDirectQueue() {
        return new Queue("TestDirectQueue", true, false, false);
    }
    @Bean
    DirectExchange TestDirectExchange() {
        return new DirectExchange("TestDirectExchange", true, false);
    }
    @Bean
    Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectQueue");
    }
}

第3~10行:是RabbitMQ连接参数的配置

第11~19行:根据配置的连接参数,创建链接工厂ConnectionFactory ,这里我们只需要创建ConnectionFactory ,spring boot会自动创建和管理Connection以及Channel。

第20~24行:创建RabbitTemplate对象。RabbitTemplate是采用模板方法模式进行消息发送的一个模板,后面我们在发送消息时就是使用RabbitTemplate的相应的方法。实际上是在RabbitTemplate内部对Connection和Channel的创建进行了封装,而且只有我们在使用其第一次发送消息时,才会真正在RabbitMQ的broker上创建声明好的Exchange、Queue、Binding等组件。

第25~28行:创建名称为TestDirectQueue的队列

第29~32行:创建名称为TestDirectExchange的直连交换机

第33~36行:创建绑定,将上面创建的交换机和队列绑定在一起,路由键为队列的名称。

二、生产者端处理连接异常

        在前面我们已经提到,spring boot集成RabbitMQ之后,使用RabbitTemplate对象进行消息的发送,所以生产者端的异常处理需要在调用RabbitTemplate对象发送消息的代码上。我们在处理Java原生代码调用RabbitMQ要处理的异常主要是:

  • IOException —— 客户端连不上broker的情况,抛出的异常。
  • TimeoutException —— 客户端连接broker超时抛出的异常。
  • ShutdownSignalException —— broker的交换机不存在时出现的异常。

        但是在spring boot的框架中,引入了AmqpConnectException异常,实际上一旦发生MQ掉线或者超时的情况,AmqpConnectException异常取代了IOException 和TimeoutException。所以完整的异常处理的代码如下:

try {
    Message message = new Message(str.getBytes());
    rabbitTemplate.send("TestDirectExchange", "TestDirectQueue", message);
    User user = new User("张三", 18);
    rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectQueue", user);
}  catch (ShutdownSignalException e) {
    System.out.println("交换机故障或者不存在。");
    e.printStackTrace();
}catch (AmqpConnectException e) {
    System.out.println("服务器连接失败。");
    e.printStackTrace();
}

第3行和第5行:分别调用rabbitTemplate的send和convertAndSend来发送消息,两者的不同在于前者发送的参数为Message对象,而该对象中封装了发送的消息的字节数组;后者发送的是一个自定义的类对象,但是要注意类需要实现Serializable接口。

第6~8行:对ShutdownSignalException 类型异常的处理,当交换机不存在时会产生此类异常。

第9~11行:对AmqpConnectException类型的异常的处理,当客户端连接不上Broker时会抛出此类的异常。

三、生产者端手动确认消息

        在生产者端开启手动确认消息,可以保证由于RabbitMQ自身的原因导致消息路由到队列失败时,我们可以进行手动的消息重发。在java原生代码调用RabbitMQ时需要手动开启通道的confirm模式,而且提供了3种接受broker返回的ack的方法。在spring-boot-starter-amqp中,进一步对这个逻辑进行了封装,只保留了使用回调方法的方式。

        下面,先创建一个回调的类:

public class MyConfirmCallback implements RabbitTemplate.ConfirmCallback {

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("MyConfirmCallback:ack=" + ack);
    }
}

        可以看到,回调类实现了ConfirmCallback 接口,接口中有一个回调方法confirm,confirm具有三个参数:

  • correlationData —— 发送消息时携带的附加数据,send方法和convertAndSend方法都具有携带这个参数的重载类型:
void send(String routingKey, Message message, CorrelationData correlationData);
void convertAndSend(String routingKey, Object object, CorrelationData correlationData);
  • ack —— 消息是否成功发送到exchange
  • cause —— 消息发送失败的原因

        有了上述回调类的实现,我们就可以在发送时指定回调类,比如上面的发送消息代码就可以改成这样:

try {
    Message message = new Message(str.getBytes());
    rabbitTemplate.setConfirmCallback(new MyConfirmCallback());
    rabbitTemplate.send("TestDirectExchange", "TestDirectQueue", message);
    User user = new User("张三", 18);
    rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectQueue", user);
}  catch (ShutdownSignalException e) {
    System.out.println("交换机故障或者不存在。");
    e.printStackTrace();
}catch (AmqpConnectException e) {
    System.out.println("服务器连接失败。");
    e.printStackTrace();
}

        在第3行中,我们为rabbitTemplate指定了回调的类对象,这里需要特别注意:

(1)一定要在发送消息之前,也就是在调用send方法或者convertAndSend方法之前调用setConfirmCallback指定回调对象

(2)只能为rabbitTemplate指定一次回调对象,否则会抛出异常

四、生产者端处理Return的消息

        在上一篇文章中,我们特别指出了一种场景:消息从交换机路由到队列中时,我们可以选择当队列缺失时,将消息返回给生产者。在spring-boot-starter-amqp中,对这种应用也进行了封装,与上一小节中的手动确认消息类似,我们需要先创建一个接受Return消息的回调类:

public class MyReturnCallback implements RabbitTemplate.ReturnsCallback {

    @Override
    public void returnedMessage(ReturnedMessage returnedMessage) {
        String str = String.format("消息发送失败-消息回退,应答码:{},原因:{},交换机:{},路由键:{}",
                returnedMessage.getReplyCode(),
                returnedMessage.getReplyText(),
                returnedMessage.getExchange(),
                returnedMessage.getRoutingKey());
        System.out.println(str);
    }
}

        回调类实现了ReturnsCallback 接口,接口中有一个回调方法returnedMessage,方法只有一个参数ReturnedMessage,先看下ReturnedMessage类型的定义:

public class ReturnedMessage {
    private final Message message;
    private final int replyCode;
    private final String replyText;
    private final String exchange;
    private final String routingKey;
}
  • message —— 细心的朋友可能已经观察到,这个参数的类型实际上就是rabbitTemplate.send方法中封装的消息的类型,当消息没有路由到队列被return时被原样返回。
  • replyCode —— 返回时的应答码
  • replyText —— 返回的原因
  • exchange —— 交换机名称
  • routingKey —— 路由键,如果在直连模式下就是队列名称

        在实现了回调类之后,就可以进行回调类对象的装配了:

try {
    Message message = new Message(str.getBytes());
    rabbitTemplate.setConfirmCallback(new MyConfirmCallback());
    rabbitTemplate.setMandatory(true);
    rabbitTemplate.setReturnsCallback(new MyReturnCallback());
    rabbitTemplate.send("TestDirectExchange", "TestDirectQueue", message);
    User user = new User("张三", 18);
    rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectQueue", user);
}  catch (ShutdownSignalException e) {
    System.out.println("交换机故障或者不存在。");
    e.printStackTrace();
}catch (AmqpConnectException e) {
    System.out.println("服务器连接失败。");
    e.printStackTrace();
}

        注意第4行和第5行是我们新增的代码,对于设置手动处理broker return的消息来说,第4行代码必不可少,意思是开启手动处理回退消息。对于Return消息的回调类的使用也需要注意:

(1)一定要在发送消息之前,也就是在调用send方法或者convertAndSend方法之前调用setReturnsCallback指定回调对象(注意是使用setReturnsCallback,而非setReturnCallback,后者已经被标注过时)

(2)只能为rabbitTemplate指定一次回调对象,否则会抛出异常

(3)在使用setReturnsCallback之前要使用rabbitTemplate.setMandatory(true)。

        如果不出什么意外,当消息被return时,程序会打印类似以下的信息(红色圈出部分):

五、消费者端手动ACK

        在前面我们梳理了RabbitMQ生产者端一些消息的可靠性传输的保证机制,下面再学习下消费者端怎么进行手动ack。消费者端的手动ack,首先要开启消费者端支持手动ack(默认是自动ack),开启的方式有两种:

  • 第一种,使用配置文件开启手动ack:
spring:
  #配置rabbitMq 服务器
  rabbitmq:
    listener:
      type: simple
      simple:
        #simple关闭自动ack,手动ack
        acknowledge-mode: manual

第8行,修改消费者确认消息的模式为手动

  • 第二种,自定义containerFactory,开启手动确认模式:
@Bean
@ConditionalOnClass
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(CachingConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    return factory;
}

        注意第6行,开启了手动确认模式。

        在完成了上述任意一种设置之后,然后配置监听器,并手动进行确认:

@RabbitListener(containerFactory = "rabbitListenerContainerFactory", bindings = @QueueBinding(value = @Queue(value = "TestDirectQueue", durable = "true"),
        exchange = @Exchange(name = "TestDirectQueue", durable = "true", type = "direct")))
public void consumer(@Payload Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
    System.out.println("接受到的消息是:" + message.toString());
    channel.basicAck(deliveryTag,false);
    System.out.print("这里是接收者1答应消息: ");
    System.out.println("SYS_TOPIC_ORDER_CALCULATE_ZZ_FEE process1  : " + message);
}

第1行 —— 使用RabbitListener注解指定监听器,同时装配自定义的containerFactory(如果使用配置文件的方式手动确认,无需装配containerFactory),同时指定的还有exchange、queue和binding等等。

第3行 —— consumer包含了三个参数:message是传递过来的消息,deliveryTag是为每个消息指定的自增长的id(详细在上一篇文章中已经解释过),channel传递信息的信道。

第5行 —— 通过调用channel的basicAck进行消息的确认,参数的解释可以参照笔者上一篇文章。

        至此,整个消费者的手动确认方法也介绍完毕。

六、总结

        下面,针对本文的内容进行总结:

(1)spring-boot-starter-amqp中,针对生产者端或者消费者端连接不到broker的IOException异常和TimeoutException异常,重新封装了新的异常类型AmqpConnectException异常。

(2)生产者发送消息时如果交换机不存在,会抛出ShutdownSignalException异常。

(3)生产者端可以手动确认发送的消息正常到达了交换机,方法是实现ReturnsCallback接口,然后使用rabbitTemplate.setConfirmCallback()方法进行装配。

(4)如果broker出现内部异常或者目标队列不存在时,可以设置消息返还给生产者,方法是设置rabbitTemplate.setMandatory(true),实现ReturnsCallback接口,然后使用rabbitTemplate.setReturnsCallback()进行装配。

(5)开启消费者手动ack有两种方法,一种是通过配置文件修改支持,另一种是创建自定义的SimpleRabbitListenerContainerFactory并注入到IOC中,在监听方法中调用channel.basicAck()。

七、附yml文件中常见的RabbitMQ相关配置:

spring:
  #配置rabbitMq 服务器
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    #虚拟host 可以不设置,使用server默认host
    #virtual-host: JCcccHost
    publisher-confirm-type: correlated
    #发布者到达确认
    publisher-returns: true
    listener:
      type: simple
      simple:
        #simple关闭自动ack,手动ack
        acknowledge-mode: manual
        retry:
          ### 开启重试机制(调用监听方法失败时会重试,不是从队列中重复拿消息)
          enabled: true
          #最大重试传递次数
          max-attempts: 3
          #第一次和第二次尝试传递消息的间隔时间 单位毫秒
          initial-interval: 5000ms
          #最大重试时间间隔,单位毫秒
          max-interval: 300000ms
          #应用前一次重试间隔的乘法器,multiplier默认为1
          multiplier: 3
          #以上配置的间隔0s  5s  15s  45s
        #重试次数超过上面的设置之后是否丢弃(消费者listener抛出异常,是否重回队列,默认true:重回队列, false为不重回队列(结合死信交换机))
        default-requeue-rejected: true
    ### 模板配置
    ##设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除
    template:
      mandatory: true

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/416009.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java静态代码块

在 Java中,每个类都有一个静态的代码块,用来描述类的构造函数和实例变量。在 java. util. Static中定义了一个静态代码块,在该代码块中,类的构造函数和实例变量都是不可以被修改的。 一个类包含了由它自己定义的静态代码块&#x…

【论文阅读】Self-paced Multi-view Co-training

论文下载 bib: ARTICLE{MaMeng2020SPamCo, title {Self-Paced Multi-View Co-Training}, author {Fan Ma and Deyu Meng and Xuanyi Dong and Yi Yang}, journal {J. Mach. Learn. Res.}, year {2020}, volume {21}, number {1}, numpages {1--38} }目录1.…

Kubernetes中的Calico网络

文章目录1 介绍2 环境部署3 IPIP模式3.1 测试环境3.2 ping包网络转发4 BGP模式4.1 测试环境4.2 ping网络转发5 两种模式对比1 介绍 Calico网络的大概思路,即不走Overlay网络,不引入另外的网络性能损耗,而是将转发全部用三层网络的路由转发来…

GPSS【实践 01】Developing a Greenplum Streaming Server Client 自定义GPSS客户端开发实例

自定义GPSS客户端开发流程1.GPSS是什么2.架构3.组件下载安装4.自定义客户端4.1 GPSS Batch Data API Service Definition4.2 Setting up a Java Development Environment4.3 Generating the Batch Data API Client Classes4.4 Coding the GPSS Batch Data Client4.4.1 Connect …

【论文笔记】Attention Augmented Convolutional Networks(ICCV 2019 入选文章)

目录 一、摘要 二、介绍 三、相关工作 卷积网络Convolutional networks: 网络中注意力机制Attention mechanisms in networks: 四、方法 1. 图像的自注意力Self-attention over images: 二维位置嵌入Two-dimensional Positional Enco…

redis 第一章

开始学习redis 之旅吧 关于redis 的介绍 redis 是一个开源的软件,可以存储结构化的数据在内存中,像内存数据库,缓存、消息中间件、流处理引擎。 redis 提供的数据结构像strings, hashes, lists, sets, sorted sets 。Redis具有内置复制、Lua…

《花雕学AI》13:早出对策,积极应对ChatGPT带来的一系列风险和挑战

ChatGPT是一款能和人类聊天的机器人,它可以学习和理解人类语言,也可以帮人们做一些工作,比如翻译、写文章、写代码等。ChatGPT很强大,让很多人感兴趣,也让很多人担心。 使用ChatGPT有一些风险,比如数据的质…

Pytorch 张量操作 Python切片操作

目录一维张量定义一维实例操作二维张量操作张量拼接-注意需要拼接的维度一定要相同广播机制更高维的演示总结YOLOv5 Focus样例参考梳理一下Pytorch的张量切片操作一维张量定义 一维向量的操作其实很像numpy一维数组,基本定义如下: 1.默认步长为1 2.起始…

HotSpot经典垃圾收集器

虽然垃圾收集器的技术在不断进步,但直到现在还没最好的收集器出现,更加不存在“万能”的收集器,所以我们选择的只是对具体应用最合适的收集器。 图 HotSpot中的垃圾收集器,连线表示可搭配使用 1 Serial收集器 是最基础、历史最悠…

第08章_面向对象编程(高级)

第08章_面向对象编程(高级) 讲师:尚硅谷-宋红康(江湖人称:康师傅) 官网:http://www.atguigu.com 本章专题与脉络 1. 关键字:static 回顾类中的实例变量(即非static的成员变量) c…

linux文件类型和根目录结构

目录 一、Linux文件类型 二、Linux系统的目录结构 1. FHS 2. 路径以及工作目录 (1)路径 (2)工作目录 一、Linux文件类型 使用ls -l命令查看到的第一个字符文件类型说明-普通文件类似于Windows的记事本d目录文件类似于Windo…

【GPT4】GPT4 创作郭德纲姜昆相声作品的比较研究

欢迎关注【youcans的 AIGC 学习笔记】原创作品 说明:本文附录内容由 youcans 与 GPT-4 共同创作。 【GPT4】GPT4 创作郭德纲姜昆相声作品的比较研究研究总结0. 背景1. 对 GPT4 创作的第 1 段相声的分析2. 对GPT4 创作的第 2 段相声的分析3. 对GPT4 创作的第 3 段相…

Window常用命令

一、快捷键 1、自带快捷键 序号快捷键作用1windowsGXBOX录屏2cmd >osk屏幕键盘3cmd >calc计算器4cmd >mrt恶意软件删除工具 2、浏览器快捷键 序号快捷键作用1Alt P浏览器图片下载(来自油猴脚本) 二、其他功能 1、解决端口占用 第一步&…

Linux安装单细胞分析软件copykat

Linux安装单细胞分析软件copykat 测试环境 Linux centos 7R 4.1.2minconda3天意云24C192GB安装步骤 新建环境 conda activate copykatconda install r-base4.1.2 安装基础软件 checkPkg <- function(pkg){return(requireNamespace(pkg, quietly TRUE))}if(!checkPkg("…

类的加载过程-过程二:Linking阶段

链接过程之验证阶段(Verification) 当类加载到系统后&#xff0c;就开始链接操作&#xff0c;验证是链接操作的第一步。 它的目的是保证加载的字节码是合法、合理并符合规范的。 验证的步骤比较复杂&#xff0c;实际要验证的项目也很繁多&#xff0c;大体上Java虚拟机需要做…

基于stable diffusion的艺术操作

下面是作者基于stable diffusion的艺术操作 得益于人工智能的强大技术 以下所有的图 绝对是整体星球上唯一的图 现在人工智能越来越强大&#xff0c;感觉将来最有可能取代的就是摄影师、中低级的程序员、UI设计师、数据分析师等&#xff0c;人们未来更多从事的职业应该是快速…

机器学习 01

目录 一、机器学习 二、机器学习工作流程 2.1 获取数据 2.2 数据集 2.2.1 数据类型构成 2.2.2 数据分割 2.3 数据基本处理 2.4 特征工程 2.4.1什么是特征工程 2.4.2 为什么需要特征工程(Feature Engineering) 2.4.3 特征工程内容 2.5 机器学习 2.6 模型评估 2.7 …

【消息队列】细说Kafka消费者的分区分配和重平衡

消费方式 我们直到在性能设计中异步模式&#xff0c;一般要么是采用pull&#xff0c;要么采用push。而两种方式各有优缺点。 pull &#xff1a;说白了就是通过消费端进行主动拉去数据&#xff0c;会根据自身系统处理能力去获取消息&#xff0c;上有Broker系统无需关注消费端的…

Windows GPU版本的深度学习环境安装

本文记录了cuda、cuDNN的安装配置。 参考文章&#xff1a; cuda-installation-guide-microsoft-windows 12.1 documentation Installation Guide :: NVIDIA cuDNN Documentation 一、cuda安装 注意事项&#xff1a; 1、cuda安装最重要的是查看自己应该安装的版本。 表格…

Java数组打印的几种方式

&#x1f389;&#x1f389;&#x1f389;点进来你就是我的人了 博主主页&#xff1a;&#x1f648;&#x1f648;&#x1f648;戳一戳,欢迎大佬指点!人生格言&#xff1a;当你的才华撑不起你的野心的时候,你就应该静下心来学习! 欢迎志同道合的朋友一起加油喔&#x1f9be;&am…