这里写自定义目录标题
- 一. RabbitMQ简介
- 1.1. 消息中间件
- 1.1.1.什么是消息中间件
- 1.1.2.消息中间件的传递模式
- 1. 1.2.1 点对点
- 1. 1.2.2 发布订阅模式
- 1.1.3 消息中间件种类
- 1.1.4 消息中间件的作用
- 2. RabbitMQ介绍
- 2.1.RabbitMQ的起源
- 2.2.RabbitMQ的安装及简单使用
- 3. RabbitMQ的简单使用
- 3.1 生产者Demo:
- 3.2 消费者Demo:
- 二. RabbitMQ入门
- 2.1 相关概念
- 2.2 connection 和 channel 详解
- 2.3 交换器类型
- 2.3.1 fanout
- 2.3.2 direct
- 2.3.3 Topic
- 2.3.4 headers
- 2.4 RabbitMQ运转流程
- 2.4.1 RabbitMQ 生产者运转流程
- 2.4.2 RabbitMQ 消费者运转流程
- 2.5 AMQP协议介绍
- 三. RabbitMQ进阶
- 3.1 RabbitMQ 的两种模式
- 3.1.1 推模式
- 3.1.2 拉模式
- 3.1.4 Qos相关内容:
- 3.2 RabbitMQ 消息确认与拒绝
- 3.2.1 消息的确认
- 3.2.2 消息的拒绝
- 3.3 mandatory 和 immediate 参数
- 3.3.1 mandatory
- 3.3.2 immediate
- 3.4 备份交换器
- 3.4 过期时间 (TTL)
- 3.4.1 给队列设置TTL, 设置完队列中所有消息都有相同的过期时间
- 3.4.2 给某一条消息设置TTL
- 3.5 死信队列 (DLX)
- 3.6 延迟队列
- 四 RabbitMQ的一些常见问题
- 4.1 如何确保消息正确地发送至RabbitMQ? 如何确保消息接收方消费了消息?
- 4.1.1 发送方确认模式
- 4.1.2 接收方确认机制
- 4.2 如何避免消息重复投递或重复消费?
- 4.3 消息如何分发?
- 4.4 如何确保消息不丢失?
- 4.4.1 生产者丢失消息
- 4.4.2、持久化
- 4.5 RabbitMQ 消息积压
- 4.6 顺序消费
一. RabbitMQ简介
1.1. 消息中间件
1.1.1.什么是消息中间件
消息队列中间件(Message Queue Middleware, 简称MQ)是指利用高效可靠的消息传递机制运行与平台无关的数据交流,并给予数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,他可以在分布式环境下扩展进程间的通信
1.1.2.消息中间件的传递模式
1. 1.2.1 点对点
点对点模式是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在是的消息的异步传输成为可能。
1. 1.2.2 发布订阅模式
发布订阅模式定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(Topic),topic可以认为是消息传递的中介,消息发布者将消息发布到某个topic,而消息订阅者则从topic中订阅消息。topic使得消息的订阅者和消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布/订阅模式在消息的一对多广播时采用。
1.1.3 消息中间件种类
RabbitMQ,Kafka,ActivityMQ,RocketMQ
1.1.4 消息中间件的作用
解耦
冗余(存储):有些情况下,处理数据的过程会失败,消息中间件可以把数据持久化存储,直到完全处理,避免消息丢失
削峰:流量削峰
可恢复性:当一部分组件失效时,不会影响整个系统
顺序保证:在大多数场景下,数据处理的顺序很重要,大部分消息中间件支持一定程度上的顺序性
缓冲:在任何重要的系统中,都会存在需要不同处理时间的元素。消息中间件通过一个缓冲层来帮助任务最高效率的执行,写入消息中间件的处理会尽可能快速。该缓冲层有助于控制和优化数据流经过系统的速度。
异步通信
2. RabbitMQ介绍
2.1.RabbitMQ的起源
RabbitMQ是采用Erlang语言实现AMQP协议(Advanced Message Queuing Protocol, 高级消息队列协议)的消息中间件,它最初起源于金融系统,用于在分布式系统中存储转发消息。
MSMQ(微软) -> JMS(ActiveMQ) -> AMQP(RabbitMQ)
2.2.RabbitMQ的安装及简单使用
我直接用的HomeBrew安装
3. RabbitMQ的简单使用
3.1 生产者Demo:
package com.demo.amqp.rabbitmqdemo.messagebug.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @Author jiangtao
* @Desc 生产者
* @Date 2022-09-05 21:07
**/
public class RabbitProducer {
private static final String EXCHANGE_NAME = "exchange_demo";
private static final String ROUTING_KEY = "routing_key_demo";
private static final String QUEUE_NAME = "queue_demo";
private static final String IP_ADDRESS = "127.0.0.1";
private static final int PORT = 5672;
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername("guest");
factory.setPassword("guest");
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 创建一个type = direct 持久化 非自动删除的交换器
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
// 创建一个持久化 非排他 非自动删除的队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 将交换器和队列通过路由键进行绑定
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 发送一条消息 Hello World
String message = "Hello World";
// 发送消息
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
channel.close();
connection.close();
}
}
3.2 消费者Demo:
package com.demo.amqp.rabbitmqdemo.messagebus.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* @Author jiangtao
* @Desc
* @Date 2022-09-05 21:34
**/
public class RabbitConsumer {
private static final String QUEUE_NAME = "queue_topic01";
private static final String IP_ADDRESS = "127.0.0.1";
private static final int PORT = 5672;
public static void main(String[] args) throws IOException, TimeoutException {
Address[] addresses = new Address[]{
new Address(IP_ADDRESS, PORT)
};
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection(addresses);
final Channel channel = connection.createChannel();
// 设置客户端最多接收未被Ack的消息的个数
channel.basicQos(64);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv message: " + new String(body));
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
channel.basicConsume(QUEUE_NAME, consumer);
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
channel.close();
connection.close();
}
}
二. RabbitMQ入门
RabbitMQ整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。
2.1 相关概念
生产者
消费者
Broker:消息中间件的服务节点
Queue:队列,是RabbitMQ的内部对象,用于存储消息。
Exchange:交换器
RoutinfKey:路由键
Binding:绑定
BIndingKey:绑定键
2.2 connection 和 channel 详解
我们知道无论是生产者还是消费者,都需要和RabbitMQ Broker 建立连接,这个连接就是一条TCP连接,也就是Connection。
一旦TCP连接建立起来,客户端紧接着可以创建一个AMQP信道(Channel),每个信道都会被指派一个唯一的ID。
信道是建立在Connection 之上的虚拟连接,RabbitMQ 处理的每条AMQP指令都是通过信道完成的。
我们完全可以使用Connection 就能完成信道的工作,为什么还要引入信道呢?
试想这样一个场景,一个应用程序中有很多个线程需要从RabbitMQ 中消费消息,或者生产消息,那么必须需要建立很多个Connection,也就是多个TCP连接。
然而对于操作系统而言,建立和销毁TCP连接是非常昂贵的开销,如果遇到使用高峰,性能瓶颈也随之显现。
RabbitMQ 采用类似NIO(Non-blocking I/O)的做法,选择TCP连接复用,不仅可以减少性能开销,同时也便于管理。
名词解释:
NIO,也成非阻塞I/O,包含三大核心部分:Channel(信道),Buffer(缓冲区)和Selector(选择器)
NIO 基于 Channel 和 Buffer 尽心操作,数据总是从信道读取到缓冲区中,或者从缓冲区写入到信道中。
Selector 用于监听多个信道的时间(比如连接打开。数据到达等)。因此,单线程可以监听多个数据的信道。
每个线程把持一个信道,所以信道复用了Connction 的 TCP连接。同时RabbitMQ 可以确保每个线程的私密性,就像拥有独立的连接一样。当每个信道的流量不是很大的时候,复用单一的Connecion 可以在产生性能瓶颈的情况下有效的节省 TCP 连接资源。但是信道本身的流量很大的时候,这时候多个信道复用一个 Connection 就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个 Connection ,将这些信道均摊到这些Connection 中。
信道在AMQP中是一个很重要的概念,大多数操作都是在信道这个层面展开的。
比如 channel.exchangeDeclare、channel.queueDeclare、channel.basicPublish、channel.basicConsume 等方法。
RabbitMQ 相关的API与AMQP紧密相连,比如 channel.basicPublish 对应AMQP 的 Basic.Publish 命令。
2.3 交换器类型
fanout 、direct、topic、headers
2.3.1 fanout
它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中。
2.3.2 direct
RoutingKey和BindingKey完全匹配
RoutingKey 为 warning 时 会路由到Queue 1 和 Queue 2
RoutingKey 为 error 时 会路由到Queue 1
RoutingKey 为 info和debug 时 会路由到Queue 2
2.3.3 Topic
RoutingKey和BindingKey模糊匹配
RoutingKey为一个点号 “.” 分割的字符串(被点号“.”分隔开的每一段独立的字符串成为一个单词),如“com.rabbitmq.client”, “java.util.concurrent”, “com.hidden.client”
BindingKey 和 RoutingKey 一样也是点号“.” 分隔开的字符串
BinddingKey 中可以存在两种特殊的字符串 “”, “#”,用于做模糊匹配,其中 “”用于匹配一个单词,“#”用于匹配多规格单词(可以是零个)
RoutingKey 为 com.rabbitmq.client 时 会路由到Queue 1 和 Queue 2
RoutingKey 为 com.hidden.client 时 会路由到 Queue 2
RoutingKey 为 com.hidden.demo 时 会路由到 Queue 2
RoutingKey 为 java.rabbitmq.client 时 会路由到 Queue 1
RoutingKey 为 java.util.concurrent 时 消息会被丢弃或者返回给生产者(需要设置mandatory参数),因为没有匹配到任何BindingKey
2.3.4 headers
headers类型的交换器不依赖于RoutingKey的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。在绑定队列和交换器时制定一组键值对,当发送消息到交换器时,RabbitMQ会获取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时制定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers类型的交换器性能会很差,而且说不实用,基本上也用不到。
2.4 RabbitMQ运转流程
2.4.1 RabbitMQ 生产者运转流程
2.4.2 RabbitMQ 消费者运转流程
2.5 AMQP协议介绍
RabbitMQ 就是AMQP 协议的 Erlang 实现。AMQP的模型架构和 RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器和队列绑定。当生产者发送消息时所携带的RoutingKey 与绑定时的BindingKey 相匹配时,消息即被存入相应的队列中。消费者可以订阅相应的队列来获取消息。
RabbitMQ 中的交换器、交换器类型、队列、绑定、路由键等都是遵循AMQP协议中相应的概念。
AMQP协议本身包括三层:
Module Layer:位于协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。例如,客户端可以使用Queue.Declare 命令声明一个队列或者使用Basic.Consume 订阅消费一个队列中的消息。
Session Layer:位于中间层,主要负责将客户端的命令发送给服务器,再将服务端的应答返回给客户端,主要为客户端与服务器之间的通信提供可靠性同步机制和错误处理。
Transport Layer:位于最底层,主要传输二进制数据流,提供帧的处理、信道复用、错误检测和数据表示等。
AMQP 说到底还是一个通信协议,通信协议都会涉及报文交换,从low-level 举例来说,AMQP 本身是应用层的协议,其填充与TCP协议层的数据部分,而从high-level 来说,AMQP 是通过协议命令进行交互的。AMQP 协议可以看做一系列结构化命令的集合,这里的命令代表一种操作,类似于HTTP 协议中的方法(GET、POST、PUT、DELETE等)
三. RabbitMQ进阶
3.1 RabbitMQ 的两种模式
在RabbitMQ 中有两种处理消息的模式。一种是推模式/订阅模式/投递模式(也叫Push模式),消费者调用 channel.basicConsume() 方法在订阅队列后,由RabbitMQ 主动将消息推送给订阅队列的消费者;另一种是拉模式/检索模式(也叫Pull模式),需要消费者调用 channel.basicGet() 方法,主动从指定队列中拉取消息。
推模式:消息中间件主动将消息推送给消费者
拉模式:消费者主动从消息中间件拉取消息
3.1.1 推模式
1.推模式接收消息是最有效的一种消息处理方式。 channel.basicConsume(queueName, consumer) 方法将信道(channel)设置成投递模式,直到取消队列的订阅为止。在投递模式期间,当消息到达RabbitMQ 时,RabbitMQ 会自动的、不断的投递消息给匹配的消费者,而不需要消费端手动去拉取,当然投递消息的个数还是会受到 channel.basicQos 的限制
2.因为推模式将消息提前推送给消费者,消费者必须设置一个缓冲区来缓存这些消息。优点是消费者总是有一堆在内存中待处理的消息,所以当真正去消费消息时效率会很高。缺点就是缓冲区可能会溢出。
3.由于推模式是信息到达RabbitMQ 后,就会立即被投递给匹配的消费者,所以实时性非常好,消费者能及时得到最新的消息。
3.1.2 拉模式
1.如果只想从对列中获取单条消息而不是持续订阅,则可以使用 channel.basicGet 方法来进行消费消息。
2.拉模式在消费者需要时去消息中间件拉取消息,这段网络开销明显增加消息延迟,降低了系统的吞吐量。
3.缺点:由于拉模式需要消费者手动去RabbitMQ 中拉取消息,所以实时性会很差。消费者难以获取实时消息,具体什么时候拿到新消息完全取决于消费者什么时候去拉取消息。
3.1.3 结论:
1.推模式更关注消息的实时性
2.拉模式更关注消费者的消费能力,只有消费者主动去拉取消息才会获取到消息
3.推模式直接从内存缓冲中获取消息,能有效的提高消息的处理效率以及吞吐量
4.要想实现高吞吐量,消费者需要使用推模式
5.不能再循环中使用拉模式来模拟推模式,因为拉模式每次都需要去消息中间件来拉取消息来消费,所以会严重影响RabbitMQ的性能。
3.1.4 Qos相关内容:
为什么要设置Qos
在RabbitMQ中,队列向消费者发送消息,如果没有设置Qos的值,那么队列中有多少消息就发送多少消息给消费者,完全不管消费者是否能够消费完,这样可能就会形成大量未ack的消息在缓存区堆积,因为这些消息未收到消费者发送的ack,所以只能暂时存储在缓存区中,等待ack,然后删除对应消息。
这样的话,就需要开发者限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 basic.qos 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量,一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认。例如:假设在通道上有未确认的消息 5、6、7,8,并且通道的预取计数设置为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。比方说 tag=6 这个消息刚刚被确认 ACK,RabbitMQ 将会感知这个情况到并再发送一条消息。
3.2 RabbitMQ 消息确认与拒绝
3.2.1 消息的确认
为了保证消息从队列可靠的到达消费者,RabbitMQ 提供了消息确认机制。
对应有两种确认方式,自动确认和手动确认,通过autoAck 属性来实现的,手动确认需要服务消费者显式的调用 basic.ask 命令进行确认。
当autoAck 为 true时表示RabbitMQ 发动消息到消费者操作系统的套接字缓冲区即可让 RabbitMQ 将消息队列中的该消息删除(实际上先打上删除标记,之后删除)。但是如果套接字缓冲区崩溃,就会存在消费者应用程序没有读到消息,消息就会从消息队列中移除的风险。
当autoAck 为 false 则表示消息必须要被消费者应用程序手动的调用 basic.ack 进行确认。所以安全性更好。
3.2.2 消息的拒绝
如果我们的服务消费者需要对获取到的消息进行拒绝,那么就调用basic.reject 命令
channel.basicReject(deliveryTag, requeue);
这里的deliveryTag 是一个 64位的长整数,第二个参数requeue 表示是否重新加入到队列中,如果为false表示立即从队列中移除。
basic.reject 只能拒绝一条消息,如果要批量拒绝需要调用 basic.Nack 这个命令
channel.basicNack(deliveryTag, multiple, requeue);
其中第二个参数multiple 设置为false 表示拒绝编号为 deliveryTag 这个消息,如果为true则表示拒绝deliveryTag 前边的所有未被当前消费者确认的消息。
requeue 设置为false, 可以启动死信队列。死信队列可以通过检测被拒绝或者未被送达的消息,用于追踪问题。
3.3 mandatory 和 immediate 参数
mandatory 和 immediate 是 channel.basicPublish 方法中的两个参数,它们都有当消息传递过程中不可达到目的地时将消息返回给生产者的功能。
3.3.1 mandatory
当 mandatory 参数为true时,交换器无法根据自身的类型和路由键找到一个符合条件 的队列,那么 RabbitMQ 会调用 Basic.Return 命令将消息返回给生产者。生产者端可以通过调用 channel.addReturnListener 来添加监听器实现,代码如下:
当 mandatory 数设置为 false 时,出现上述情形,则消息直接被丢弃。
channel.basicPublish(EXCHANGE_NAME, "", true, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
String message = new String(bytes);
System.out.println("Basic.Return 返回的消息是: "+ message);
}
});
上面代码中生产者没有成功的将消息路由到队列,此时RabbitMQ 会通过Basic.Return 返回这条消息,之后生产者客户端会通过ReturnListener 监听到这个事件。
3.3.2 immediate
当immediate 参数为true时,如果交换器在将消息路由到队列时发现队列上并不存在任何消费者,那么这条消息不会存入队列中。当与路由键匹配的所有队列都没有消费者时,该消息会通过Basic.Return 返回至生产者
总结
概括的来说,mandatory 参数告诉服务器至少将该消息路由到一个队列中,否则将消息返回生产者。
immediate 参数告诉服务器,如果该消息关联的队列上有消费者,则立即投递;如果所有匹配的队列上都没有消费者,则直接将消息返回给生产者,不用将消息存入队列而等待消费者了。
RabbitMQ 去掉了对 immediate的支持了,因为会增加代码的复杂性,同样的功能可以采用TTL和DLX的方式代替。
3.4 备份交换器
有了mandatory 参数,我们可以回退消息,我们获得了对无法投递消息的感知能力,有机会在生产者的消息无法被投递时发现并处理。但有时候,我们并不知道该如何去处理这些无法路由的消息,最多打个日志,然后触发报警,之后再手动处理。这种方式可能不太优雅,那如果既不想丢失消息,又不想增加生产者的复杂性,怎么做呢?可以用备份交换器来实现。
备份交换器,顾名思义,可以理解成RabbitMQ 中交换器的“备胎”,当我们为某一个交换器声明一个对应的备胎交换器时,就是为他创建一个备胎,当交换器接收到一条不可路由的消息时,将会把这条消息转发到备份交换器中,由备份交换器来转发和处理,通常备份交换器的类型为 fanout, 这样就能把所有消息都投递到预与其绑定的队列中。
也就是说我们在备份交换器下绑定一个队列,这样所有那些原交换器无法路由的消息都会投递到这个队列中,这样的话我们就可以建立一个报警队列,用独立的消费者来进行监听和报警。
使用方式
可以通过在声明交换器(调用channel.exchangeDeclare()方式)的时候添加alternate-exchange 参数来实现。代码如下:
Map<String, Object> param = new HashMap<>(1);
param.put("alternate-exchange", "myAe");
channel.exchangeDeclare("normalExchange", "direct", true, false, param);
channel.exchangeDeclare("myAe", "fanout", true, false, null);
channel.queueDeclare("normalQueue", true, false, false, null);
channel.queueBind("normalQueue", "normalExchange", "normalKey");
channel.queueDeclare("unRoutingQueue", true, false, false, null);
channel.queueBind("unRoutingQueue", "myAe", "");
3.4 过期时间 (TTL)
TTL(Time To Live的简称),即过期时间。RabbitMQ 可以对消息和队列设置TTL。
3.4.1 给队列设置TTL, 设置完队列中所有消息都有相同的过期时间
channel.queueDeclare() 方法中加入 x-message-ttl 参数实现,代码如下
Map<String, Object> param = new HashMap<>(1);
param.put("x-message-ttl", 6000);
channel.queueDeclare("normalQueue", true, false, false, param);
3.4.2 给某一条消息设置TTL
channel.basicPublish 方法中 加入 expiration 的属性参数,单位为毫秒
String message = "hello world";
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(2);//持久化消息
builder.expiration("60000");//设置TTL = 60000ms
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, properties, message.getBytes());
3.5 死信队列 (DLX)
DLX(Dead-Letter-Exchange), 可以称之为死信交换器。当消息在一个队列中变成死信(Dead Message)之后,它能被重新发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列称之为死信队列。
消息变成死信一般有以下几种情况:
消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false
消息过期
队列达到最大长度
DLX 也是一个正常的交换器,和一般的交换器没有区别,他能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。之后可以监听这个队列的消息以进行相应的处理
实现方式
通过在channel.queueDeclare 方法中设置 x-dead-letter-exchange 参数来为这个队列添加DLX,代码如下:
// 创建DLX
channel.exchangeDeclare("dlx_exchange", "direct");
Map<String, Object> param = new HashMap<>();
param.put("x-dead-letter-routing-key", "dlx_exchange");
// 为队列 myqueue 添加DLX
channel.queueDeclare("myqueue", false, false, false, param);
// 也可以为这个DLX制定路由键,如果没有特殊指定,则使用原队列的路由键
param.put("x-dead-letter-routing-key", "dlx-routing-key");
总结
DLX是一个非常有用的特性,他可以处理异常情况下,消息不能被消费者正确消费(消费者调用了Basic.Reject或者Basic.Nock)而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。
DLX配合TTL还可以实现延迟队列的功能。
应用:
1.业务重试
3.6 延迟队列
由上面的知识点可以知道我们可以给队列设置一个TTL,然后过期了自动转发到一个死信队列中,就可实现延迟队列的功能。
四 RabbitMQ的一些常见问题
4.1 如何确保消息正确地发送至RabbitMQ? 如何确保消息接收方消费了消息?
4.1.1 发送方确认模式
将信道设置成confirm模式(发送方确认模式),则所有在信道上发布的消息都会被指派一个唯一ID。一旦消息被投递到目的队列后,或者消息被写入磁盘后,信道会发送一个确认给生产者(包括消息的ID)。
如果RabbitMQ发生内部错误从而导致消息丢失,会发送一条nack消息。
发送方确认模式是异步的,生产者应用程序在等待确认的同时,可以继续发送消息。
当确认消息到达生产者应用程序,生产者应用的回调方法就会被触发来处理确认消息。
4.1.2 接收方确认机制
消费者接受每一条消息后都必须进行确认,只要有消费者确认了消息,MQ才能安全的把消息从队列中删除。
这里并没有用到超时机制,MQ仅通过Consumer的连接中断来确认是否需要重新发送消息。也就是说,只要连接不中断,RabbitMQ给了Consumer足够长的时间来处理消息。保证了数据的最终一致性。
还有几种情况:
如果消费者接受到消息,在确认之前断开了连接或取消订阅,RabbitMQ会认为消息没有被分发,然后重新分发给下一个订阅的消费者。(可能存在消费重复的隐患,需要去重,幂等处理)
如果消费者接受到消息却没有确认消息,连接也未断开,则RabbitMQ认为该消费者繁忙。则不会给该消费者分发更多的消息。
4.2 如何避免消息重复投递或重复消费?
在消息生产时,MQ内部针对每条生产者发送的消息生成一个inner-msg-id,作为去重的依据(消息投递失败并重传),避免重复的消息进入队列,
在消息消费时,要求消息体中 必须要有一个bizID(业务id, 对于同一个业务全局唯一) 作为去重的依据,避免同一条消息被重复消费。
4.3 消息如何分发?
一个生产者,多个消费者
多个消费者时,是轮询机制,依次分发给消费者(每个消费者按顺序依次消费)
4.4 如何确保消息不丢失?
4.4.1 生产者丢失消息
可以选择使用 RabbitMQ 提供事务功能,就是生产者在发送数据之前开启事务,然后发送消息,如果消息没有成功被RabbitMQ接收到,那么生产者会受到异常报错,这时就可以回滚事物,然后尝试重新发送;如果收到了消息,那么就可以提交事物。
缺点: RabbitMQ 事务已开启,就会变为同步阻塞操作,生产者会阻塞等待是否发送成功,太耗性能会造成吞吐量的下降。
或者使用死信队列对处理失败的数据进行拒绝,然后使用死信队列重复处理,处理时再用业务id进行幂等性处理
如何保证消息的可靠性
4.4.2、持久化
RabbitMQ中发消息的时候会有个durable参数可以设置,设置为true,就会持久化
这样的话MQ服务器即使宕机,重启后磁盘文件中有消息的存储,这样就不会丢失了吧。是的这样就一定概率的保障了消息不丢失。
但还会有个场景,就是消息刚刚保存到MQ内存中,但还没有来得及更新到磁盘文件中,突然宕机了。这个场景在持续的大量消息投递的过程中,会很常见。
那怎么办?我们如何作才能保障一定会持久化到磁盘上面呢?
confirm机制
RabbitMQ利用confirm机制来通知我们是否持久化成功
confirm机制的原理:
(1)消息生产者把消息发送给MQ,如果接收成功,MQ会返回一个ack消息给生产者;
(2)如果消息接收不成功,MQ会返回一个nack消息给生产者。
这样是不是就可以保障100%消息不丢失了呢?
如果我们生产者每发一条消息,都要MQ持久化到磁盘中,然后再发起ack或nack的回调。这样的话是不是我们MQ的吞吐量很不高,因为每次都要把消息持久化到磁盘中。写入磁盘这个动作是很慢的。这个在高并发场景下是不能够接受的,吞吐量太低了。
所以MQ持久化磁盘真实的实现,是通过异步调用处理的,他是有一定的机制,如:等到有几千条消息的时候,会一次性的刷盘到磁盘上面。而不是每来一条消息,就刷盘一次。
所以comfirm机制其实是一个异步监听的机制,是为了保证系统的高吞吐量,这样就导致了还是不能够100%保障消息不丢失,因为即使加上了confirm机制,消息在MQ内存中还没有刷盘到磁盘就宕机了,还是没法处理。
4.5 RabbitMQ 消息积压
消息积压一般有以下这几种情况:
1.消费者消费速度跟不上生产者生产速度,这种情况一般是前期设计阶段没设计好消费者和生产者的平衡关系,保证消费者的消费速度,增加消费者数量
2.消费者出现异常,导致一直不消费消息,从而导致消息积压,超出硬盘存储大小,紧急扩容,或者做其他紧急处理
其他紧急处理思路:
1.分析积压的原因是因为consume程序哪方面出了问题,先修复consumer的问题,确保恢复其消费速度,然后将现有的consumer停掉
2.新建一个新的exchange,临时建立10倍的queue
3.写一个临时程序将积压的消息重新导流到新的queue中
4.接着部署10倍的consumer去消费新的queue中的消息
5.这种做法相当于临时将queue和consumer的资源扩大10倍去快速消费消息
6.等快消费完了,再恢复到原来的架构,重新用原来的机器消费消息
4.6 顺序消费
在某些场景下,消息队列中的若干消息如果是对同一个数据进行操作,并且这些操作是有先后顺序的,那么消息中间件的消费者如果不考虑顺序性问题就会造成数据不一致的问题。
比如订单的先后顺序,比如数据同步服务binlog中增删改的顺序问题。
顺序错乱的场景:
一个Queue,多个Consumer消费
一个Queue,一个Consumer,但是消费者是多线程处理
解决办法:一个Queue,一个Consumer,单线程,这样解决了顺序消费的问题,但是性能很差,吞吐量很低。
如何保证消息的顺序消费,又保证高效呢: