消息可靠性保证

news2024/9/23 11:22:15

回顾RabbitMQ的消息传递过程

image.png
如图所示,发生消息丢失的可能阶段也就是生产者发送消息,时rabbitmq存储消息时,消费者消费消息时。
项目源码:gitee

生产者发送消息阶段

  1. 生产者发送消息时把交换机名写错
  2. 生产者发送消息时把routingKey写错

RabbitMQ存储消息阶段

默认情况下rabbitmq会把消息存储到内存中,如果在消费者消费消息之前,rabbitmq服务器宕机了,内存就会被释放,消息就会丢失

消费者消息消息阶段

消费者在获取到消息以后,就会自动给rabbitmq服务端返回一个ack标志,rabbitmq服务端就会把这个消息从队列中删除。但当消费者获取到消息以后,准备进行业务逻辑处理时消费者宕机了,相当于该消息没有被消费成功,即消息丢失。

因此,我们就针对以上3个阶段,分别解决

生产者保证消息不丢失

  1. 生产者确认机制:可以让生产者感知到消息是否正常发送给交换机
  2. 生产者回退机制:可以让生产者感知到消息是否正常发送给队列

生产者确认机制

image.png

  1. 首先准备好环境,交换机,队列,绑定信息。
package com.example.rabbitmqreliable.demos;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm_exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm_queue";
    public static final String CONFIRM_ROUTING_KEY = "key1";

    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(CONFIRM_EXCHANGE_NAME);
    }
    @Bean
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }
    @Bean
    public Binding confirmBind(@Qualifier("directExchange") DirectExchange confirmExchange,
                               @Qualifier("confirmQueue") Queue confirmQueue) {
        return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);

    }
}

  1. 配置文件
spring.rabbitmq.host=101.133.141.75
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=ConFirm
# 开启生产者确认机制,当消费者成功处理这个消息时,会向生产者发送一个确认信号,
# 告诉生产者这个消息已经被成功消费了。
# 如果生产者在一定时间内没有收到确认信号,就会重新发送这个消息。
spring.rabbitmq.publisher-confirm-type=correlated
  1. 通过测试类,创建生产者发送消息
package com.example.rabbitmqreliable;

import com.example.rabbitmqreliable.demos.RabbitMQConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class ProviderTests {
    /**
     * 目标:让生产者获取到rabbitmq服务返回的ack或nack
     * 做法:rabbitTemplate需要绑定对应的回调函数
     * 分析:目前的rabbitTemplate是spring托管的,并没有对应的回调函数,需要自定义
     * 实施:需要自定义rabbitTemplate,并注入到spring容器中
     * 一旦我们在spring容器中配置了一个rabbitTemplate,
     * 那么spring boot就不会对rabbitTemplate进行自动化配置
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void test1() {
        rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME, RabbitMQConfig.CONFIRM_ROUTING_KEY, "HELLO CONFIRM");
    }

}
  1. 自定义rabbitTemplate,实现确认机制的回调方法,需要在RabbitMQConfig文件中添加以下内容:
/**
     * ConnectFactory由spring boot根据配置文件中的连接信息实现自动化配置
     * 即在spring容器中直接存在了ConnectionFactory对象
     * @param connectionFactory
     * @return
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 设置回调函数
        // 而ConfirmCallBack是一个接口,需要一个类去实现他
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 当rabbitmq服务端给生产者放回ack/nack时会执行该方法
             * @param correlationData 消息的id,内容
             * @param ack 消息是否发送成功
             * @param cause 原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if(ack) {
                    System.out.println("消息正常发送给交换机");
                }else {
                    System.out.println("消息没有正常发送给交换机,cause" + cause);
                    // TODO 处理方案:再次发送消息给rabbitmq,需要获取消息内容
                }
            }
        });

      return rabbitTemplate;
    }
  1. 实现当消息发送失败时,再次重新发送部分。
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 当rabbitmq服务端给生产者放回ack/nack时会执行该方法
             * @param correlationData 消息的id,内容
             * @param ack 消息是否发送成功
             * @param cause 原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if(ack) {
                    System.out.println("消息正常发送给交换机");
                }else {
                    System.out.println("消息没有正常发送给交换机,cause" + cause);
                    // 处理方案:再次发送消息给rabbitmq,需要获取消息内容
                    //方案1: 立马拿着id去数据库查消息
                    // 方案2:通过定时任务重新发送
                    String msgId = correlationData.getId();
                    System.out.println("msgId" + msgId);
                    // 规定消息的最大发送次数3次,发送消息前判断实际发送次数是否大于最大发送次数,如果大于就不进行重新发送,并设置status=2

                }
            }
        });
 @Test
    public void test1() {
        // 发送消息前把消息写入数据库,并分配唯一id(如果发送失败,可以拿这个id去查数据库,重新发送)
        // 并且还要记录消息实际发送次数,以及消息状态。当超过发送次数超过了规定值,就设置消息的status为2,发送成功设置消息状态为1
        String msgId = UUID.randomUUID().toString().replace("-","");
        CorrelationData correlationData =new CorrelationData(msgId);
        rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME, RabbitMQConfig.CONFIRM_ROUTING_KEY + "error", "HELLO CONFIRM", correlationData);
    }

image.png

生产者回退机制

image.png

  1. 需要在配置文件中开启生产者回退机制
# 开启生产者确认机制,
spring.rabbitmq.publisher-returns=true
  1. 给rabbitTemplate绑定生产者回退机制的回调函数
/**
         * 给rabbitTemplate绑定回退机制的回调函数
         * ReturnCallback是一个接口,使用匿名内部类实现
         * 该方法被调用的概率极低,因为从交换机到队列的过程是rabbitmq内部实现的
         * 如果会出错,咱们也不会用他
         */
        rabbitTemplate.setMandatory(true);//让rabbitmq服务把失败信息回传给生产者
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            // 当消息没有正常转发给队列的时候被调用
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                byte[] body = returnedMessage.getMessage().getBody();
                String msg = new String(body);
                System.out.println("msg:" + msg);
            }
        });
  1. 执行测试方法
package com.example.rabbitmqreliable;

import com.example.rabbitmqreliable.demos.RabbitMQConfig;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.UUID;

@SpringBootTest
public class ProviderTests {
    /**
     * 目标:让生产者获取到rabbitmq服务返回的ack或nack
     * 做法:rabbitTemplate需要绑定对应的回调函数
     * 分析:目前的rabbitTemplate是spring托管的,并没有对应的回调函数,需要自定义
     * 实施:需要自定义rabbitTemplate,并注入到spring容器中
     * 一旦我们在spring容器中配置了一个rabbitTemplate,
     * 那么spring boot就不会对rabbitTemplate进行自动化配置
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void test1() {
        // 发送消息前把消息写入数据库,并分配唯一id(如果发送失败,可以拿这个id去查数据库,重新发送)
        // 并且还要记录消息实际发送次数,以及消息状态。当超过发送次数超过了规定值,就设置消息的status为2,发送成功设置消息状态为1
        String msgId = UUID.randomUUID().toString().replace("-","");
        CorrelationData correlationData =new CorrelationData(msgId);
        // 把routingKey写错
        rabbitTemplate.convertAndSend(RabbitMQConfig.CONFIRM_EXCHANGE_NAME, RabbitMQConfig.CONFIRM_ROUTING_KEY + "404", "HELLO CONFIRM", correlationData);
    }

}

image.png

RabbitMQ保证消息不丢失

  1. 对交换机进行持久化
  2. 对队列进行持久化
  3. 对消息进行持久化

消费者保证消息不丢失

Spring Boot整合RabbitMQ消费者的应答模式:

  • none:自动应答,消费者获取到消息后直接给rabbitmq返回ack
  • auto(默认值):由spring boot框架根据业务执行特点决定给rabbitmq返回ack还nack,业务正常执行完毕返回ack,业务执行过程产生异常,返回nack
  • manual:手动应答,由程序员自己根据业务执行特点给rabbitmq返回对应的ack或nack

使用none模式

  1. 在配置文件中设置消费者应答模式
# 消费者的应答模式
spring.rabbitmq.listener.simple.acknowledge-mode=none
  1. 设置消费者
@Component
public class Consumer {
	@RabbitListener(queues = RabbitMQConfig.CONFIRM_QUEUE_NAME) // 监听的队列
	public void consumerListener(Message message) {
		byte[] body = message.getBody();;
		String msg = new String(body);
		// 进行业务处理
		int a = 1 / 0; //产生异常
		System.out.println("[consumerListener],msg:" + msg);

	}
}
  1. 启动项目,再运行测试类

image.png
image.png
由于我们使用none自动应答模式,消费者给rabbitmq返回ack,rabbitmq直接把消息从队列中删除,导致消息丢失

使用auto模式

修改配置文件中的应答方式为auto,以及引伸出来的其他配置项

# 消费者的应答模式
spring.rabbitmq.listener.simple.acknowledge-mode=auto
# 开启重试机制
spring.rabbitmq.listener.simple.retry.enabled=true
# 最大重试次数,否则会无限重试下去
spring.rabbitmq.listener.simple.retry.max-attempts=3
# 初始化的重试时间间隔
spring.rabbitmq.listener.simple.retry.initial-interval=1000
# 最大的重试时间间隔
spring.rabbitmq.listener.simple.retry.max-interval=5000
# 乘子(计算每一次时间间隔):1s->2s->4s->5s
spring.rabbitmq.listener.simple.retry.multiplier=2

此时控制台报了3次错误,队列没有消息,消息还是丢失了。

auto模式需要设置最大重试次数,否则会死循环,但是又无法判断最大重试次数是多少

使用manual模式

  1. 修改配置文件中的应答方式为manual,将auto模式中延申出来的配置项注释掉
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  1. 消费者代码
package com.example.rabbitmqreliable.demos;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;


@Component
public class Consumer {
    @RabbitListener(queues = RabbitMQConfig.CONFIRM_QUEUE_NAME) // 监听的队列
    public void consumerListener(Message message, Channel channel) {
        byte[] body = message.getBody();;
        String msg = new String(body);
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 进行业务处理
            int a = 1 / 0; //产生异常
            System.out.println("[consumerListener],msg:" + msg);

            // 没有产生异常,给服务端返回ack
            // 第一个参数:表示消息的标签,保证消息唯一性
            // 第二个数:表示是否需要进行批量应答
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
			e.printStackTrace();
            // 产生异常
            // 给rabbitmq返回nack
            // 第三个参数:表示是否将消息重新放入队列中
            try {
               channel.basicNack(deliveryTag, true, true);
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
        }
    }
}

启动项目后执行测试代码,控制台会不断报红,死循环。因为没有设置最大重试次数,因此我们需要统计消息的实际消费次数,可以借助redis计算。一旦消息的实际消费次数大于最大消费次数,那么此时需要给rabbitmq返回ack删除该消息,返回之前要将该消息记录数据库中,后期人工处理

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1311334.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

行为树保姆级教程(以机器人的任务规划为例

行为树 目录 什么是行为树(behavior tree)?行为树的相关术语 行为节点和控制节点不同类型的控制结点: 顺序节点选择节点并行节点装饰结点 机器人的例子:物体搜索 1:如果只存在一个地点A,那么行为树很简单&#xff0…

实验:BGP配置

1.实验目的: 本实验旨在掌握BGP协议的基本概念和配置方法,以及使用Packet Tracer模拟网络环境进行BGP配置的方法。 2.实验要求: 理解BGP协议的基本概念和原理;掌握BGP协议的配置方法;能够使用Packet Tracer模拟网络…

MyBatisPlus使用时报错Invalid value type for attribute ‘factoryBeanObjectType‘

目录 问题 探索过程 总结 问题 今天在学习MyBatisPlus过程中突然报了个错,信息如下 Invalid value type for attribute factoryBeanObjectType: java.lang.String Caused by: java.lang.IllegalArgumentException: Invalid value type for attribute factoryB…

json Deserialization of Python Objects

openweathermap.json {"coord": {"lon": 114.0683, "lat":22.5455},"weather":[ {"id": 803, "main":"Clouds", "description":"多云", "icon":"04d"}],"…

MacOS多屏状态栏位置不固定,程序坞不小心跑到副屏

目录 方式一:通过系统设置方式二:鼠标切换 MacOS多屏状态栏位置不固定,程序坞不小心跑到副屏 方式一:通过系统设置 先切换到左边 再切换到底部 就能回到主屏了 方式二:鼠标切换 我的两个屏幕放置位置如下 鼠标在…

R语言【rgbif】——什么是多值传参?如何在rgbif中一次性传递多个值?多值传参时的要求有哪些?

rgbif版本:3.7.8.1 什么是多值传参? 您是否在使用rgbif时设想过,给某个参数一次性传递许多个值,它将根据这些值独立地进行请求,各自返回独立的结果。 rgbif支持这种工作模式,但是具体的细节需要进一步地…

蓝牙物联网智慧物业解决方案

蓝牙物联网智慧物业解决方案是一种利用蓝牙技术来提高物业管理和服务效率的解决方案。它通过将蓝牙技术与其他智能设备、应用程序和云服务相结合,为物业管理和服务提供更便捷、高效和智能化的支持。 蓝牙物联网智慧物业解决方案包括: 1、设备管理&#…

Crypto基础之密码学

FLAG:20岁的年纪不该困在爱与不爱里,对吗 专研方向: 密码学,Crypto 每日emo:今年你失去了什么? Crypto基础之密码学 前言一、编码Base编码base64:Base32 和 Base16:uuencode:xxencod…

计算机网络——网络层——OSPF协议的介绍

什么是 OSPF ? OSPF 是一个基于链路状态的自治系统内部路由协议,在 TCP/IP 的网络层中进行路由选择,常用于构建大型企业网络或者服务上的骨干网络。在互联网核心路由器之间也可以使用。 OSPF 概述 OSPF 使用的是 Dijkstra(最短…

智能优化算法应用:基于黏菌算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用:基于黏菌算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用:基于黏菌算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.黏菌算法4.实验参数设定5.算法结果6.参考文献7.MA…

记录 | mac打开终端时报错:login: /opt/homebrew/bin/zsh: No such file or directory [进程已完成]

mac打开终端时报错:login: /opt/homebrew/bin/zsh: No such file or directory [进程已完成],导致终端没有办法使用的情况 说明 zsh 没有安装或者是安装路径不对 可以看看 /bin 下有没有 zsh,若没有,肯定是有 bash 那就把终端默…

QT- QT-lximagerEidtor图片编辑器

QT- QT-lximagerEidtor图片编辑器 一、演示效果二、关键程序三、下载链接 功能如下: 1、缩放、旋转、翻转和调整图像大小 2、幻灯片 3、缩略图栏(左、上或下);不同的缩略图大小 4、Exif数据栏 5、内联图像重命名 6、自定义快捷方式…

JS加密/解密之JSX解密解析(photoshop插件)

简介 Adobe Photoshop 插件通常使用 JSX(JavaScript XML)脚本语言。这是一种基于JavaScript的扩展,专门设计用于处理Adobe Creative Suite(包括Photoshop)的任务。JSX脚本允许开发者编写自定义脚本以扩展和增强Photos…

【Eureka】自定义元数据消失原因?

【Eureka】自定义元数据运行很长一段时间后,自定义元数据(scheduler.server.enabled)偶尔会消失,但服务元数据信息还在 eureka是单节点的,这个应用服务也是单节点的 代码实现方式如下 我看过eureka服务的日志信息&…

在做题中学习(33):只出现一次的数字 II

137. 只出现一次的数字 II - 力扣(LeetCode) 思路: 1.首先想到出现三次的数,它们仨的任意一位都是相同的(1/0) 2.可以发现出现三次的数的某一位和a某一位在所有情况下%3最后的结果都和a的那一位相同&…

06.迪米特法则(Demeter Principle)

明 嘉靖四十年 江南织造总局 小黄门唯唯诺诺的听完了镇守太监杨金水的训斥,赶忙回答:“知道了,干爹!” “知道什么?!!” 杨金水打断了他的话,眼神突然变得凌厉起来: “有…

企业计算机服务器中了halo勒索病毒如何解密,halo勒索病毒恢复流程

网络技术的不断发展与应用,为企业的生产运营提供了极大便利,越来越多的企业使用数据库存储企业的重要数据,方便工作与生产,但网络是一把双刃剑,网络安全威胁一直存在,并且网络威胁的手段也在不断升级。在本…

我的隐私计算学习——匿踪查询

笔记内容来自多本书籍、学术资料、白皮书及ChatGPT等工具,经由自己阅读后整理而成。 (一)PIR的介绍 ​ 匿踪查询,即隐私信息检索(Private InformationRetrieval,PIR),是安全多方计算…

C# OpenVINO 直接读取百度模型实现印章检测

目录 效果 模型信息 项目 代码 下载 其他 C# OpenVINO 直接读取百度模型实现印章检测 效果 模型信息 Inputs ------------------------- name:scale_factor tensor:F32[?, 2] name:image tensor:F32[?, 3, 608, 608] …

Windows更改远程桌面端口并添加防火墙入站规则

1.运行 快捷键winR组合键,win就是键盘上的windows系统图标键。 2.打开注册表 Regedit,在对话框中输入regedit命令,然后回车 3.打开注册表,输入命令后,会打开系统的注册表,左边是目录栏,右边是…