RabbitMQ_面试题01

news2024/11/17 2:44:47

文章目录

  • 1.RabbitMQ如何防止消息堆积
  • 2.RabbitMQ如何保证消息顺序消费
  • 3.RabbitMQ如何防止消息重复消费
  • 4.RabbitMQ如何保证消息可靠性
    • 4.1 消息持久化
    • 4.2 生产者确认
      • 2.2.1 application.yml
      • 2.2.2 Config
      • 2.2.3 Test
    • 4.3 消费者确认
      • 4.3.1 application.yml
      • 4.3.2 Test

1.RabbitMQ如何防止消息堆积

当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题。

解决消息堆积有两种思路:一是队列上绑定多个消费者,提高消费速度,也就是work工作模式;二是扩大队列容积,提高堆积上限

要提升队列容积,把消息保存在内存中显然是不行的,需要将消息保存到本地磁盘中,这时候就需要用到惰性队列了。
惰性队列的特征如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存
  • 支持数百万条的消息存储

要设置一个队列为惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可。
在这里插入图片描述

大量消息突然涌入导致积压临时处理方法

方案一:

  1. 新建一个或多个topic交换机,partition是原来的10倍,临时建立好原先10倍的queue数量。
  2. 然后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询将消费的消息临时写入建立好的10倍数量的queue中。
  3. 接着临时征用10倍或足够用的机器来部署consumer,每一批consumer消费一个临时queue的数据。这种做法相当于是临时将 queue 资源和consumer资源扩大10倍,以正常的10倍速度来消费数据。
  4. 修复原来的consumer问题,确保其恢复消费速度,重新用原先consumer机器来消费消息。

方案二:

前提:MQ中消息失效:假设你用的是RabbitMQ,RabbtiMQ是可以设置过期时间的,也就是 TTL。如果消息在queue中积压超过一定的时间就会被RabbitMQ给清理掉,这个数据就没了。那这就是第二个坑了,这就不是说数据会大量积压在mq里,而是大量的数据会直接搞丢。

解决方法:批量重导。队列消息大量积压的时候,在consumer接收到消息标记后直接丢弃数据,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入mq里面去,把白天丢的数据给他补回来。假设1万个订单积压在mq里面,没有处理,其中 1000个订单都丢了,你只能手动写程序把那1000个订单给查出来,手动发到mq里去再补一次。

2.RabbitMQ如何保证消息顺序消费

之所以要保证消息消费的顺序性是因为使用中间件消息队列后,假设当前是购物场景,用户下订单和支付订单是两种不同的业务数据,当放到两个队列中时,要保证下订单消息要在支付订单消息之前处理才不会出现异常,这就需要保证消费者消费消息的顺序性。
在这里插入图片描述

方案一

一个queue (消息队列)但是对应一个consumer(消费者),然后这个consumer(消费者)内部用内存队列做排队,然后分发给底层不同的worker来处理。
在这里插入图片描述

方案二

拆分多个queue(消息队列),每个queue(消息队列) 一个consumer(消费者),就是多一些queue(消息队列)而已。
在这里插入图片描述
不同队列中的消息消费顺序是没有保证的,例如:火车站检票的的时候,排了三个队伍,不同队伍之间不同确保谁先进站。

3.RabbitMQ如何防止消息重复消费

什么是消息的重复消费?

两个消费者消费了相同的数据。

为什么会重复消费?

消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除,但是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将消息分发给其他的消费者

解决方法

保证消息的唯一性,在写入消息队列的数据做唯一标示,消费消息时,根据唯一标识判断是否消费过,就算是多次传输,不要让消息的多次消费带来影响;保证消息等幂性;
在这里插入图片描述

其实MQ内部已经为我们做出了一些保障,在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重的依据(消息投递失败并重传),避免重复的消息进入队列;
在消息消费时,要求消息体中必须要有一个 bizId(对于同一业务全局唯一,如支付ID、订单ID、帖子ID 等)作为去重的依据,避免同一条消息被重复消费。

4.RabbitMQ如何保证消息可靠性

消息的可靠性是指从 生产者发送消息 --》 消息队列存储消息 --》消费者消费消息 的整个过程中消息的安全性及可控性。

4.1 消息持久化

生产者确认可以确保消息投递到 RabbitMQ 的队列中,但是消息发送到 RabbitMQ 以后,如果宕机,也可能导致消息丢失。

要想确保消息在 RabbitMQ 中安全保存,必须开启消息持久化机制。

  • 交换机持久化
  • 队列持久化
    • 1.将queue的持久化标识durable设置为true,则代表是一个持久的队列
    • 2.发送消息的时候将deliveryMode=2这样设置以后,即使rabbitMQ挂了,重启后也能恢复数据
      在这里插入图片描述
  • 消息持久化

默认情况下,由SpringAMQP声明的交换机、队列、消息都是持久化的。
![在这里插入图片描述](https://img-blog.csdnimg.cn/ad954990187949cd851c9a38cf3d7764.png

4.2 生产者确认

生产者丢失消息:从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息;
transaction机制就是说:发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit())。然而,这种方式有个缺点:吞吐量下降;
confirm模式用的居多:一旦channel进入confirm模式,所有在该信道上发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后;rabbitMQ就会发送一个ACK给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了;如果rabbitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作

2.2.1 application.yml

spring:
  rabbitmq:
    publisher-confirm-type: correlated  #开启消息确认异步回调
    publisher-returns: true             #开启消息发送失败回调
    template:
      # 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
      mandatory: true
  • publish-confirm-type:开启 publisher-confirm,支持两种类型:

    • simple:同步等待 confirm 结果,直到超时
    • correlated:异步回调, 定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
  • publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback

  • template.mandatory:定义消息发送到交换机时失败的策略。true,则调用ReturnCallback;false:则直接丢弃消息

每个RabbitTemplate只能配置一个 ReturnCallback,因此需要在项目加载时配置:

2.2.2 Config

package com.bjpowernode.product.config;
import com.bjpowernode.product.entity.LocalMessage;
import com.bjpowernode.product.mapper.LocalMessageMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;

@Component
@Slf4j
public class RabbitTemplateConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnsCallback{

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    /**
     * 确认消息是否投递到交换机
     *
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            log.info("消息成功投递到交换机...");
        } else {
            log.error("消息未成功投递到交换机...");
        }
    }

    /**
     * 消息成功投递到交换机,向队列投递失败时调用
     * @param returned the returned message and metadata.
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.error("消息未成功投递到队列...");
    }
}

2.2.3 Test

package com.bjpowernode.mq;

import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
class SpringAmqpTest02 {

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void testDirectExchange() {
        rabbitTemplate
                .convertAndSend("amq.direct", "product.saveOrUpdate2", "新增或更新商品通知");
    }
}

4.3 消费者确认

消费者丢失消息:消费者丢数据一般是因为采用了自动确认消息模式,改为手动确认消息即可!

消费者在收到消息之后,处理消息之前,会自动回复RabbitMQ已收到消息;

如果这时处理消息失败,就会丢失该消息;

解决方案:处理消息成功后,手动回复确认消息。

4.3.1 application.yml

spring:
  rabbitmq:
    listener:
      simple:
        # manual:手动ack,需要在业务代码结束后,调用 api发送ack。
        # auto:自动ack,由 Spring 监测 Listener 代码是否出现异常,没有异常则返回 ack;抛出异常则返回 nack
        # none:关闭 ack,消息投递给消费者后立即从队列删除
        acknowledge-mode: manual #手动确认

4.3.2 Test

@RabbitListener(queues = "simple.queue")
public void basicQueueListener(String message, Channel channel, Message msg) throws IOException {
    try {
        int i = 100 / 0;
        System.out.println("消费者接收到消息:" + message);
        // 手动ACK
        channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
    }  catch (Exception e) {
        // TODO 添加额外的处理逻辑
        //b:是否允许多条处理  b1:是否重新回到队列
        // 返回 nack,从队列删除该消息
        channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/546888.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OPT (奥普特)锂电池视觉检测技术精彩亮相CIBF

5月16~18日,第十五届中国国际电池技术展览会在深圳举办,全球2500多家优秀电池企业参展。 OPT(奥普特)作为锂电行业机器视觉核心供应商,携3D、深度学习、分频技术等视觉检测技术亮相,并展示了上…

chatgpt赋能Python-python3免费吗

Python3免费吗? Python3到底免费还是收费呢?这是一个被许多人关注和疑惑的问题。本文将从不同方面解答这个问题,希望能给你提供一个清晰的认识。 什么是Python3? Python3是一种通用、高级、解释型的编程语言。它是由Guido van …

【Linux初阶】fork进程创建 进程终止 进程等待

🌟hello,各位读者大大们你们好呀🌟 🍭🍭系列专栏:【Linux初阶】 ✒️✒️本篇内容:fork进程创建,理解fork返回值和常规用法,进程终止(退出码、退出场景、退出…

第08章_聚合函数

第08章_聚合函数 我们上一章讲到了 SQL 单行函数。实际上 SQL 函数还有一类,叫做聚合(或聚集、分组)函数,它是对一组数据进行汇总的函数,输入的是一组数据的集合,输出的是单个值。 1. 聚合函数介绍 什么是…

【sentinel】Sentinel工作主流程以流控规则源码分析

Sentinel工作主流程 在Sentinel里面,所有的资源都对应一个资源名称(resourceName),每次资源调用都会创建一个Entry对象。Entry可以通过对主流框架的适配自动创建,也可以通过注解的方式或调用SphU API显式创建。Entry创…

跨境seo引流的13种方法

跨境SEO引流是一种通过搜索引擎优化来吸引国际目标受众并增加网站流量的策略。以下是一些跨境SEO引流的关键步骤和技巧: 目标受众研究:了解你的目标市场和受众群体。了解他们的需求、喜好、购买习惯以及使用的搜索引擎和关键词。这将帮助你确定你的跨境S…

chatgpt赋能Python-python3怎么合并列表

Python3:合并列表的不同方法 如果你正在使用Python 3,那么你很可能会面对合并列表的问题。合并列表(也称为连接列表或串联列表)是将两个或多个列表组合成一个列表的过程,这是在编程中很常见的任务。在这篇文章里&…

Python并发编程:异步编程和多线程技术的应用和效率优化

第一章:介绍 在当今的软件开发领域,高效的并发编程对于处理大规模数据和提升系统性能至关重要。Python作为一种简洁、易读且功能强大的编程语言,提供了多种并发编程的技术和工具。本文将深入探讨Python中的异步编程和多线程技术,…

chatgpt赋能Python-python3如何画图

Python3如何画图? Python是一种高级编程语言,它有着多种用途,包括数据分析和可视化。Python3是Python的最新版本,它具有更好的性能和易用性。在这篇文章中,我们将介绍如何使用Python3来画图,并探讨其优势和…

KingbaseES 逻辑读与物理读

oracle数据库中逻辑读,物理读 数据访问方式:数据库少不了和操作系统进行数据交互,表数据最好的方式是从数据库共享池中访问到,避免发生磁盘IO,当然如果共享池中没有访问到数据就难免发生磁盘IO。 物理读:从…

第三篇、Arduino uno、nano、2560用oled0.96寸屏幕显示dht11温湿度传感器的温度和湿度信息——结果导向

0、结果 说明:先来看看拍摄的显示结果,如果是你想要的,可以接着往下看。 1、外观 说明:本次使用的oled是0.96寸的,别的规格的屏幕不一定适用本教程,一般而言有显示白色、蓝色和蓝黄一起显示的&#xff0…

【小沐学Web】Node实现Web图表功能(ECharts.js,React)

🎈🎈🎈Python实现Web图表功能系列:🎈🎈🎈1🎈【Web开发】Python实现Web图表功能(D-Tale入门)🎈2🎈【Web开发】Python实现Web图表功能&a…

Fragment 要你何用?2.0版本

前言 在之前的文章里有分析过Activity、Fragment、View之间的关联,也简单分析了Fragment的原理。 本篇将对Fragment被高频使用的场景以及一些坑点作分析,通过本篇文章,你将了解到: 老生常谈:为什么需要Fragment?Frag…

Java 创建一个大文件

有时候,我们在对文件进行测试的时候,可能需要创建一个临时的大文件。 那么问题来了,在 Java 中如何创建大文件呢? 问题和解决 有些人想到的办法就是定义一个随机的字符串,然后重复很多次,然后将这个字符…

第一篇:强化学习基本原理通俗介绍

你好,我是zhenguo(郭震) 今天强化学习第一篇:白话介绍强化学习的基本原理 强化学习是一种机器学习方法,旨在让智能体(agent)通过与环境的交互学习如何做出最优的行动选择以获得最大的累积奖励。…

Rust每日一练(Leetday0004) 正则表达、盛水容器、转罗马数字

目录 10. 正则表达式匹配 Regular Expression Matching 🌟🌟🌟 11. 盛最多水的容器 Container with most water 🌟🌟 12. 整数转罗马数字 Integer to Roman 🌟🌟 🌟 每日一练…

new和delete用法详解

本篇文章对C中的new和delete进行详解。在讲解new和delete时,我们会对比C语言中的malloc和free,看看两者的区别和相似之点。希望本篇文章会对你有所帮助。 文章目录 一、什么是new和delete 二、new和delete的用法 2、1 new和delete操作内置类型 2、2 new和…

中青宝两大议案被否!散户又“赢了”?

21.93万股,就能决定股东大会上的议案成败——离奇的一幕在中青宝上演。 5月18日,中青宝召开2022年度股东大会。会上,《关于2023年度日常关联交易预计的议案》《关于非独立董事2023年度薪酬方案的议案》两项议案被否。 此次股东大会上&#x…

linux设置静态ip与windows互相ping通、设置静态ip之后不能联网和网络服务重启失败的问题

1.虚拟机linux设置静态ip与windows互相ping通及设置静态ip之后不能联网问题一站式解决: 转载:https://www.codenong.com/cs105332412/ 2.遇到网络服务重启失败的问题 按照提示查看网络服务的状态 看到这篇博文https://www.cyberithub.com/failed-to-s…

Ae 效果详解:Keylight(1.2)

Ae菜单:效果/Keying/Keylight(1.2) Effects/Keying/Keylight(1.2) Keylight 是一款工业级的蓝幕或绿幕键控器,核心算法由 Computer Film 公司开发,并由 The Foundry 公司进一步开发移植到 Ae。 Keylight 在制作专业品质的抠像效果方面表现出色…