【RabbitMQ】图解RabbitMQ是如何保证消息可靠性的

news2025/1/16 2:45:16

目录

一、概述

1、消息可靠性

2、SpringBoot整合RabbitMQ配置文件

二、生产者---RabbitMQ服务器如何保证信息不丢失

1、confirm确认模式

1.说明

2.SpringBoot代码实现

2、return退回模式

1.说明

2.SpringBoot代码实现

三、RabbitMQ服务器如何保证消息不丢失

四、RabbitMQ服务器---消费者如何保证消息不丢失

1、ACK

2、实现方式

3、SpringBoot代码实现


一、概述

1、消息可靠性

当生产者生产出一条消息发送给MQ是时,该消息来到MQ服务器会先到达交换机,然后由交换机根据路由分发给对应的队列,然后再由MQ服务器给消费者进行消费。这个步骤可分为三个过程分别是消息从消费者到交换机、消息从交换机到队列、消息从队列到消费者。那么RabbitMQ是如何保证消息的可靠性的呢。

在前两个过程,也就是消息从生产者到达服务器的过程,RabbitMQ提供了两种保证消息从生产者到MQ服务器消息可靠性的方法,分别是confirm确认模式与return退回模式。而在第三个过程,也就是消息从MQ服务器到消费者的过程,RabbitMQ提供了ACK模式,如果学习过计算机网络TCP协议就回明白,ACK(ackonwledge承认)当消费者收到MQ服务器的消息后,会给MQ服务器返回一个确认信息,服务器收到ack才会对该消息进行删除。

2、SpringBoot整合RabbitMQ配置文件

在了解这几个机制之前,先了解以下SpringBoot整合RabbitMQ的一些配置信息(yml形式)

spring:
  rabbitmq:
    host: 127.0.0.1                     #RabbitMQ服务器IP
    port: 5672                          #端口
    username: guest                     #用户名
    password: guest                     #密码
    virtual-host: /learn                #虚拟机
    publisher-confirm-type: correlated  #开启确认机制
    publisher-returns: true             #开启回退模式
    listener:
      simple:
        acknowledge-mode: manual        #开启收动签收
        prefetch: 4                     #消费端每次拉去10条数据,直到确认消费完毕才拉去下10条
         retry:
           enabled: true                #开启重试
           max-attempts: 4              #重试最大次数
           max-interval: 1000s          #重试最大时长

二、生产者---RabbitMQ服务器如何保证信息不丢失

1、confirm确认模式

1.说明

当消息从producer生产者到达exchange交换机,会以异步地给消费者返回一个confirmCallbak回调,如果交换机收到了就返回true如果没有收到则返回false,如果返回false生产者收到该信息后可进行重发等处理

2.SpringBoot代码实现

首先我们需要创建一个RabbitMQ地配置类并注入Spring里,让他可以随着项目地启动而启动,然后我们在类里需要先创建队列以及交换机,此出我们使用topic模式进行演示【RabbitMQ】SpringBoot整合RabbitMQ、实现RabbitMQ五大工作模式(万字长文)_1373i的博客-CSDN博客https://blog.csdn.net/qq_61903414/article/details/130174313?spm=1001.2014.3001.5501

package com.example.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 配置RabbitMQ
 * 配置交换机、队列、绑定队列与交换机
 */
@Configuration
public class RabbitMQConfig {
    /**
     * topic通配符模式
     */
    private static final String T_QUEUE1 = "tQueue1";
    private static final String T_QUEUE2 = "tQueue2";
    private static final String T_EXCHANGE = "tEx";

    @Bean
    public Queue tQueue1() {
        return QueueBuilder.durable(T_QUEUE1).build();
    }

    @Bean
    public Queue tQueue2() {
        return new Queue(T_QUEUE2);
    }

    @Bean
    public TopicExchange tEx() {
        return ExchangeBuilder.topicExchange(T_EXCHANGE).durable(true).build();
    }

    @Bean
    public Binding binding1(@Qualifier("tQueue1") Queue tQueue1,@Qualifier("tEx") TopicExchange tEx) {
        return BindingBuilder.bind(tQueue1).to(tEx).with("A.*");
    }

    @Bean
    public Binding binding2(@Qualifier("tQueue2") Queue tQueue2,@Qualifier("tEx") TopicExchange tEx) {
        return BindingBuilder.bind(tQueue2).to(tEx).with("#.error");
    }

}

其次我们要在配置文件里开启RabbitMQ地确认模式

此时我们就可以编写生产者类通过这个方法abbitTemplate.setConfirmCallback实现回调方法

package com.example.demo.controller;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.servlet.http.HttpServletRequest;

@Controller
@ResponseBody
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

   
    @RequestMapping("/tEx")
    public void sendByT() {
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             *
             * @param correlationData 配置信息
             * @param b 是否手动消息
             * @param s 失败原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("confirm被执行");
                if (!b) {
                    System.out.println("接收失败" + s);
                    // 处理,让消息重发
                }
            }
        });
        // 故意修改错交换机名称,运行查看回调方法的执行
        rabbitTemplate.convertAndSend("tExx","A.error","hello mq");
    }

    
}

在发送消息时故意写错交换机名称,运行代码访问127.0.0.1:8080/tEx接口查看回调方法的执行

 此时生产者代码打印了错误信息,此时我们可以通过代码对该消息进行重发处理 

2、return退回模式

1.说明

当消息到达交换机以后就会根据路由key去到达匹配的队列里,如果消息在该过程没有到达queue。就会异步地返回一个returnCallback的回调,将错误信息告诉生产者,生产者可进行后续处理,当交换机消息由于路由问题没有到达队列时,此时交换机对消息的处理有两种方式,默认方式是直接丢弃,另一种是将消息返回给生产者,在后续代码实现时,我们要设置第二种方式

2.SpringBoot代码实现

首先要在配置文件里开启回退模式

然后在生产者类代码里这次我们要通过rabbitTemplate.setReturnCallback实现回调方法

package com.example.demo.controller;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import javax.servlet.http.HttpServletRequest;

@Controller
@ResponseBody
public class Producer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

   
    @RequestMapping("/tEx")
    public void sendByT() {
         rabbitTemplate.setMandatory(true);  // 设为true消息由交换机给queue失败时返回给发送者
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnsCallback(){
            /**
             *
             * @param message    回退的信息
             * @param replyCode  错误码
             * @param replyText  错误信息
             * @param exchange   交换机
             * @param routingKey 路由
             */
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("执行了回退方法");
            }
        });
        // 故意写错路由让消息到交换机后无法到达队列
        rabbitTemplate.convertAndSend("tEx","lolo","hello mq");
    }

    
}

此时运行代码,访问接口查看回调方法的执行 

三、RabbitMQ服务器如何保证消息不丢失

在MQ服务器里交换机、队列等都是默认持久化到硬盘里的,消息到达MQ服务器是默认存储在硬盘里的。所以数据不会因为MQ服务器宕机重启而导致丢失。

四、RabbitMQ服务器---消费者如何保证消息不丢失

1、ACK

消费者收到消息后会给MQ服务器返回收到消息的确认信息,而确认在MQ里确认信息有三种,粉分别是:自动确认(默认)手动确认(manual)根据异常进行确认(auto)第三种比较麻烦。其中自动确认是指当消息一旦被消费者接收,就自动确认收到,MQ服务器就会对该消息进行删除。但是在业务代码里,消息收到后,业务可能会出现异常导致消息没有被真正的消费,那么消息就丢失了,此时就有了手动确认的方法,手动设置需要在业务执行完成后调用channel.basicAck()方法手动签收,而如果执行过程中代码出现了异常也可以使用channel.basicNack()方法进行拒收,让MQ服务器自动重发消息

2、实现方式

在上述介绍了三种实现方式,默认是自动确认但存在缺陷,下面我们用代码实现手动确认

3、SpringBoot代码实现

首先要做配置文件里添加开启自动确认的属性

然后创建一个消费者类并注入Spring中 

此时编写消费者代码,手动确认与拒绝确认API在下面代码注释中有讲解

package com.example.demo.controller;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class DemoQueueListener {
    /**
     * 手动签收
     * @param message
     * @param channel
     * @throws IOException
     */
    @RabbitListener(queues = "fQueue1")
    public void listener(Message message, Channel channel) throws IOException {
        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("消费消息" + new String(message.getBody()));              // 处理消息
            /**
             * 参数1 :收到消息的标签
             * 参数2 :false--签收所有的消息
             */
            int i = 10 / 0;
               // 故意不手动接收 抛出异常 看MQ是否重发
            // channel.basicAck(tag, true); // 确认签收
        } catch (Exception e) {
            /**
             * 参数3:true--消息重回队列,会重发该消息  false---不回
             */
            channel.basicNack(tag,true,true);     // 拒绝签收
        }
    }
}

 此时运行代码查看消息是否被消费,该队列只有一条消息,看是否会被消费

 

 此时消息不但没有被消费,还被持续重发进行重复消费

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/415747.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Logstash:使用自定义正则表达式模式

有时 Logstash Grok 没有我们需要的模式。 幸运的是我们有正则表达式库:Oniguruma。在很多时候,如果 Logstash 所提供的正则表达不能满足我们的需求,我们选用定制自己的表达式。 定义 Logstash 是一种服务器端数据处理管道,可同时…

zabbix客户端配置

一、zabbix客户端配置 1.实验环境:关闭防火墙和安全模块 systemctl disable --now firewalld setenforce 0 2.服务端和客户端都要时间同步 yum install -y ntpdate #注意安装需要用网络源安装,不能用本地源 ntpda…

电子器件系列34:tvs二极管(2)

一、基本原理: 二、重要产数: 不同的资料对于相同的参数可能有不同的命名,要根据实际情况来确定参数的意义 这里以上图表格里的参数名称进行解析,以其他资料作为参考。 结合图表和伏安特性曲线,再结合下面的图我是…

你认为的.NET数据库连接池,真的是全部吗?

一般我们的项目中会使用1到2个数据库连接配置,同程艺龙的数据库连接被收拢到配置中心,由DBA统一配置和维护,业务方通过某个字符串配置拿到的是开箱即用的Connection对象。 DBA能在对业务方无侵入的情况下,让大规模微服务实例切换…

第二周P9-P22

文章目录第三章 系统总线3.1、总线的基本概念一、为什么要用总线二、什么是总线三、总线上信息的传送四、总线结构的计算机举例1、单总线结构框图2、面向CPU的双总线结构框图3、以存储器为中心的双总线结构图3.2、总线的分类1、片内总线2、系统总线3、通信走线3.3、总线特性及性…

基于多目标粒子群优化算法的计及光伏波动性的主动配电网有功无功协调优化(Matlab代码实现)

💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…

Vivado中ILA(集成逻辑分析仪)的使用

Vivado中ILA(集成逻辑分析仪)的使用一、写在前面二、ILA(Integrated Logic Analyzer)的使用2.1 ILA查找2.2 ILA配置2.2.1 General Options2.2.2 Probe Ports三、ILA调用四、ILA联调4.1 信号窗口4.2 波形窗口4.3 状态窗口4.4 设置窗口4.5 触发条件设置窗口…

Segment Anything论文详解(SAM)

论文名称:Segment Anything 论文地址:https://arxiv.org/abs/2304.02643 开源地址:https://github.com/facebookresearch/segment-anything demo地址:Segment Anything | Meta AI 主要贡献:开发一个可提示的图像分…

6.Java数组

数组 一、数组概述 1、什么是数组? 数组是相同类型数据的有序集合。数组描述的是相同类型的若干个数据,按照一定的先后次序排列组合而成。其中,每一个数据称作一个元素,每个元素可以通过一个索引(下标)来访问它们。 2、数组的…

哈夫曼树和最小生成树

哈夫曼树 首先给我们一串权值,然后我们需要让这串权值组成一个树,然后当他的wpl最小 我们可以发现当他的小权值离根节点越远,大权值离根节点越近的时候,我们这个时候构建出来的树就是wpl最小的树,也就是我们说的哈夫曼…

c++之 类和对象

目录 1.类和对象的基本概念 1.c语言与c中的结构体 2.类的封装性 3.定义类 4.构造与析构 构造与析构的概念: 构造函数 析构函数 拷贝构造函数 c默认增加的函数 1.类和对象的基本概念 1.c语言与c中的结构体 我们知道在c语言里,我们是无法在结构体…

进程的开销比线程大在了哪里?

进程内部都有哪些数据? 为什么创建进程的成本很高? 这样的问题确实不好回答,除非你真正理解了进程和线程的原理,否则很容易掉入面试大坑。探究问题背后的原理,围绕面试题展开理论与实践知识的学习。真正理解进程和线…

centos7虚拟机下hbase的使用案例讲解

系列文章目录 centos7虚拟机在集群zookeeper上面配置hbase的具体操作步骤 centos7虚拟机配置集群时间同步的操作步骤_centos虚拟机时间同步 centos7配置zookeeper本地模式与集群模式的详细教程 卸载centos7自带的jdk的操作步骤_centos7 卸载java 虚拟机centos7配置Hadoop单…

如何用 Vitis HLS 实现 OpenCV 仿真

这篇文章的基础是《Windows上快速部署Vitis HLS OpenCV仿真库》,我们使用的版本是Vitis HLS 2022.2,其他版本BUG不清楚,目前已知2021版本有BUG,只能使用其他方式,本文不适合。 这次选择中值滤波这个常规算法作为演示算…

Springboot电脑商城项目

目录 系统概述与环境搭建 1 系统开发及运行环境 2 项目分析 3 创建数据库 4 创建Spring Initializr项目 5 配置并运行项目 6 导入前端项目 用户注册 1 用户-创建数据表 2 用户-创建实体类 3 用户-注册-持久层 4 用户-注册-业务层 5 用户-注册-控制器 6 用户-注册…

归并排序(递归实现)

上一次我们说了快排的其他版本,还有就是快排的非递归实现 这次我们就说一哈归并排序,归并排序也是很厉害的一种排序,而且归并排序的时间复杂度可以说成标准的O(n log n) 下面我们就来看一下归并排序 我们先来看一下什么是归并排序 假设我…

Scratch蓝桥杯实战训练 —— 巧解“韩信点兵”难题的五种方式

“韩信点兵”蓝桥杯问题描述: “蓝桥杯”中有一道有趣的 Scratch 编程题,题目要求为:韩信点兵 扩展知识: 这道题叫“中国余数定理”,又叫“孙子定理”,也叫“韩信点兵问题”,是我国古代数学智慧…

Faster-RCNN代码解读3:制作自己的数据加载器

Faster-RCNN代码解读3:制作自己的数据加载器 前言 ​ 因为最近打算尝试一下Faster-RCNN的复现,不要多想,我还没有厉害到可以一个人复现所有代码。所以,是参考别人的代码,进行自己的解读。 ​ 代码来自于B站的UP主&…

Node【三】Buffer 与 Stream

文章目录🌟前言🌟Buffer🌟 Buffer结构🌟 什么时候用Buffer🌟 Buffer的转换🌟 Buffer使用🌟 创建Buffer🌟 字符串转Buffer🌟 Buffer转字符串🌟 拼接Buffer&…

python 理解BN、LN、IN、GN归一化、分析torch.nn.LayerNorm()和torch.var()工作原理

目录 前言: 简言之BN、LN、IN、GN等归一化的区别: 批量归一化(Batch Normalization,BN) 优点 缺点 计算过程 层归一化(Layer Normalization,LN) 优点 计算过程 总结 分析torch.nn.LayerNorm()工作原理 分析torch.var(…