消息中间件 - RabbitMQ篇之入门及进阶

news2024/11/22 17:38:46

这里写自定义目录标题

  • 一. RabbitMQ简介
    • 1.1. 消息中间件
      • 1.1.1.什么是消息中间件
      • 1.1.2.消息中间件的传递模式
        • 1. 1.2.1 点对点
        • 1. 1.2.2 发布订阅模式
      • 1.1.3 消息中间件种类
      • 1.1.4 消息中间件的作用
  • 2. RabbitMQ介绍
    • 2.1.RabbitMQ的起源
    • 2.2.RabbitMQ的安装及简单使用
  • 3. RabbitMQ的简单使用
    • 3.1 生产者Demo:
    • 3.2 消费者Demo:
  • 二. RabbitMQ入门
    • 2.1 相关概念
    • 2.2 connection 和 channel 详解
    • 2.3 交换器类型
      • 2.3.1 fanout
      • 2.3.2 direct
      • 2.3.3 Topic
      • 2.3.4 headers
    • 2.4 RabbitMQ运转流程
      • 2.4.1 RabbitMQ 生产者运转流程
      • 2.4.2 RabbitMQ 消费者运转流程
    • 2.5 AMQP协议介绍
  • 三. RabbitMQ进阶
    • 3.1 RabbitMQ 的两种模式
      • 3.1.1 推模式
      • 3.1.2 拉模式
      • 3.1.4 Qos相关内容:
    • 3.2 RabbitMQ 消息确认与拒绝
      • 3.2.1 消息的确认
      • 3.2.2 消息的拒绝
    • 3.3 mandatory 和 immediate 参数
      • 3.3.1 mandatory
      • 3.3.2 immediate
    • 3.4 备份交换器
    • 3.4 过期时间 (TTL)
      • 3.4.1 给队列设置TTL, 设置完队列中所有消息都有相同的过期时间
      • 3.4.2 给某一条消息设置TTL
    • 3.5 死信队列 (DLX)
    • 3.6 延迟队列
  • 四 RabbitMQ的一些常见问题
    • 4.1 如何确保消息正确地发送至RabbitMQ? 如何确保消息接收方消费了消息?
      • 4.1.1 发送方确认模式
      • 4.1.2 接收方确认机制
    • 4.2 如何避免消息重复投递或重复消费?
    • 4.3 消息如何分发?
    • 4.4 如何确保消息不丢失?
      • 4.4.1 生产者丢失消息
      • 4.4.2、持久化
    • 4.5 RabbitMQ 消息积压
    • 4.6 顺序消费

一. RabbitMQ简介

1.1. 消息中间件

1.1.1.什么是消息中间件

消息队列中间件(Message Queue Middleware, 简称MQ)是指利用高效可靠的消息传递机制运行与平台无关的数据交流,并给予数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,他可以在分布式环境下扩展进程间的通信

1.1.2.消息中间件的传递模式

1. 1.2.1 点对点

点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在是的消息的异步传输成为可能。

1. 1.2.2 发布订阅模式

发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(Topic),topic可以认为是消息传递的中介,消息发布者将消息发布到某个topic,而消息订阅者则从topic中订阅消息。topic使得消息的订阅者和消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。

1.1.3 消息中间件种类

RabbitMQ,Kafka,ActivityMQ,RocketMQ

1.1.4 消息中间件的作用

解耦

冗余(存储):有些情况下,处理数据的过程会失败,消息中间件可以把数据持久化存储,直到完全处理,避免消息丢失

削峰:流量削峰

可恢复性:当一部分组件失效时,不会影响整个系统

顺序保证:在大多数场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性

缓冲:在任何重要的系统中,都会存在需要不同处理时间的元素。消息中间件通过一个缓冲层来帮助任务最高效率的执行,写入消息中间件的处理会尽可能快速。该缓冲层有助于控制和优化数据流经过系统的速度。

异步通信

2. RabbitMQ介绍

2.1.RabbitMQ的起源

RabbitMQ是采用Erlang语言实现AMQP协议(Advanced Message Queuing Protocol, 高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。
MSMQ(微软) -> JMS(ActiveMQ) -> AMQP(RabbitMQ)

2.2.RabbitMQ的安装及简单使用

我直接用的HomeBrew安装
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

3. RabbitMQ的简单使用

3.1 生产者Demo:

package com.demo.amqp.rabbitmqdemo.messagebug.producer;
 
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
 
import java.io.IOException;
import java.util.concurrent.TimeoutException;
 
/**
 * @Author jiangtao
 * @Desc 生产者
 * @Date 2022-09-05 21:07
 **/
public class RabbitProducer {
 
    private static final String EXCHANGE_NAME = "exchange_demo";
    private static final String ROUTING_KEY = "routing_key_demo";
    private static final String QUEUE_NAME = "queue_demo";
    private static final String IP_ADDRESS = "127.0.0.1";
    private static final int PORT = 5672;
 
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 创建连接
        Connection connection = factory.newConnection();
        // 创建信道
        Channel channel = connection.createChannel();
        // 创建一个type = direct 持久化 非自动删除的交换器
        channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
        // 创建一个持久化 非排他 非自动删除的队列
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
        // 将交换器和队列通过路由键进行绑定
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
        // 发送一条消息 Hello World
        String message = "Hello World";
        // 发送消息
        channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
        channel.close();
        connection.close();
    }
 
}

3.2 消费者Demo:

package com.demo.amqp.rabbitmqdemo.messagebus.consumer;
 
import com.rabbitmq.client.*;
 
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
 
/**
 * @Author jiangtao
 * @Desc
 * @Date 2022-09-05 21:34
 **/
public class RabbitConsumer {
 
    private static final String QUEUE_NAME = "queue_topic01";
    private static final String IP_ADDRESS = "127.0.0.1";
    private static final int PORT = 5672;
 
    public static void main(String[] args) throws IOException, TimeoutException {
        Address[] addresses = new Address[]{
          new Address(IP_ADDRESS, PORT)
        };
 
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection(addresses);
        final Channel channel = connection.createChannel();
        // 设置客户端最多接收未被Ack的消息的个数
        channel.basicQos(64);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("recv message: " + new String(body));
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        channel.basicConsume(QUEUE_NAME, consumer);
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        channel.close();
        connection.close();
    }
 
}

二. RabbitMQ入门

 RabbitMQ整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。

2.1 相关概念

生产者

消费者

Broker:消息中间件的服务节点

Queue:队列,是RabbitMQ的内部对象,用于存储消息。

Exchange:交换器

RoutinfKey:路由键

Binding:绑定

BIndingKey:绑定键

在这里插入图片描述

2.2 connection 和 channel 详解

我们知道无论是生产者还是消费者,都需要和RabbitMQ Broker 建立连接,这个连接就是一条TCP连接,也就是Connection。

一旦TCP连接建立起来,客户端紧接着可以创建一个AMQP信道(Channel),每个信道都会被指派一个唯一的ID。

信道是建立在Connection 之上的虚拟连接,RabbitMQ 处理的每条AMQP指令都是通过信道完成的。

在这里插入图片描述
我们完全可以使用Connection 就能完成信道的工作,为什么还要引入信道呢?

   试想这样一个场景,一个应用程序中有很多个线程需要从RabbitMQ 中消费消息,或者生产消息,那么必须需要建立很多个Connection,也就是多个TCP连接。

   然而对于操作系统而言,建立和销毁TCP连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。

   RabbitMQ 采用类似NIO(Non-blocking I/O)的做法,选择TCP连接复用,不仅可以减少性能开销,同时也便于管理。

名词解释:
NIO,也成非阻塞I/O,包含三大核心部分:Channel(信道),Buffer(缓冲区)和Selector(选择器)
NIO 基于 Channel 和 Buffer 尽心操作,数据总是从信道读取到缓冲区中,或者从缓冲区写入到信道中。
Selector 用于监听多个信道的时间(比如连接打开。数据到达等)。因此,单线程可以监听多个数据的信道。

每个线程把持一个信道,所以信道复用了Connction 的 TCP连接。同时RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大的时候,复用单一的Connecion 可以在产生性能瓶颈的情况下有效的节省 TCP 连接资源。但是信道本身的流量很大的时候,这时候多个信道复用一个 Connection 就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个 Connection ,将这些信道均摊到这些Connection 中。

信道在AMQP中是一个很重要的概念,大多数操作都是在信道这个层面展开的。

比如 channel.exchangeDeclare、channel.queueDeclare、channel.basicPublish、channel.basicConsume 等方法。

RabbitMQ 相关的API与AMQP紧密相连,比如 channel.basicPublish 对应AMQP 的 Basic.Publish 命令。

2.3 交换器类型

fanout 、direct、topic、headers

2.3.1 fanout

它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。

2.3.2 direct

RoutingKey和BindingKey完全匹配
在这里插入图片描述
RoutingKey 为 warning 时 会路由到Queue 1 和 Queue 2
RoutingKey 为 error 时 会路由到Queue 1
RoutingKey 为 info和debug 时 会路由到Queue 2

2.3.3 Topic

RoutingKey和BindingKey模糊匹配

RoutingKey为一个点号 “.” 分割的字符串(被点号“.”分隔开的每一段独立的字符串成为一个单词),如“com.rabbitmq.client”, “java.util.concurrent”, “com.hidden.client”
BindingKey 和 RoutingKey 一样也是点号“.” 分隔开的字符串
BinddingKey 中可以存在两种特殊的字符串 “”, “#”,用于做模糊匹配,其中 “”用于匹配一个单词,“#”用于匹配多规格单词(可以是零个)

在这里插入图片描述
RoutingKey 为 com.rabbitmq.client 时 会路由到Queue 1 和 Queue 2
RoutingKey 为 com.hidden.client 时 会路由到 Queue 2
RoutingKey 为 com.hidden.demo 时 会路由到 Queue 2
RoutingKey 为 java.rabbitmq.client 时 会路由到 Queue 1
RoutingKey 为 java.util.concurrent 时 消息会被丢弃或者返回给生产者(需要设置mandatory参数),因为没有匹配到任何BindingKey

2.3.4 headers

headers类型的交换器不依赖于RoutingKey的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时制定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers类型的交换器性能会很差,而且说不实用,基本上也用不到。

2.4 RabbitMQ运转流程

2.4.1 RabbitMQ 生产者运转流程

在这里插入图片描述

2.4.2 RabbitMQ 消费者运转流程

在这里插入图片描述

2.5 AMQP协议介绍

RabbitMQ 就是AMQP 协议的 Erlang 实现。AMQP的模型架构和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定。当生产者发送消息时所携带的RoutingKey 与绑定时的BindingKey 相匹配时,消息即被存入相应的队列中。消费者可以订阅相应的队列来获取消息。

RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循AMQP协议中相应的概念。

AMQP协议本身包括三层:

   Module Layer:位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。例如,客户端可以使用Queue.Declare 命令声明一个队列或者使用Basic.Consume 订阅消费一个队列中的消息。

   Session Layer:位于中间层,主要负责将客户端的命令发送给服务器,再将服务端的应答返回给客户端,主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理。

   Transport Layer:位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。

 AMQP 说到底还是一个通信协议,通信协议都会涉及报文交换,从low-level 举例来说,AMQP 本身是应用层的协议,其填充与TCP协议层的数据部分,而从high-level 来说,AMQP 是通过协议命令进行交互的。AMQP 协议可以看做一系列结构化命令的集合,这里的命令代表一种操作,类似于HTTP 协议中的方法(GET、POST、PUT、DELETE等)

三. RabbitMQ进阶

3.1 RabbitMQ 的两种模式

在RabbitMQ 中有两种处理消息的模式。一种是推模式/订阅模式/投递模式(也叫Push模式),消费者调用 channel.basicConsume() 方法在订阅队列后,由RabbitMQ 主动将消息推送给订阅队列的消费者;另一种是拉模式/检索模式(也叫Pull模式),需要消费者调用 channel.basicGet() 方法,主动从指定队列中拉取消息。

推模式:消息中间件主动将消息推送给消费者
拉模式:消费者主动从消息中间件拉取消息

3.1.1 推模式

1.推模式接收消息是最有效的一种消息处理方式。 channel.basicConsume(queueName, consumer) 方法将信道(channel)设置成投递模式,直到取消队列的订阅为止。在投递模式期间,当消息到达RabbitMQ 时,RabbitMQ 会自动的、不断的投递消息给匹配的消费者,而不需要消费端手动去拉取,当然投递消息的个数还是会受到 channel.basicQos 的限制

 2.因为推模式将消息提前推送给消费者,消费者必须设置一个缓冲区来缓存这些消息。优点是消费者总是有一堆在内存中待处理的消息,所以当真正去消费消息时效率会很高。缺点就是缓冲区可能会溢出。

 3.由于推模式是信息到达RabbitMQ 后,就会立即被投递给匹配的消费者,所以实时性非常好,消费者能及时得到最新的消息。

3.1.2 拉模式

1.如果只想从对列中获取单条消息而不是持续订阅,则可以使用 channel.basicGet 方法来进行消费消息。

 2.拉模式在消费者需要时去消息中间件拉取消息,这段网络开销明显增加消息延迟,降低了系统的吞吐量。

 3.缺点:由于拉模式需要消费者手动去RabbitMQ 中拉取消息,所以实时性会很差。消费者难以获取实时消息,具体什么时候拿到新消息完全取决于消费者什么时候去拉取消息。

3.1.3 结论:
1.推模式更关注消息的实时性

 2.拉模式更关注消费者的消费能力,只有消费者主动去拉取消息才会获取到消息

 3.推模式直接从内存缓冲中获取消息,能有效的提高消息的处理效率以及吞吐量

 4.要想实现高吞吐量,消费者需要使用推模式

 5.不能再循环中使用拉模式来模拟推模式,因为拉模式每次都需要去消息中间件来拉取消息来消费,所以会严重影响RabbitMQ的性能。

3.1.4 Qos相关内容:

为什么要设置Qos

   在RabbitMQ中,队列向消费者发送消息,如果没有设置Qos的值,那么队列中有多少消息就发送多少消息给消费者,完全不管消费者是否能够消费完,这样可能就会形成大量未ack的消息在缓存区堆积,因为这些消息未收到消费者发送的ack,所以只能暂时存储在缓存区中,等待ack,然后删除对应消息。

   这样的话,就需要开发者限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量,一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认。例如:假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。

3.2 RabbitMQ 消息确认与拒绝

3.2.1 消息的确认

为了保证消息从队列可靠的到达消费者,RabbitMQ 提供了消息确认机制。

对应有两种确认方式,自动确认和手动确认,通过autoAck 属性来实现的,手动确认需要服务消费者显式的调用 basic.ask 命令进行确认。

当autoAck 为 true时表示RabbitMQ 发动消息到消费者操作系统的套接字缓冲区即可让 RabbitMQ 将消息队列中的该消息删除(实际上先打上删除标记,之后删除)。但是如果套接字缓冲区崩溃,就会存在消费者应用程序没有读到消息,消息就会从消息队列中移除的风险。

当autoAck 为 false 则表示消息必须要被消费者应用程序手动的调用 basic.ack 进行确认。所以安全性更好。

在这里插入图片描述

3.2.2 消息的拒绝

如果我们的服务消费者需要对获取到的消息进行拒绝,那么就调用basic.reject 命令

channel.basicReject(deliveryTag, requeue);
这里的deliveryTag 是一个 64位的长整数,第二个参数requeue 表示是否重新加入到队列中,如果为false表示立即从队列中移除。

basic.reject 只能拒绝一条消息,如果要批量拒绝需要调用 basic.Nack 这个命令
channel.basicNack(deliveryTag, multiple, requeue);

其中第二个参数multiple 设置为false 表示拒绝编号为 deliveryTag 这个消息,如果为true则表示拒绝deliveryTag 前边的所有未被当前消费者确认的消息。

requeue 设置为false, 可以启动死信队列。死信队列可以通过检测被拒绝或者未被送达的消息,用于追踪问题。

3.3 mandatory 和 immediate 参数

mandatory 和 immediate 是 channel.basicPublish 方法中的两个参数,它们都有当消息传递过程中不可达到目的地时将消息返回给生产者的功能。

3.3.1 mandatory

当 mandatory 参数为true时,交换器无法根据自身的类型和路由键找到一个符合条件 的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。生产者端可以通过调用 channel.addReturnListener 来添加监听器实现,代码如下:

当 mandatory 数设置为 false 时,出现上述情形,则消息直接被丢弃。
channel.basicPublish(EXCHANGE_NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
channel.addReturnListener(new ReturnListener() {
    @Override
    public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
        String message = new String(bytes);
        System.out.println("Basic.Return 返回的消息是: "+ message);
    }
});

上面代码中生产者没有成功的将消息路由到队列,此时RabbitMQ 会通过Basic.Return 返回这条消息,之后生产者客户端会通过ReturnListener 监听到这个事件。

3.3.2 immediate

当immediate 参数为true时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过Basic.Return 返回至生产者

总结

概括的来说,mandatory 参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回生产者。
immediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立即投递;如果所有匹配的队列上都没有消费者,则直接将消息返回给生产者,不用将消息存入队列而等待消费者了。
RabbitMQ 去掉了对 immediate的支持了,因为会增加代码的复杂性,同样的功能可以采用TTL和DLX的方式代替。

3.4 备份交换器

有了mandatory 参数,我们可以回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何去处理这些无法路由的消息,最多打个日志,然后触发报警,之后再手动处理。这种方式可能不太优雅,那如果既不想丢失消息,又不想增加生产者的复杂性,怎么做呢?可以用备份交换器来实现。

 备份交换器,顾名思义,可以理解成RabbitMQ 中交换器的“备胎”,当我们为某一个交换器声明一个对应的备胎交换器时,就是为他创建一个备胎,当交换器接收到一条不可路由的消息时,将会把这条消息转发到备份交换器中,由备份交换器来转发和处理,通常备份交换器的类型为 fanout, 这样就能把所有消息都投递到预与其绑定的队列中。

 也就是说我们在备份交换器下绑定一个队列,这样所有那些原交换器无法路由的消息都会投递到这个队列中,这样的话我们就可以建立一个报警队列,用独立的消费者来进行监听和报警。

使用方式

  可以通过在声明交换器(调用channel.exchangeDeclare()方式)的时候添加alternate-exchange 参数来实现。代码如下:
Map<String, Object> param = new HashMap<>(1);
param.put("alternate-exchange", "myAe");
channel.exchangeDeclare("normalExchange", "direct", true, false, param);
channel.exchangeDeclare("myAe", "fanout", true, false, null);
channel.queueDeclare("normalQueue", true, false, false, null);
channel.queueBind("normalQueue", "normalExchange", "normalKey");
channel.queueDeclare("unRoutingQueue", true, false, false, null);
channel.queueBind("unRoutingQueue", "myAe", "");

在这里插入图片描述

3.4 过期时间 (TTL)

TTL(Time To Live的简称),即过期时间。RabbitMQ 可以对消息和队列设置TTL。

3.4.1 给队列设置TTL, 设置完队列中所有消息都有相同的过期时间

channel.queueDeclare() 方法中加入 x-message-ttl 参数实现,代码如下

Map<String, Object> param = new HashMap<>(1);
param.put("x-message-ttl", 6000);
channel.queueDeclare("normalQueue", true, false, false, param);

3.4.2 给某一条消息设置TTL

channel.basicPublish 方法中 加入 expiration 的属性参数,单位为毫秒

String message = "hello world";
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);//持久化消息
builder.expiration("60000");//设置TTL = 60000ms
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, properties, message.getBytes());

3.5 死信队列 (DLX)

DLX(Dead-Letter-Exchange), 可以称之为死信交换器。当消息在一个队列中变成死信(Dead Message)之后,它能被重新发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列称之为死信队列。

消息变成死信一般有以下几种情况:

  消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false
  消息过期
  队列达到最大长度
DLX 也是一个正常的交换器,和一般的交换器没有区别,他能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。之后可以监听这个队列的消息以进行相应的处理

实现方式

通过在channel.queueDeclare 方法中设置 x-dead-letter-exchange 参数来为这个队列添加DLX,代码如下:

// 创建DLX
channel.exchangeDeclare("dlx_exchange", "direct");
Map<String, Object> param = new HashMap<>();
param.put("x-dead-letter-routing-key", "dlx_exchange");
// 为队列 myqueue 添加DLX
channel.queueDeclare("myqueue", false, false, false, param);
// 也可以为这个DLX制定路由键,如果没有特殊指定,则使用原队列的路由键
param.put("x-dead-letter-routing-key", "dlx-routing-key");

在这里插入图片描述

总结

   DLX是一个非常有用的特性,他可以处理异常情况下,消息不能被消费者正确消费(消费者调用了Basic.Reject或者Basic.Nock)而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。

   DLX配合TTL还可以实现延迟队列的功能。

应用:

1.业务重试

3.6 延迟队列

由上面的知识点可以知道我们可以给队列设置一个TTL,然后过期了自动转发到一个死信队列中,就可实现延迟队列的功能。

四 RabbitMQ的一些常见问题

4.1 如何确保消息正确地发送至RabbitMQ? 如何确保消息接收方消费了消息?

4.1.1 发送方确认模式

将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一ID。一旦消息被投递到目的队列后,或者消息被写入磁盘后,信道会发送一个确认给生产者(包括消息的ID)。
如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack消息。
发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。
当确认消息到达生产者应用程序,生产者应用的回调方法就会被触发来处理确认消息。

4.1.2 接收方确认机制

消费者接受每一条消息后都必须进行确认,只要有消费者确认了消息,MQ才能安全的把消息从队列中删除。

这里并没有用到超时机制,MQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。保证了数据的最终一致性。
还有几种情况:

如果消费者接受到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消费重复的隐患,需要去重,幂等处理)
如果消费者接受到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙。则不会给该消费者分发更多的消息。

4.2 如何避免消息重复投递或重复消费?

在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重的依据(消息投递失败并重传),避免重复的消息进入队列,
在消息消费时,要求消息体中 必须要有一个bizID(业务id, 对于同一个业务全局唯一) 作为去重的依据,避免同一条消息被重复消费。

4.3 消息如何分发?

一个生产者,多个消费者
多个消费者时,是轮询机制,依次分发给消费者(每个消费者按顺序依次消费)

4.4 如何确保消息不丢失?

4.4.1 生产者丢失消息

可以选择使用 RabbitMQ 提供事务功能,就是生产者在发送数据之前开启事务,然后发送消息,如果消息没有成功被RabbitMQ接收到,那么生产者会受到异常报错,这时就可以回滚事物,然后尝试重新发送;如果收到了消息,那么就可以提交事物。

缺点: RabbitMQ 事务已开启,就会变为同步阻塞操作,生产者会阻塞等待是否发送成功,太耗性能会造成吞吐量的下降。

或者使用死信队列对处理失败的数据进行拒绝,然后使用死信队列重复处理,处理时再用业务id进行幂等性处理

如何保证消息的可靠性

4.4.2、持久化

RabbitMQ中发消息的时候会有个durable参数可以设置,设置为true,就会持久化
在这里插入图片描述

这样的话MQ服务器即使宕机,重启后磁盘文件中有消息的存储,这样就不会丢失了吧。是的这样就一定概率的保障了消息不丢失。

但还会有个场景,就是消息刚刚保存到MQ内存中,但还没有来得及更新到磁盘文件中,突然宕机了。这个场景在持续的大量消息投递的过程中,会很常见。

那怎么办?我们如何作才能保障一定会持久化到磁盘上面呢?

confirm机制
RabbitMQ利用confirm机制来通知我们是否持久化成功
在这里插入图片描述

confirm机制的原理:
(1)消息生产者把消息发送给MQ,如果接收成功,MQ会返回一个ack消息给生产者;
(2)如果消息接收不成功,MQ会返回一个nack消息给生产者。

这样是不是就可以保障100%消息不丢失了呢?

如果我们生产者每发一条消息,都要MQ持久化到磁盘中,然后再发起ack或nack的回调。这样的话是不是我们MQ的吞吐量很不高,因为每次都要把消息持久化到磁盘中。写入磁盘这个动作是很慢的。这个在高并发场景下是不能够接受的,吞吐量太低了。

所以MQ持久化磁盘真实的实现,是通过异步调用处理的,他是有一定的机制,如:等到有几千条消息的时候,会一次性的刷盘到磁盘上面。而不是每来一条消息,就刷盘一次。

所以comfirm机制其实是一个异步监听的机制,是为了保证系统的高吞吐量,这样就导致了还是不能够100%保障消息不丢失,因为即使加上了confirm机制,消息在MQ内存中还没有刷盘到磁盘就宕机了,还是没法处理。

4.5 RabbitMQ 消息积压

消息积压一般有以下这几种情况:

1.消费者消费速度跟不上生产者生产速度,这种情况一般是前期设计阶段没设计好消费者和生产者的平衡关系,保证消费者的消费速度,增加消费者数量

2.消费者出现异常,导致一直不消费消息,从而导致消息积压,超出硬盘存储大小,紧急扩容,或者做其他紧急处理

其他紧急处理思路:

1.分析积压的原因是因为consume程序哪方面出了问题,先修复consumer的问题,确保恢复其消费速度,然后将现有的consumer停掉

2.新建一个新的exchange,临时建立10倍的queue

3.写一个临时程序将积压的消息重新导流到新的queue中

4.接着部署10倍的consumer去消费新的queue中的消息

5.这种做法相当于临时将queue和consumer的资源扩大10倍去快速消费消息

6.等快消费完了,再恢复到原来的架构,重新用原来的机器消费消息

4.6 顺序消费

在某些场景下,消息队列中的若干消息如果是对同一个数据进行操作,并且这些操作是有先后顺序的,那么消息中间件的消费者如果不考虑顺序性问题就会造成数据不一致的问题。

比如订单的先后顺序,比如数据同步服务binlog中增删改的顺序问题。

顺序错乱的场景:

一个Queue,多个Consumer消费

一个Queue,一个Consumer,但是消费者是多线程处理

解决办法:一个Queue,一个Consumer,单线程,这样解决了顺序消费的问题,但是性能很差,吞吐量很低。

如何保证消息的顺序消费,又保证高效呢:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/555945.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Unity UI -- (6)增加Toggle和Slider

在前面的小节中&#xff0c;我们已经有了一个空的设置菜单。现在让我们来添加设置选项。 在本节最后&#xff0c;我们的设置菜单的样子参考如下&#xff1a; 添加一个音乐开关&#xff08;Toggle&#xff09; 现在让我们来增加一个toggle&#xff0c;让用户能够通过它来对场景的…

多功能语音芯片​NV040C的应用,为洗地机开辟新的应用领域

随着科技的快速发展&#xff0c;智能化和自动化已经成为了各个行业发展的主流趋势。传统的家庭洗拖工作日渐被各类洗地机、扫地机器人等取代&#xff0c;其中作为洗地机作为近几年家庭清洁的好物。近年来&#xff0c;不少洗地机厂商开始将语音芯片技术应用到产品中&#xff0c;…

多商户商城系统开发功能优势与选择技巧

电商行业的持续发展&#xff0c;让越来越多的商家企业开始选择入驻多商户商城&#xff0c;通过该系统不仅能够为消费者提供更加便捷良好的购物体验&#xff0c;而且也能够为企业提供一个高效稳定的电商平台&#xff0c;可以说是未来电商行业发展的重要趋势。那么多商户商城系统…

Milk -v 开发板烧录系统以及ssh连接

Milk -v 开发板烧录系统以及ssh连接 0. 前言1. 系统下载2. 驱动安装3. ssh连接 0. 前言 操作系统&#xff1a;Windows10 专业版 开发板&#xff1a;Milk -v 准备读卡器、内存卡、Typec 数据线 Milk-V开发板官方文档 到手后它的外形和宣传图片是一致的&#xff0c;但是更加的…

微信小程序云开发学习记录--1

目录 1.配置小程序项目 2. 云创建 3.新建云文件夹 4.数据库的建立和使用 5.增、删、改、查四种数据库基本操作 增加数据 查询数据 修改数据 删除数据 拿到微信小程序&#xff0c;首先可以先配置好自己的服务器或者是云环境&#xff0c;服务器的方法就不说了&#xff0…

前端实训——Day01

前言 学校最近开始实训周了&#xff0c;一上就是一个月&#xff0c;本来想在课上学点考研的东西的&#xff0c;但是无奈任务重&#xff0c;而且最后还能有点小奖励&#xff0c;就认真学了&#xff0c;再者说&#xff0c;html也挺重要的&#xff0c;学一学也不算浪费时间。 软…

Linux——安装tomcat并部署项目

目录 1、准备阶段 2、具体步骤 2.1、下载tomcat安装包 2.2、上传tomcat安装包 2.3、启动tomcat 2.4、访问页面 3、部署Maven项目至tomcat 3.1、打包Maven项目 3.2、上传打包后的war包至Linux 3.3、修改连接数据库配置文件中的ip 3.4、访问项目 3.5、直接访问ip访问…

Promise理解+ JS 的执行机制

做一道题,理解一下: function getPrinterList() {let res 初始setTimeout(() > {res 1},1000)return res }let res getPrinterList() console.log(res); //输出初始 在getPrinterList函数中,先分清同步异步. JS执行语句时,会区分同步异步,把所有的同步放在同步队列中,把…

【Python pymongo】零基础也能轻松掌握的学习路线与参考资料

Python pymongo是一款基于Python的MongoDB数据库的驱动程序&#xff0c;它提供了操作MongoDB数据库的接口和方法。学习Python pymongo可以帮助开发者更好地使用MongoDB数据库&#xff0c;从而实现更好的数据存储和管理。在这篇文章中&#xff0c;我们将介绍Python pymongo的学习…

chatgpt赋能Python-python_bin__

Python中的bin()方法&#xff1a;将数字转换为二进制字符串 Python中内置的bin()函数是一个非常有用的工具&#xff0c;它可以将一个整数转换成一个二进制字符串。这个函数非常简单易用&#xff0c;对于任何需要进行二进制操作的开发者来说都是一个必不可少的工具。 什么是二…

chatgpt赋能Python-python_bio包

Python Bio包&#xff1a;简介、功能和应用 Python Bio包是什么 Python Bio包是一套专门为生物信息学而设计的Python模块。它包含了许多优秀的工具和算法&#xff0c;可以帮助生物学家们解决各种生物问题。Python Bio包主要由五个子模块组成&#xff1a; Bio.Seq&#xff1a…

EXP-00026: conflicting modes specified

今天下午现场项目经理问了一个问题&#xff0c;直接上截图&#xff0c;问是不是客户端不兼容&#xff1f; C:\Users\Administrator>exp usr_jwc/Test#123192.16.50.100:1521/orcl ownerusr_jwc fully fileC:\ABCD20230521.dmp logC:\imp_ABCD20230521.log Export: Release…

day38_Servlet

今日内容 零、 复习昨日 一、Servlet 二、HTTP 三、HttpServlet 零、 复习昨日 见晨考 一、Servlet 1.1 介绍 javaweb开发,就是需要服务器接收前端发送的请求,以及请求中的数据,经过处理(jdbc操作),然后向浏览器做出响应. 我们要想在服务器中写java代码来接收请求,做出响应,我…

chatgpt赋能Python-python_aipspeech

Python Aipspeech介绍与优势分析 什么是Python Aipspeech&#xff1f; Python Aipspeech是一种基于Python编程语言的语音识别API&#xff0c;可以实现语音转文字、语音合成、语音唤醒等功能。它基于Aipspeech强大的语音识别引擎&#xff0c;可以实现高精度的语音识别&#xf…

香港VPS服务器如何屏蔽指定访客ip?

​  如果你是一个香港VPS服务器的管理员&#xff0c;你可能会遇到一些不良用户或者恶意攻击者&#xff0c;这些人会尝试通过不断的访问和攻击你的网站来破坏你的网站的运行。如何保护你的网站&#xff0c;你需要使用一些方法来屏蔽这些指定的访客IP。 首先&#xff0c;你需要…

某医院内部网络攻击分析案例

分析概要 分析概要从以下三点做介绍。 分析内容 NetInside网络流量分析设备采集的流量。 分析时间 报告分析时间范围为&#xff1a;2020-09-28 07:58:00-11:58:00&#xff0c;时长共计3小时。 分析目的 本报告主要分析目的&#xff1a;查找和定位存在可疑现象的主机、查…

当你学会这项python数据提取神器时,请做好升职准备!

一、什么是 jsonpath ● JsonPath 是一种信息抽取类库&#xff0c;是从 JSON 文档中抽取指定信息的工具&#xff0c;提供多种语言实现版本&#xff0c;包括&#xff1a;JavaScript、Python、PHP 和 Java。 *文末领10节自动化精品课* 二、特点 ● 只能提取 JSON 格式的数据 ●…

Postman和Jmeter的区别

01、创建接口用例集 Postman 是 Collections&#xff0c;Jmeter 是线程组&#xff0c;没什么区别。 02、 步骤的实现 Postman 和 jmeter 都是创建 http 请求 区别 1&#xff1a;postman 请求的URL 是一个整体&#xff0c;jmeter 分成了 4 个部分&#xff08;协议、主机、端…

chatgpt赋能Python-pythonsub

Python Sub- 快捷、高效的字符串替换工具 如果你是一个有大量文本替换需求的开发者&#xff0c;Python Sub 库是一个你一定不能错过的工具。Python Sub 是一个快捷、高效的字符串替换工具&#xff0c;帮助你快速轻松地替换字符串&#xff0c;优化你的工作效率。 Python Sub 提…

基于Java+Swing+mysql物业收费管理系统

基于JavaSwingmysql物业收费管理系统 一、系统介绍1. 居民管理模式:2. 物业管理员管理模式:3.项目说明 二、功能展示1.用户登陆2.查询缴费--业主3.历史账单3.资料设置4.生成账单--管理员5.收费记录--管理员6.用户管理--管理员 三、数据库四、其它系统五、获取源码 一、系统介绍…