RabbitMQ高频面试题整理

news2024/9/19 10:54:01

文章目录

  • 1、RabbitMQ如何保证消息不丢失
    • 1)confirm 消息确认机制 (生产者)
    • 2)消息持久化机制 (RabbitMQ 服务)
    • 3)ACK 事务机制(消费者)
  • 2、RabbitMQ 中有哪几种交换机类型?
    • 1) Direct Exchange
    • 2)Fanout Exchange
    • 3)Topic Exchange
    • 4)Headers Exchange
    • 5) Default Exchange
  • 3、什么是AMQP?
  • 4、RabbitMQ中如何解决消息堆积问题
    • 1)增加消费者的数量
    • 2)优化消费者的处理逻辑
    • 3) 使用消息预取(Prefetch)机制
    • 4)消息分发策略
    • 5)消息优先级队列
    • 6)分布式部署和集群化
    • 7) 流量控制
    • 8)延迟队列和死信队列
    • 9)监控和报警
  • 5、RabbitMQ 是如何实现死信队列的?
  • 6、RabbitMQ中如何保证消息不被重复消费

1、RabbitMQ如何保证消息不丢失

RabbitMQ 提供了相应的解决方案:

1)confirm 消息确认机制 (生产者)

confirm 模式是 RabbitMQ 提供的一种消息可靠性保障机制。当生产者通过 confirm 模式发送消息时,它会等待RabbitMQ 的确认,确保消息已经被正确地投递到了指定的 Exchange 中。

  • 消息正确投递到 queue 时,会返回 ack。
  • 消息没有正确投递到 queue 时,会返回 nack。如果 exchange 没有绑定 queue,也会出现消息丢失

使用方法:

  • 生产者通过 confirm.select 方法将 Channel 设置为 Confirm 模式。
  • 发送消息后,通过添加 add confirm listener 方法,监听消息的确认状态.

2)消息持久化机制 (RabbitMQ 服务)

持久化机制是指将消息存储到磁盘,以保证在 RabbitMQ 服务器宕机或重启时,消息不会丢失使用方法:

  • 生产者通过将消息的 delivery_mode 属性设置为 2,将消息标记为持久化。
  • 队列也需要进行持久化设置,确保队列在 RabbitMQ 服务器重启后仍然存在。经典队列需要将durable属性设置为true。而仲裁队列和流式队列默认必须持久化保存。

注意事项:

  • 持久化机制会影响性能,因此在需要确保消息不丢失的场景下使用。

3)ACK 事务机制(消费者)

ACK 事务机制用于确保消息被正确消费。当消息被消费者成功处理后,消费者发送确认 (ACK)给 RabbitMQ,告知消息可以被移除。这个过程是自动处理的,也可以关闭进行手工发送 ACK。

使用方法:

  • 在 RabbitMQ 中,ACK 机制默认是开启的。当消息被消费者接收后,会立即从队列中删除,除非消费者发生异常。
  • 可以手动开启 ACK 机制,通过将 auto_ack 参数设置为 False,手动控制消息的 ACK

注意事项:

  • ACK 机制可以确保消息不会被重复处理,但如果消费者发生异常或者未发送 ACK,消息可能会被重复投递

2、RabbitMQ 中有哪几种交换机类型?

RabbitMQ 提供了5种不同类型的交换机,每种交换机都有其特定的路由逻辑:

1) Direct Exchange

Direct Exchange 根据消息的路由键(Routing Key)精确地将消息路由到队列。

  • 路由规则:消息被路由到路由键完全匹配的队列。
  • 使用场景:需要将消息发送到特定队列的情况。
  • 例子:
channel.exchangeDeclare("directExchange", "direct");
channel.queueBind("queue1", "directExchange", "routingKey1");
channel.queueBind("queue2", "directExchange", "routingKey2");

// 发送消息
channel.basicPublish("directExchange", "routingKey1", null, "Message to queue1".getBytes());
channel.basicPublish("directExchange", "routingKey2", null, "Message to queue2".getBytes());

2)Fanout Exchange

Fanout Exchange 将消息广播到绑定到该交换机的所有队列。

  • 路由规则:消息会被路由到所有与该交换机绑定的队列。
  • 使用场景:广播消息到多个队列的情况。
  • 例子:
channel.exchangeDeclare("fanoutExchange", "fanout");
channel.queueBind("queue1", "fanoutExchange", "");
channel.queueBind("queue2", "fanoutExchange", "");

// 发送消息
channel.basicPublish("fanoutExchange", "", null, "Broadcast Message".getBytes());

3)Topic Exchange

Topic Exchange 根据消息的路由键模式(通常是带点号的字符串)将消息路由到匹配的队列。

  • 路由规则:路由键和绑定键(Binding Key)是点号分隔的字符串。绑定键可以包含两个特殊字符:*(匹配一个单词)和 #(匹配零个或多个单词)。
  • 使用场景:需要基于模式(如日志级别、地理位置等)路由消息的情况。
  • 例子:
channel.exchangeDeclare("topicExchange", "topic");
channel.queueBind("queue1", "topicExchange", "key1.*");
channel.queueBind("queue2", "topicExchange", "key2.#");

// 发送消息
channel.basicPublish("topicExchange", "key1.test", null, "Message to queue1".getBytes());
channel.basicPublish("topicExchange", "key2.test.sub", null, "Message to queue2".getBytes());

4)Headers Exchange

Headers Exchange 根据消息的头属性(Headers)进行路由。与其他交换机不同,Headers Exchange 不使用路由键。

  • 路由规则:消息的头属性必须与绑定的头属性完全匹配,才能将消息路由到相应的队列。
  • 使用场景:需要基于消息的多个属性进行复杂路由的情况。
  • 例子:
Map<String, Object> headers = new HashMap<>();
headers.put("header1", "value1");
headers.put("header2", "value2");

channel.exchangeDeclare("headersExchange", "headers");
channel.queueBind("queue1", "headersExchange", "", new AMQP.BasicProperties.Builder().headers(headers).build());

// 发送消息
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build();
channel.basicPublish("headersExchange", "", props, "Message to queue1".getBytes());

5) Default Exchange

Default Exchange 是 RabbitMQ 内置的一个隐式交换机,每个队列在创建时会自动绑定到这个交换机上,路由键为队列的名称。

  • 路由规则:消息的路由键必须与队列名称完全匹配。
  • 使用场景:直接发送消息到指定队列的情况,不需要显式声明交换机。
  • 例子:
// 直接发送消息到名为 "queue1" 的队列
channel.basicPublish("", "queue1", null, "Message to queue1".getBytes());

综合使用:在实际应用中,可以根据需求选择合适的交换机类型,并结合多种类型的交换机进行复杂的消息路由和处理。

3、什么是AMQP?

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一种用于消息传递的开放标准协议,广泛用于消息队列和消息中间件系统中。RabbitMQ 是 AMQP 协议的一个实现。

AMQP 定义了一套标准的消息传递机制,包括以下几个核心组件:

  • Broker(代理)
    消息代理是消息队列服务器,负责接收、存储和转发消息。例如,RabbitMQ 就是一个 AMQP 消息代理。
  • Message(消息)
    消息是 AMQP 中的基本数据单元,包含要传递的数据和一些元数据(如路由键和头属性)。
  • Producer(生产者)
    生产者是发送消息到交换机的应用程序。生产者将消息发布到指定的交换机,而不是直接发送到队列。
  • Consumer(消费者)
    消费者是从队列中接收和处理消息的应用程序。
  • Exchange(交换机)
    交换机接收来自生产者的消息,并根据绑定规则将消息路由到一个或多个队列。AMQP 定义了几种不同类型的交换机,如 direct、fanout、topic 和 headers。
  • Queue(队列)
    队列存储来自交换机的消息,直到消费者接收并处理这些消息。队列是消息传递的终点。
  • Binding(绑定)
    绑定是交换机和队列之间的连接,定义了消息的路由规则。

AMQP 的工作原理:

AMQP 的消息传递流程可以概括为以下几个步骤:

  • 生产者将消息发送到交换机
  • 生产者将消息发送到指定的交换机,并指定路由键。
  • 交换机根据路由规则将消息发送到队列
  • 交换机会根据绑定的路由规则,将消息发送到一个或多个队列。
  • 消费者从队列中接收消息
  • 消费者从队列中拉取或推送接收到的消息,并进行处理。

4、RabbitMQ中如何解决消息堆积问题

在这里插入图片描述

1)增加消费者的数量

增加消费者的数量是解决消息堆积问题的直接方法。通过增加更多的消费者来并行处理消息,可以有效地提高消息的处理速度。

// 示例:启动多个消费者
for (int i = 0; i < 5; i++) {
    new Thread(() -> {
        // 消费者逻辑
    }).start();
}

2)优化消费者的处理逻辑

优化消费者的处理逻辑,减少每条消息的处理时间,从而提高整体处理效率。这可以包括:

  • 使用更高效的算法或数据结构。
  • 减少 I/O 操作,如数据库访问或网络请求。
  • 使用缓存机制,避免重复计算或访问。

3) 使用消息预取(Prefetch)机制

RabbitMQ 允许消费者设置预取值,控制消费者一次可以预取多少条消息。合理设置预取值可以确保消费者不会一次获取过多的消息,导致处理速度变慢。

channel.basicQos(10); // 设置预取值为 10

4)消息分发策略

使用合适的消息分发策略将消息均匀地分发到多个消费者。例如,可以使用轮询分发策略(Round-robin dispatching)来确保每个消费者都能公平地分配到消息。

5)消息优先级队列

使用消息优先级队列,确保高优先级的消息可以优先被处理,从而避免关键消息被低优先级消息淹没。

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("priority_queue", true, false, false, args);

// 发送带有优先级的消息
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
    .priority(5)
    .build();
channel.basicPublish("", "priority_queue", properties, message.getBytes());

6)分布式部署和集群化

将 RabbitMQ 部署在集群环境中,通过多个节点来分担消息处理压力。RabbitMQ 支持集群模式,可以水平扩展来处理大量消息。

7) 流量控制

使用 RabbitMQ 提供的流量控制机制(Flow Control)来限制生产者的消息发送速率,防止生产者过快地发送消息导致队列积压。

8)延迟队列和死信队列

使用延迟队列和死信队列处理无法立即处理的消息。通过设置消息的 TTL(Time To Live),可以将处理不了的消息重新入队,或转移到死信队列进行后续处理。

9)监控和报警

建立完善的监控和报警系统,及时发现和处理消息堆积问题。可以使用 RabbitMQ 提供的管理插件或第三方监控工具(如 Prometheus、Grafana)来监控队列长度、消费者数量等关键指标。

5、RabbitMQ 是如何实现死信队列的?

死信队列是 RabbitMQ 提供的一种特殊序列,处理那些无法被正常消费的消息。有三种情况会产生死信:

  • 消息被消费者明确拒绝
  • 消息达到预设的过期时间仍没有消费者消费
  • 消息由于队列已经达到最大长度限制而被丢弃

在 RabbitMQ 中,实现死信队列只需要给正常队列增加三个核心参数即可:

  • 1.dead-letter-exchange: 指定当前队列对应的死信队列
  • 2.dead-letter-routing-key:指定消息转入死信队列时的路由键
  • 3.message-ttl: 消息在队列中的过期时间。

接下来,就可以往正常队列中发送消息。如果消息满足了某些条件就会成为死信,并被重新发送到对应的死信队列中。而此时,RabbitMQ会在消息的头部添加一些与死信相关的补充信息,例如时间、成为死信的原因、原队列等。

应用程序可以按需处理这些补充的信息,最后,死信队列中的消息都是正常业务处理失败的消息,应用程序需要创建一个消费者来专门处理这些被遗漏的消息。例如记录日志、发送警报等。这样才能保证业务数据的完整性。

6、RabbitMQ中如何保证消息不被重复消费

什么情况会导致消息被重复消费呢?

  • 1.生产者: 生产者可能会重复推送一条数据到 MQ 中,比如 Controller 接口被重复调用了 2 次,没有做接口幂等性导致的;
  • 2.MQ: 在消费者消费完准备响应 ack 消息消费成功时,MQ 突然挂了,导致 MQ 以为消费者还未消费该条数据MQ 恢复后再次推送了该条消息,导致了重复消费。
  • 3.消费者: 消费者已经消费完消息,正准备但是还未响应给ack消息到时,此时消费者挂了,服务重启后 MQ 以为消费者还没有消费该消息,再次推送了该条消息。

解决方案:

1)消息确认机制

启用消息确认机制,最好是手动确定。确保消费者成功处理消息后才将消息从队列中删除。

  • 自动确认(Auto Ack):消息一旦被消费者接收,就会立即被确认,适用于消息处理非常快且可靠的场景。
  • 手动确认(Manual Ack):消息只有在消费者明确确认后才会被移出队列,适用于需要确保消息处理成功的场景。
// 手动确认消息
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        String message = new String(body, "UTF-8");
        try {
            // 处理消息
            System.out.println(" [x] Received '" + message + "'");
            // 确认消息已处理成功
            channel.basicAck(envelope.getDeliveryTag(), false);
        } catch (Exception e) {
            // 处理失败,消息未被确认,将被重新投递
            channel.basicNack(envelope.getDeliveryTag(), false, true);
        }
    }
});

2)消息重投机制
使用消息重投(Requeue)机制,确保处理失败的消息重新进入队列,供其他消费者再次处理。

catch (Exception e) {
    // 处理失败,消息未被确认,将被重新投递
    channel.basicNack(envelope.getDeliveryTag(), false, true);
}

3)消息幂等性设计
确保消费者在处理消息时具有幂等性,即无论相同的消息被处理多少次,结果都是一致的。这可以通过以下方法实现:

  • 使用唯一标识符(如消息ID)记录已经处理过的消息。
  • 对于数据库操作,使用唯一约束防止重复插入。
  • 对于外部系统调用,设计幂等接口。
// 在发送消息时设置唯一ID
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder()
    .messageId(UUID.randomUUID().toString())
    .build();
channel.basicPublish("", "my_queue", props, message.getBytes("UTF-8"));

综合案例:

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeoutException;

public class RabbitMQConsumer {

    private final static String QUEUE_NAME = "my_durable_queue";
    private static Set<String> processedMessageIds = new HashSet<>();

    public static void main(String[] argv) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        boolean durable = true;
        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        channel.basicQos(1); // 仅处理一个未确认的消息

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            String messageId = delivery.getProperties().getMessageId();

            if (processedMessageIds.contains(messageId)) {
                // 已处理过的消息,直接确认
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            } else {
                try {
                    // 处理消息
                    System.out.println(" [x] Received '" + message + "'");
                    // 记录已处理的消息ID
                    processedMessageIds.add(messageId);
                    // 确认消息已处理成功
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                } catch (Exception e) {
                    // 处理失败,消息未被确认,将被重新投递
                    channel.basicNack(delivery.getEnvelope().getDeliveryTag(), false, true);
                }
            }
        };

        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1815017.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

meilisearch的分页

Elasticsearch 做为老牌搜索引擎&#xff0c;功能基本满足&#xff0c;但复杂&#xff0c;重量级&#xff0c;适合大数据量。 MeiliSearch 设计目标针对数据在 500GB 左右的搜索需求&#xff0c;极快&#xff0c;单文件&#xff0c;超轻量。 所以&#xff0c;对于中小型项目来说…

细说MCU串口函数及使用printf函数实现串口发送数据的方法

目录 1、硬件及工程 2、串口相关的库函数 &#xff08;1&#xff09;串口中断服务函数&#xff1a; &#xff08;2&#xff09;串口接收回调函数&#xff1a; &#xff08;3&#xff09;串口接收中断配置函数&#xff1a; &#xff08;4&#xff09;非中断发送&#xff…

使用API有效率地管理Dynadot域名,列表形式查询已存在的文件夹信息

关于Dynadot Dynadot是通过ICANN认证的域名注册商&#xff0c;自2002年成立以来&#xff0c;服务于全球108个国家和地区的客户&#xff0c;为数以万计的客户提供简洁&#xff0c;优惠&#xff0c;安全的域名注册以及管理服务。 Dynadot平台操作教程索引&#xff08;包括域名邮…

2024年学习AI绘画是还有来得及吗?事实上看这篇就足够了aigc绘画入门基础篇

想要学好stable diffusion&#xff0c;学习资料很重要&#xff0c;本文就将常用的模型下载、提示词工具、学习资料网站进行&#xff0c;以及AI可以做的那些副业&#xff0c;汇总&#xff0c;以提升各位彦祖、亦非们的学习体验~ 一、简介 今天给大家分享Stable Diffusion模型存…

力扣42 接雨水

听说字节每人都会接雨水&#xff0c;我也要会哈哈哈 数据结构&#xff1a;数组 算法&#xff1a;核心是计算这一列接到多少雨水&#xff0c;它取决于它左边的最大值和右边的最大值&#xff0c;如下图第三根柱子能接到的雨水应该是第一根柱子高度和第五根柱子高度的最小值减去第…

DNS响应时间分析

目录 什么是DNS响应时间&#xff1f; 为什么DNS响应时间很重要&#xff1f; AnaTraf流量分析仪DNS分析 在当今数字化时代&#xff0c;网络的稳定性和性能对企业的运营至关重要。作为IT运维人员&#xff0c;我们的职责是确保网络顺畅运行&#xff0c;而DNS&#xff08;域名系…

我国喷砂机产量逐渐增长 金属加工为最大应用领域

我国喷砂机产量逐渐增长 金属加工为最大应用领域 喷砂是通过压缩空气作为动力形成高速喷射束&#xff0c;将粉状喷料高速喷射到需处理工件表面&#xff0c;使得工件外表面的外表发生变化&#xff0c;起到清理和粗化基体表面的作用。喷砂机是喷砂设备的核心组成部分&#xff0c;…

网站选择定制化的优缺点

网站定制化要明白的是&#xff0c;先有需求&#xff0c;然后在按照每一个需求去进行任务开发。 一.优点&#xff1a; 1.能够落实到每一个需求细节里面&#xff0c;可以很好的掌握需求的实现。 2.网站的所有使用权都在自己的手里&#xff0c;不需要第三方托管&#xff0…

Linux 防火墙 Firewall 和 Iptables 的使用

如果我们在Linux服务器的某个端口上运行了个服务&#xff0c;需要外网能访问到&#xff0c;就必须通过防火墙将服务运行端口给开启。Linux中有两种防火墙软件&#xff0c;CentOS7.0以上使用的是firewall&#xff0c;CentOS7.0以下使用的是iptables&#xff08;使用较少且不建议…

代码签名证书一年的价格是多少?如何申请

代码签名证书的价格因品牌、类型及所提供的服务等因素而有所不同&#xff0c;价格通常在数千元至万余元人民币之间不等。 不同类型代码签名证书价格差异 个人代码签名证书&#xff1a;个人代码签名证书是最基础的类型&#xff0c;适用于个体开发者&#xff0c;其价格较为经济…

通信原理眼图硬件实验

一、实验目的 1. 了解眼图与系统抗噪性能、码间干扰之间的关系及实际意义&#xff1b; 2. 掌握眼图观测的方法并记录研究&#xff1b; 二、实验内容 1. 观测ASK调制系统眼图并记录分析&#xff1b; 2. 观测FSK调制系统眼图并记录分析&#xff1b; 三、实验器材 1. 双踪示…

训练大模型自动在RAG和记忆间选择

现如今&#xff0c;检索增强生成(Retrieval-augmented generation&#xff0c;RAG)管道已经能够使得大语言模型(Large Language Models&#xff0c;LLM)在其响应环节中&#xff0c;充分利用外部的信息源了。不过&#xff0c;由于RAG应用会针对发送给LLM的每个请求&#xff0c;都…

RabbitMQ-Stream(高级详解)

文章目录 什么是流何时使用 RabbitMQ Stream&#xff1f;在 RabbitMQ 中使用流的其他方式基本使用Offset参数chunk Stream 插件服务端消息偏移量追踪示例 示例应用程序RabbitMQ 流 Java API概述环境创建具有所有默认值的环境使用 URI 创建环境创建具有多个 URI 的环境 启用 TLS…

C# WinForm ——31 32 Menustrip菜单栏

1. 介绍 菜单控件&#xff0c;包含多个菜单项的菜单容器 主菜单下面可以有子菜单&#xff0c;子菜单下面可以有下一级子菜单 2. 常用属性 属性解释(Name)控件ID&#xff0c;在代码里引用的时候会用到Enabled控件是否启用Dock定义要绑定到容器的控件边框&#xff0c;默认是t…

最短路:Bellman-Ford

最短路&#xff1a;Bellman-Ford 题目描述参考代码 题目描述 输入样例 3 3 1 1 2 1 2 3 1 1 3 3输出样例 3参考代码 #include <iostream> #include <cstring> #include <algorithm>using namespace std;const int N 510, M 10010;int n, m, k; int dist…

Master-Worker 架构的灰度发布难题

作者&#xff1a;石超 一、前言 Master-Worker 架构是成熟的分布式系统设计模式&#xff0c;具有集中控制、资源利用率高、容错简单等优点。我们数据中心内的几乎所有分布式系统都采用了这样的架构。 &#xfeff; 我们曾经发生过级联故障&#xff0c;造成了整个集群范围的服…

使用 C# 进行面向对象编程:第 9 部分

使用 OOP 的用户活动日志 应用程序背后的关键概念 在这一部分中&#xff0c;我们将使用之前学到的一些 OOP 概念。我们将创建一个小型应用程序。在继续之前&#xff0c;请阅读我的文章user-activity-log-using-C-Sharp-with-sql-server/。在本课程中&#xff0c;我们将再次使…

Python实现base64加密/解密

实现原理&#xff1a;导入base64库 一、加密 import base64# 加密 username "admin" base64_username base64.b64encode(username.encode(utf-8)).decode() print(base64_username) password "123" base64_password base64.b64encode(password.encod…

Vue23-过滤器

一、效果图 二、好用的时间戳三方工具 该三方工具比较大 推荐使用 dayjs的用法&#xff1a; 三、过滤器的使用 3-1、计算属性实现 3-2、methods函数实现 3-3、过滤器filters属性实现 过滤器的本质就是函数&#xff01;&#xff01;&#xff01; 1、过滤器-未传参 默认将管道…

MySQL常用命令(Linux环境)

一、数据定义语句(DDL) 数据库操作 ●登录数据库&#xff1a; mysql -uroot -proot ●创建数据库&#xff1a; create database test ●查看所有数据库&#xff1a; show databases ●选择数据库并使用&#xff1a; use test ●查看所有数据表&#xff1a; show tabl…