互联网全景消息(2)之RabbitMq高阶使用

news2024/9/23 9:33:56

一、RabbitMQ消息可靠性保障

        消息的可靠性投递是使用消息中间件不可避免的问题,不管是Kafka、rocketMQ或者是rabbitMQ,那么在RabbitMQ中如何保障消息的可靠性呢?

        首先来看一下rabbitMQ的 架构图:

        首先从图里我们可以看到,消息投递的保障性主要从三个方面来解决:

  • 生产者;
  • Broker;
  • 消费者; 

1.1 生产者保障 

        生产者发送消息到broker时,要保障消息的可靠性,主要方案有以下两种:

  1. 生产者确认;
  2. 失败通知; 

         首先RabbitMQ生产者通过制定一个Exchange和routingkey把消息送达到某个队列中,然后消费者监听队列进行消费处理。但是在某些情况下,如果我们在发送消息,当前的exchange不存在或者指定的routingkey找不到对应的队列,这个时候如果要监听这种不可达的消息,就需要失败通知了。

1.1.1 交换器、队列、路由健的关系

        队列通过路由健(routingkey,某种规则)绑定到交换器中,生产者将消息发布到交换器中,交换器根据绑定的routingkey将消息路由到指定队列中,然后由订阅这个队列的消费者进行监听消费。

 

        此时就会存在一个问题,消息路由到了不存在的队列怎么办?一般情况下RabbitMQ会直接忽略,当这个消息不存在,也就是消息丢弃了。

        所以在不做任何配置的情况下,生产者是不知道消息是否真正达到rabbitMQ,也就是说消息发布不会返回任何消息给生产者。

1.1.2 失败通知 

        那如何保证我们消息发布的可靠性,这里我们就可以启动失败通知,在原生的编码中可以在发送消息的时候设置Mandatory,即可开启故障检测模式。

        注意:他只会让RabbitMQ向你通知失败,而不会通知成功,如果消息正确的路由到队列,则发布者不会收到任何通知。带来的问题就是无法确保发布消息一定是成功的,因为通知失败的消息可能会丢失。

1.1.2.1 实现方式

        spring配置方式:

spring:

        rabbitmq:

                # 消息在未被队列收到的情况下返回

                publisher-returns: true

         关键代码,注意需要发送者实现 ReturnCallback 接口方可实现失败通知

 

1.1.2.2 存在的问题 

        如果消息正确路由到队列,则发布者不会收到任何通知。带来的问题就是无法确保消息一定是成功的,因为通知失败的消息可能会丢失。

        这样子我们可以使用RabbitMQ的发送方确认来实现,它不仅仅在路由失败的时候给我们发送消息,并且能够在路由成功的时候也给我们发送消息。

 1.1.3 发送方确认

        发送方确认是指生产者在投递消息后,如果Broker接收到消息,则会给生产者一个应答。生产者进行接收应答,用来确认这条消息是否正常的发送到Broker,这种方式也是可靠消息投递的核心保证。 

        rabbitMQ消息发送分为两个阶段:

  • 将消息发送到broker,即发送到exchange交换机;
  • 消息通过exchange被路由到队列; 

        一旦消息投递到队列,队列则会向生产者发送一个通知,如果队列设置了消息持久到磁盘,则会等待消息持久化到磁盘之后再发送通知。

        注意:发送者确认只有出现RabbitMQ内部错误才会出现发送者确认失败。 

        在发送者确认这种模式也可以分为具体两种情况来看待:

  1. 队列不可路由;
  2. 队列可路由; 
1.1.3.1 队列不可路由 

        当前的消息达到交换器之后,对于发送者确认是成功的。因为此时的消息已经到达了broker,此时只是不可路由队列他认为是成功的。

 

        首先RabbitMQ交换器不可路由时,消息也根本 不会投递到队列中,所以这里他只管到交换器这里,当消息成功到达交换器后,就会进行确认操作。 

        另外在这过程中,生产者收到了确认之后,那么因为消息不可路由,所以该消息也是无效的相当于被抛弃了,无法到达队列,所以一般这里会结合失败通知来一同使用,这里一般会进行mandatory模式,失败则会调用addReturnListener监听器来处理。

1.1.3.2 队列可以路由

        只要消息能够到达队列即可进行确认,一般是RabbitMQ发生内部错误才会出现确认失败的情况; 

         

        可以路由的消息,要等到被投递到所有匹配的队列之后,broker会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达队列了。

        如果消息和队列是可持久化的,那么消息会在将消息写入磁盘之后发出,broker回传给生产者的确认小学中delivery-tag包含了确认消息的序列号。

1.1.3.3 使用方式

        Spring配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated

         关键代码,注意需要发送者实现 ConfirmCallback 接口方可实现失败通知:

 1.1.4 broker丢失消息

        前面我们从生产者的角度分析了消息可靠性传输的原理和实现,接下来就要看下broker是如何保障消息的可靠性传输的。

        假设生产者已经成功将消息发送到了交换机,并且交换机也成功的将消息路由到队列中,但是此时消费者还没有进行消费的时候,mq挂掉了,那么重启之后消息就会不存在,那样子就不能保障消息的可靠性 传输了。

        所以此时就要开启RabbitMQ的持久化,也就是将消息持久化到磁盘,此时即使MQ挂掉了,重启之后也会自动读取之前存储的数据。

1.1.4.1 持久化队列 

         在spring开启一个持久化队列。

  @Configuration
   public class RabbitConfig {

       public static final String DURABLE_QUEUE_NAME = "durable_queue";

       @Bean
       public Queue durableQueue() {
           // 创建一个持久化的队列
           return new Queue(DURABLE_QUEUE_NAME, true); // 第二个参数为true表示队列持久化
       }
   }
1.1.4.2 持久化交换器

@Configuration
public class RabbitConfig {

    public static final String DURABLE_EXCHANGE_NAME = "durable_exchange";
    public static final String DURABLE_QUEUE_NAME = "durable_queue";
    public static final String ROUTING_KEY = "durable_routing_key";

    @Bean
    public DirectExchange durableExchange() {
        // 创建一个持久化的Direct Exchange
        return new DirectExchange(DURABLE_EXCHANGE_NAME, true, false);
    }

    @Bean
    public Queue durableQueue() {
        // 创建一个持久化的队列
        return new Queue(DURABLE_QUEUE_NAME, true); // 第二个参数为true表示队列持久化
    }

    @Bean
    public Binding binding(Queue durableQueue, DirectExchange durableExchange) {
        // 绑定队列到交换器
        return BindingBuilder.bind(durableQueue).to(durableExchange).with(ROUTING_KEY);
    }
}
 1.1.4.3 发送持久化消息

         在发送消息的时候,需要设置属性deliveryMode=2,表示发送的是一个持久化消息,需要注意的是在springboot中,发送消息时已经自动设置了deliveryMode为2,不需要人工再去设置一遍。

@Component
public class MessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendPersistentMessage(String messageContent) {
        // 创建消息属性,并设置为持久化
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);

        // 创建消息
        Message message = new Message(messageContent.getBytes(), messageProperties);

        // 发送消息到指定的交换器
        rabbitTemplate.convertAndSend(RabbitConfig.DURABLE_EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, message);
        System.out.println("Sent message: " + messageContent);
    }
}

 1.1.5 总结

        生产者以及Broker要保障消息传递的可靠性如果结合失败通知以及发送方确认和持久化消息来实现。

1.发送方确认:保障消息能够到达broker;

2.失败通知:保障的是消息能够成功路由到队列;

3.持久化队列:保障消息的持久化;

1.2 消费者消息可靠性 

        消费者接收到消息,但是还未处理或者还未处理完成,此时消费者进程挂了,比如重启或者异常中断,此时mq会认为消费者已经完成消息消费,就会从队列中删除消息,从而导致消息丢失。 

        那该如何避免这种情况呢?这就要使用到RabbitMQ提供的ack机制,RabbitMQ默认是自动ack的,此时需要将其修改为手动ack,也就是自己的程序确定消息是否已经处理完成。如果此时出现消息未处理完成进程挂掉的情况,由于没有提交ack,rabbitMQ就不会删除这条消息,而是会把这条消息发送给其他消费者处理,但是消息不回丢失。 

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

        acknowledge-mode: manual代表开启手动ack,该配置项的其他两个参数值为none和auto;

  • auto:消费者根据程序执行的正常或者抛出异常来决定是抛出ack或者nack;
  • munual:手动ack,用户必须手动提交ack或者nack;
  • none:没有ack机制; 

        默认值是none,如果将ack的模式设置auto,此时如果消费者执行异常的话,就相当于执行了nack方法,消息会被放置到队列的头部,消息会被无限期的执行,从而导致后续消息无法执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2090664.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python发现是anaconda的,而不是原来的编译环境

发现有三个python编译器。 可以检查一下环境变量,把原来的python编译器版本上移到anaconda的python编译器之前。这样每次在终端使用python命令就是原来的python编译器版本了

基于Docker搭建Graylog分布式日志采集系统

文章目录 一、简介二、Graylog1、主要特点2、组件3、工作流程介绍4、使用场景 三、Graylog 安装部署1、 安装 docker2、安装docker compose3、 安装graylog4、Graylog控制台 四、springboot集成Graylog 一、简介 Graylog是一个开源的日志管理工具,主要功能包括日志…

c++中的匿名对象及内存管理及模版初阶

c中的匿名对象 A a;//a的生命周期在整个main函数中 a.Sum(1); //匿名对象生命周期只有一行,只有这一行会创建对象,出了这一行就会调析构 A().Sum(1);//只有这一行需要这个对象,其他地方不需要。 return 0; 日期到天数的转换 计算日期到天数转换_牛客…

解读GaussianTalker:利用音频驱动的基于3D高斯点染技术的实时高保真讲话头像合成

单位:首尔大学 项目地址:https://ku-cvlab.github.io/GaussianTalker/ github:https://github.com/KU-CVLAB/gaussiantalker 本文是对GaussianTalker的解读,欢迎大家阅读指正! 目录 前言摘要一、背景介绍二 相关工作三…

拼多多Temu半托管和全托管对比

根据东吴证券报告显示,与全托管相比,半托管给予商家更灵活的物流选择,允许商家自行负责仓配物流;与传统3P模式相比,半托管仍保留平台核价、平台负责营销售后客服等特点。 Temu 最开始采用全托管模式,但随着…

【C++ Primer Plus习题】7.6

问题: 解答: #include <iostream> using namespace std;#define SIZE 20int Fill_array(double* arr, int len) {int i 0;for (i 0; i < len; i){cout << "请输入值:";cin >> arr[i];if (cin.fail()){cout << "输入非法数字,结束…

.NET中分布式服务

单体架构 特点&#xff1a; 所有的功能集成在一个项目工程中。所有的功能打在一个安装包。 优点&#xff1a; 项目架构简单。开发效率高。容易打包。 缺点&#xff1a; 全部功能集成在一个工程中&#xff0c;如果要更新&#xff0c; 所有的都要重新发布版本迭代速度逐渐变…

C#笔记4 详细解释事件及其原型、匿名方法和委托的关系

匿名方法 定义 匿名方法允许一个与委托关联的代码被内联的写入使用委托的位置。 语法形式 delegate(参数列表) {代码块 } 前文说过&#xff0c;委托是定义了一个公司&#xff0c;公司专门承接某一类型的任务。 委托的实例化就是公司把任务交给了具体的职员&#xff08;方…

【React】从零开始搭建 react 项目(初始化+路由)

创建 React 项目 创建项目的方式&#xff1a;create-react-app 项目名称 如果没有安装 react 脚手架&#xff0c;请先安装 npm isntall create-react-app -g安装成功后&#xff0c;开始配置项目 配置项目的 icon 和标题 配置 jsconfig.json 目的&#xff1a;用于配置 Java…

Google Earth Engine(GEE)——在 CloudCompare软件 中处理地面激光扫描 (TLS) 数据

背景和目的 本实验的目标是熟悉 3D 点云数据。我们将使用上周在 Boab 法院校园收集的数据。我们使用 Leica BLK360 激光扫描仪收集了多次扫描,今天我们将处理其中的一些扫描。 可以在此处以 .las 格式下载扫描数据(请注意,每个文件约为 1GB):这个是链接 https://www.dro…

批量进行Mysql数据处理的一项工作记录以及保存一个nginx变量大全

一、批量进行Mysql数据处理的一项工作记录 在使用SQL执行一起数据批量处理的时候遇到执行数速度非常慢。表temp_users是一个包含百万级的用户ID表&#xff0c;表user_list是一个亿级的表&#xff0c;因为跨库&#xff0c;这里使用的是federated引擎创建的结构表。根据要实现的目…

FPGA实现SDI视频H265压缩网络推流输出,基于VCU架构,支持12G-SDI 4K60帧,提供工程源码和技术支持

目录 1、前言工程概述免责声明 2、相关方案推荐我这里已有的视频图像编解码方案本博已有的 SDI 编解码方案 3、详细设计方案设计框图FPGA开发板视频输入SDI硬件均衡器LMH1219UHD-SDI GT SDI视频解串SMPTE UHD-SDI RX SUBSYSTEM SDI视频解码Video Frame Buffer WriteZynq UltraS…

大模型时代下,软件检测行业将如何发展?

大模型时代&#xff0c;软件测试面临着前所未有的机遇和挑战&#xff0c;各类产品测试领域将如何应对技术发展和时代变化&#xff1f; 2024年8月28日晚八点&#xff0c;安畅检测首席专家李龙与腾讯Tech Lead茹炳晟、中电金信质量团队负责人王壮做客人民邮电出版社创办的IT专业…

springboot集成guava布隆过滤器

1.创建springboot项目&#xff0c;引入maven依赖 <dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId><version>23.0</version></dependency>2.创建guava布隆过滤器 Component public class Gua…

DexclassLoader读取dex在Android14上遇到问题

报错如下&#xff1a; 在Android 14&#xff08;API 级别 34&#xff09;及以后版本中&#xff0c;DexClassLoader 被进一步限制&#xff0c;只能用于加载只读文件中的代码。这意味着你不能再使用 DexClassLoader 来加载从应用的内部存储空间中读取的文件。 我想通过JNI来修改…

Elasticsearch 8.13.4 LocalDateTime类型转换问题

框架背景 springboot 3.3.1elasticseach8.13.4spring-data-elasticsearch5.3.1(其实只要用了springboot3.3.1 上下两个的版本都在里面绑死了) 问题描述 使用spring-data-elasticsearch操作es&#xff0c;当字段增加映射注解,其实如果是日期类型&#xff0c;你不加默认也给你…

计算机网络概述(协议层次与服务模型)

目录 1.协议层次 2.服务模型 1.协议层次 层次化方式实现复杂网络功能&#xff1a; 将网络复杂的功能分成明确的层次&#xff0c;每一层实现了其中一个或一组功能&#xff0c;功能中有其上层可以使用的功能&#xff1a;服务本层协议实体相互交互执行本层的协议动作&#xff0…

C++(Qt)-GIS开发-QGraphicsView显示在线瓦片地图

C(Qt)-GIS开发-QGraphicsView显示在线瓦片地图 文章目录 C(Qt)-GIS开发-QGraphicsView显示在线瓦片地图1、概述2、实现效果3、主要代码4、源码地址 更多精彩内容&#x1f449;个人内容分类汇总 &#x1f448;&#x1f449;GIS开发 &#x1f448; 1、概述 支持加载显示在线瓦片…

TD学习笔记————中级教程总结(NEW)

目录 Instance功能讲解 问题&#xff1a; 报错All ops must generate the same number of instances (have the same length Replicator功能讲解 问题&#xff1a; 视频分辨率过大 Cannot find function named:onValueChange Instance功能讲解 数据通道的长度要一致 N…

redroid搭建云手机学习笔记(一)

参考链接 通过Redroid搭建自己的云手机 docker安装 docker官网目前打不开了&#xff0c;通过官网安装的方式无法实现&#xff0c;这里需要借助镜像网站来实现docker的安装 参考链接&#xff1a;https://developer.aliyun.com/mirror/docker-ce # step 1: 安装必要的一些系统…