目录
- 一 什么是MQ
- 1.1 MQ的概念
- 1.2 MQ的流量消峰
- 1.3 MQ的应用解耦
- 1.4 MQ的异步处理
- 1.5 MQ的分类以及如何选择
- 1.5.1 ActiveMQ
- 1.5.2 Apache Kafka
- 1.5.3 RabbitMQ
- 1.5.4 RocketMQ
- 1.5.5 四种MQ的区别
- 1.6 MQ的核心概念
- 1.6.1 MQ四大核心概念
- 1.6.2 MQ六大核心部分
- 1.7 安装RabbitMQ
- 二. Hello World
- 2.1 导入依赖
- 2.2 创建生产者
- 2.3 创建消费者
- 三. Work Queues(工作队列模式)
- 3.1 创建工作线程
- 3.2 创建生产者
- 3.3 结果分析
- 四. 消息应答机制
- 4.1 消息应答概念
- 4.2 消息手动应答
- 4.2.1 消息重新入队
- 4.2.2 消息手动应答代码实现
- 五. RabbiMQ消息持久化
- 5.1 消息持久化概念
- 5.2 如何持久化
- 5.2.1 队列持久化
- 5.2.2 消息持久化
- 5.2.3 不公平分发
- 5.2.4 预取值
- 六. 发布确认
- 6.1 发布确认概述
- 6.2 单个确认发布
- 6.3 批量确认发布
- 6.4 异步确认发布
- 6.5 异步未确认消息处理
- 七. 交换机
- 7.1 交换机是什么
- 7.2 fanout交换机
- 7.3 direct交换机
- 7.4 topic交换机
- 八. 死信队列
- 8.1 死信概念与来源
- 8.2 死信实战
一 什么是MQ
1.1 MQ的概念
MQ是消息队列(Message Queue
)的缩写,是一种在应用程序之间传递消息的技术。通常用于分布式系统
或异步通信
中,其中发送者将消息放入队列,而接收者从队列中获取消息。
这种异步通信模式允许发送者和接收者在不需要实时连接的情况下进行通信,从而提高了应用程序的性能和可伸缩性。
消息队列可用于许多不同的应用程序场景,例如处理大量的并发请求、实现任务异步处理、进行解耦和构建松散耦合的系统、实现日志记录和监视等。
下面用一个场景来更好的理解MQ:
订单处理系统:
一个电子商务网站,其中有一个订单处理系统。当用户提交订单时,系统需要将订单信息保存到数据库中,并发送电子邮件和短信通知给客户和仓库管理员。
如果没有使用消息队列,订单处理系统需要等待数据库操作完成,然后才能发送通知。当系统在高负载情况下出现延迟或响应缓慢的情况。
现在,假设使用消息队列来处理这个场景。当用户提交订单时,系统将订单信息放入消息队列中。
然后,另外一个系统(或者进程)从消息队列中获取订单信息,并将其保存到数据库中。
一旦订单信息被保存到数据库中,另一个系统(或者进程)会从消息队列中获取订单信息,并发送通知邮件和短信。
这样可以快速将订单信息放入消息队列中,然后立即返回响应给用户,可以轻松添加更多的处理器来处理消息队列中的订单信息。
下面就按照这个例子来说明一下MQ的三大特点:流量消峰,应用解耦,异步处理。
1.2 MQ的流量消峰
是当系统面临高峰期时,可以将请求放入消息队列中,然后按照系统的处理能力和资源情况逐步消耗消息队列中的请求,就是MQ的流量消峰。
在订单系统中如果客户端每秒有1万次的订单请求服务器,而服务器的承载量只能是5000单每秒,这时数据库可能过载,服务器可能会出现宕机。
可以将这些订单请求放入消息队列中,每秒5000次,依次进行排队访问数据库,也可以设置多个消息队列来处理不同类型的订单请求,进一步分散请求压力。
1.3 MQ的应用解耦
MQ的应用解耦是指将应用程序之间的耦合度降低,不同应用程序之间不再直接依赖,而是通过消息队列进行解耦。
当一个应用程序产生了消息,它只需要将消息发送到消息队列中,其他应用程序可以通过订阅消息队列来获取消息并进行相应的处理,这样就实现了应用程序之间的松耦合。
假设有一个电商网站的订单系统,在传统的应用程序架构中,订单系统和库存管理系统、物流管理系统之间通常存在紧密的耦合关系。一个子系统出现故障,都可能导致下单结果的异常。
通过使用MQ
,订单系统将订单信息放入消息队列中,然后库存管理系统,支付系统,物流管理系统分别订阅消息队列,进行相应的操作。
通过这种方式,系统之间不再直接依赖,而是通过消息队列进行解耦。
1.4 MQ的异步处理
在传统的同步处理模式中,当一个应用程序发送一个任务时,它必须等待任务执行完毕并获取结果,这样会导致应用程序的响应速度变慢,并且无法处理高并发的请求。
通过使用消息队列技术,将任务的执行和结果返回解耦,实现解绑,使得发送任务的应用程序可以立即返回,而不必等待任务执行完毕和结果返回。
A调用B的服务时,不需要等B返回结果后A再做自己的事情,只需加入消息队列,B完成任务之后,通过MQ发送消息通知A即可。
1.5 MQ的分类以及如何选择
常见的消息队列软件包括Apache Kafka、RabbitMQ、ActiveMQ,RocketMQ
等,下面简单的介绍这四种。
1.5.1 ActiveMQ
ActiveMQ
是一款基于Java
的消息中间件,它实现了 Java Message Service (JMS)
规范,用于在分布式应用程序中传递消息。
ActiveMQ
支持多种消息传递协议,包括AMQP、MQTT、OpenWire和STOMP
等,具有持久性、分布式集群、消息选择和安全性等功能。
ActiveMQ
还提供了高级功能,例如消息转换、消息存储、事务性会话、消息监控和调试,可与Spring
框架和其他Java应用程序集成。
1.5.2 Apache Kafka
Apache Kafka
是一种分布式的发布-订阅消息系统,用于处理大规模数据流和实时数据处理。
Kafka
通过分布式存储、分布式处理和分布式协作来处理数据,可以支持以每秒百万级别的速度处理数据,主要用于大数据场景。
它可以与Hadoop、Storm、Spark
等其他大数据处理平台集成使用。
1.5.3 RabbitMQ
RabbitMQ
是一个开源的消息中间件,它实现了高级消息队列协议(AMQP),用于在分布式应用程序中传递消息。
RabbitMQ是一个可靠、可扩展、跨平台的解决方案,可以通过网络链接不同的应用程序、服务或系统,并提供异步、可靠、事务性的消息传递机制。
RabbitMQ的架构基于Erlang语言实现,具有并发性和可伸缩性,可以在多个服务器上分布式地部署消息代理,从而提高系统的可扩展性和可用性。
1.5.4 RocketMQ
RocketMQ
是阿里巴巴开源的分布式消息中间件,具有高可用、高可靠、高吞吐量等特点。它支持发布/订阅模式和点对点模式,同时还提供了多种消息传递模式,如同步发送、异步发送和顺序发送等。
RocketMQ
还提供了消息事务机制和消息轨迹等高级功能,以及与Apache Storm和Apache Flink等大数据框架的集成。RocketMQ是一个成熟的、可靠的、企业级的消息中间件,已经被广泛应用于电商、金融、游戏等领域。
1.5.5 四种MQ的区别
已将Kafka、RabbitMQ、ActiveMQ、RocketMQ
的主要区别进行了比对,可以参考下表:
特点 | Kafka | RabbitMQ | ActiveMQ | RocketMQ |
---|---|---|---|---|
传递模式 | 发布订阅、点对点 | 发布订阅、点对点 | 发布订阅、点对点 | 发布订阅、点对点 |
协议支持 | 自有协议Kafka Protocol、HTTP | AMQP、STOMP、MQTT、HTTP、WebSockets | JMS、AMQP、STOMP、MQTT、OpenWire | 自有协议、MQTT、HTTP |
可扩展性 | 高 | 中等 | 高 | 高 |
可靠性 | 高 | 中等 | 高 | 高 |
吞吐量 | 高 | 中等 | 中等 | 高 |
实时性 | 高 | 低 | 低 | 高 |
存储机制 | 磁盘存储 | 内存或磁盘存储 | 内存或磁盘存储 | 磁盘存储 |
消息保证 | 至少一次、最多一次、精确一次 | 至少一次、最多一次、精确一次、无保证 | 至少一次、最多一次、精确一次 | 至少一次、最多一次、有序消息 |
支持分区 | 支持 | 不支持 | 不支持 | 支持 |
性能对比 | 高 | 低 | 低 | 高 |
应用场景 | 流处理、日志聚合、消息传递 | 数据处理、任务分发、消息传递 | 数据处理、消息传递、企业集成 | 流式数据处理、消息传递、在线事务 |
优点 | 高吞吐量、可靠性高、可扩展性高 | 消息可靠性高、支持多种消息传递模式 | 支持多种协议、可靠性高、消息传递稳定 | 高吞吐量、高可靠性、可扩展性高 |
缺点 | 实时性较低、生态系统较复杂 | 实时性较低、可扩展性一般 | 实时性较低、吞吐量较低 | 生态系统相对较小、部分功能较弱 |
1.6 MQ的核心概念
1.6.1 MQ四大核心概念
MQ(Message Queue
,消息队列)的四大核心概念是:
-
交换机:主要用于将消息路由到指定的队列或主题。
-
队列:用于存储消息的缓冲区,消息发布者将消息发送到队列,消息消费者从队列中取出消息进行处理。
-
生产者:MQ 中将消息发送到队列的应用程序,即消息的生产者。
-
消费者:MQ 中从队列中取出消息进行处理的应用程序,即消息的消费者。
生产者将消息发送到队列,中间通过交换机进行分发,消费者从队列中取出消息进行处理,实现了应用程序之间的解耦,提高了系统的可靠性、可扩展性和可维护性。
1.6.2 MQ六大核心部分
以下是RabbitMQ的六个核心部分:
-
Hello World:RabbitMQ的最简单入门示例,演示了如何在RabbitMQ中发送和接收简单的“Hello World”消息。
-
Work Queues:也称为任务队列模式,用于在多个消费者之间分配任务并平衡负载。多个消费者同时订阅同一个队列,其中一个消息只会被一个消费者接收和处理。
-
Publish/Subscribe:也称为发布/订阅模式,用于将消息广播给多个消费者。发布者将消息发送到一个交换机上,交换机将消息路由到所有与之绑定的队列,从而实现消息的广播。
-
Routing:也称为路由模式,用于将消息路由到指定的队列。发送者发送带有路由键(Routing Key)的消息到交换机上,交换机将根据路由键将消息路由到指定的队列。
-
Topics:也称为主题模式,类似于Routing模式,但路由键可以使用通配符。发送者发送带有主题(Topic)的消息到交换机上,交换机将根据主题将消息路由到相应的队列。
-
RPC:也称为远程过程调用模式,用于实现分布式应用程序之间的通信。客户端发送一个带有回调队列和唯一标识符的请求消息到RabbitMQ中,服务端接收请求并处理,将响应发送回客户端指定的回调队列中。
1.7 安装RabbitMQ
在Linux系统
中使用docker
去安装RabbitMQ
相对来说会简单许多,当然你的虚拟机前提是安装了docker
,安装docker
可以参考:【docker安装】。
在Linux系统中使用Docker安装RabbitMQ
,可以按照以下步骤操作:
-
搜索
RabbitMQ
镜像:在终端中执行以下命令来搜索可用的RabbitMQ
镜像:docker search rabbitmq
-
下载RabbitMQ镜像:选择一个合适的RabbitMQ镜像,使用以下命令从Docker Hub下载:
docker pull rabbitmq:3-management
其中,
3-management
是带有管理插件的RabbitMQ 3.x版本镜像。 -
运行RabbitMQ容器:使用以下命令启动一个RabbitMQ容器:
docker run -d --name my-rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
其中,
--name
指定容器名称,-p
指定容器和主机之间映射的端口。在本例中,RabbitMQ的默认端口5672和Web管理界面的端口15672都映射到主机的相应端口。 -
访问Web管理界面:在浏览器中访问
http://localhost:15672
,使用默认的guest/guest
账号和密码登录RabbitMQ Web管理界面。如果在本机浏览器访问,这里的localhost
需要改为LInux的IP地址。在使用完毕后,可以通过以下命令停止和删除容器:
# 停止容器 docker stop my-rabbitmq # 删除容器 docker rm my-rabbitmq
访问成功后,会出现以下页面:
二. Hello World
2.1 导入依赖
现在用RabbitMQ的最简单入门示例,演示如何在RabbitMQ中发送和接收简单的“Hello World”消息。
如上图所示,P
为生产者,C
为消费者,中间框是rabbitMQ
代表的消息缓冲区域。
在maven
中导入依赖:
<!-- rabbitmq依赖的客户端 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
<!-- 操作文件流的依赖 -->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.11.0</version>
</dependency>
2.2 创建生产者
package com.javadouluo.abbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @author jektong
* @date 2023年05月03日 17:05
*/
public class Producer {
// 队列名称
private static final String QUEUE_NAME = "hello world";
public static void main(String[] args) throws Exception{
// 创建一个工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 工厂IP 连接RabbitMQ队列
connectionFactory.setHost("192.168.10.100");
// 用户名
connectionFactory.setUsername("guest");
// 密码
connectionFactory.setPassword("guest");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 获取信道
Channel channel = connection.createChannel();
/**
* 生成一个队列
* 1.队列名称
* 2.队列中消息是否持久化,默认消息在内存中
* 3.是否只给一个消费者消费,true消息可以共享,false消息不可共享
* 4.是否自动删除,最后一个消费者断开连接之后是否自动删除该队列 true是自动删除
* 5.其他参数
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 发消息
String message = "hello world";
/**
* 发送一个消费
* 1.发送到哪个交换机
* 2.路由的key值哪个,这次是队列名称
* 3.其他参数信息
* 4.发送消息的消息内容
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕!!!");
}
}
basicPublish()
方法是RabbitMQ中AMQP协议提供的方法之一,用于将消息发布到指定的交换机中。
该方法的参数如下:
exchange
:表示消息发送到哪个交换机上,可以为空,表示使用默认的交换机。routingKey
:表示路由键,用于指定将消息路由到哪些队列中。如果使用默认的交换机,那么路由键就需要指定为队列名称。props
:表示消息的属性信息,一般为空,使用默认的属性即可。body
:表示要发送的消息内容,需要转换成字节数组形式。
使用 basicPublish()
方法可以将消息发送到交换机中,然后由交换机根据路由键将消息路由到对应的队列中,等待消费者进行消费。
执行完成之后,打开MQ管理界面,会发现名称为hello world
的队列名称。
主页面也会显示详细的信息:
2.3 创建消费者
package com.javadouluo.abbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author jektong
* @date 2023年05月03日 19:26
*/
public class Consumer {
public static final String QUEUE_NAME = "hello world";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建一个工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 工厂IP 连接RabbitMQ队列
connectionFactory.setHost("192.168.10.100");
// 用户名
connectionFactory.setUsername("guest");
// 密码
connectionFactory.setPassword("guest");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 声明 接收消息
DeliverCallback deliverCallback = (consumerTag,message)->{
System.out.println(new String(message.getBody()));
};
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag->{
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否自动应答true自动应答,false代表手动应答
* 3.消费者未成功消费的回调
* 4.消费者取消消费的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
basicConsume
方法用于开始消费一个队列中的消息。具体来说,它向 RabbitMQ 服务器发送一个指令,告诉它我们希望从指定的队列中获取消息并开始消费。
basicConsume
方法的常用参数如下:
queue
:需要消费的队列的名称;autoAck
:如果为true
,则消费者获取到消息后立即自动确认;如果为false
,则需要调用basicAck
方法来手动确认消息;consumerTag
:消费者标识,用于标识当前消费者。如果不设置该参数,RabbitMQ 会为每个消费者生成一个唯一标识;deliverCallback
:消息处理回调函数,用于处理队列中获取到的消息。cancelCallback
:取消消费的回调函数,用于在消费者被取消消费时执行。
当一个消费者调用 basicConsume
方法后,RabbitMQ 服务器会立即将队列中的消息推送给它,并在消息发送完成后立即进行确认,以便让 RabbitMQ 知道该消息已经被消费过。
如果 autoAck
参数为 false
,则消费者需要调用 basicAck
方法来手动确认消息。
如果消费者在处理消息的过程中发生了异常,也可以调用 basicNack
方法将消息重新加入队列,以便重新进行消费。
运行代码查看消息已被消费:
三. Work Queues(工作队列模式)
3.1 创建工作线程
工作队列模式中,生产者发消息到队列,多个消费者从队列中获取消息并进行处理。每个消息只会被一个消费者处理,保证每个消息只会被处理一次,它们之间是竞争关系。
这个模式的特点是在消费者之间分配耗时的任务,一旦一个消息被消费者接收,它就会被从队列中删除。
RabbitMQ
会轮流地将消息发送给每个消费者。当消费者处理较慢或者某个消费者出现宕机等情况时,RabbitMQ
会重新将消息发送给其他消费者进行处理。
多个消费者,也称之为多个工作线程,下面用代码的方式去完成工作队列模式。
首先我们先改造上面消费者,复制两个工作线程(消费者)代码,为了方便加入一行注释即可,Work01.java
代码如下:
/**
* 这是一个工作线程
* @author jektong
* @date 2023年05月08日 21:49
*/
public class Work01 {
public static final String QUEUE_NAME = "hello world";
public static void main(String[] args) throws IOException, TimeoutException {
// 中间省略,与上面的消费者代码一致
// 接收消息
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否自动应答true自动应答,false代表手动应答
* 3.消费者未成功消费的回调
* 4.消费者取消消费的回调
*/
System.out.println("work01等待接收消息");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
Work02.java
代码如下:
/**
* 这是一个工作线程
* @author jektong
* @date 2023年05月08日 21:49
*/
public class Work01 {
public static final String QUEUE_NAME = "hello world";
public static void main(String[] args) throws IOException, TimeoutException {
// 中间省略,与上面的消费者代码一致
// 接收消息
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否自动应答true自动应答,false代表手动应答
* 3.消费者未成功消费的回调
* 4.消费者取消消费的回调
*/
System.out.println("work02等待接收消息");
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
3.2 创建生产者
这部分依然是基于上面的生产者代码,主要模拟出发送多个消息即可:
package com.javadouluo.abbitmq.two;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
/**
* @author jektong
* @date 2023年05月08日 22:10
* 生产者发送消息
*/
public class Task01 {
// 队列名称
private static final String QUEUE_NAME = "hello world";
public static void main(String[] args) throws Exception{
// 中间代码省略,与上面一致
/**
* 发送一个消费
* 1.发送到哪个交换机
* 2.路由的key值哪个,这次是队列名称
* 3.其他参数信息
* 4.发送消息的消息内容
*/
// 发送5条消息
for (int i = 0; i < 5; i++) {
channel.basicPublish("",QUEUE_NAME,null,("消息编号" + i).getBytes());
}
System.out.println("消息发送完毕!!!");
}
}
3.3 结果分析
启动两个工作线程,work01
在等待消息:
work02
也在等待消息:
好的,现在我们开始启动生产者发送消息给消费者,再看work01
这个工作线程:
再看work02
这个工作线程:
很明显,当有多个消息的时候,工作线程是通过轮询的方式去消费消息的。
四. 消息应答机制
4.1 消息应答概念
消息应答机制(Message Acknowledgment
)是一种确认消息是否被消费者成功处理的机制。在消息队列中,当一个消息被消费者获取并处理后,需要向消息队列发送一个确认信息,告诉消息队列该消息已经被消费者成功处理了。
在消息队列中,如果某个消息没有被消费者成功处理,那么它将一直留在消息队列中,直到被正确处理为止。如果没有消息应答机制,则消息队列无法知道哪些消息是否被成功处理。
通常情况下,消息应答机制分为两种模式:自动应答模式和手动应答模式。
自动应答模式
自动应答是指当消费者从队列中接收到消息时,立即将消息从队列中删除,而不需要等待消费者明确地向RabbitMQ确认是否已经处理完成。
自动应答的优点是消费者能够迅速地将消息从队列中移除,提高了消费者的消息处理效率和吞吐量。另外,它使得消息处理变得简单,因为消费者不需要处理应答确认的逻辑。
自动应答也存在一些缺点。如果消费者在处理消息时发生了异常,这些消息将会被丢失而无法重新投递。
如果消费者处理消息的时间很长,而没有明确的确认机制,消息队列无法知道消息是否已被处理,从而导致消息被多次处理,甚至可能导致消息丢失。
手动应答模式
在实际生产环境中,一般采用手动应答的方式来保证消息的可靠处理。
手动应答是指在消费者处理完一条消息后,需向 RabbitMQ
显示地发送一个确认应答信号。
这种方式需要调用channel.basicAck()
方法来通知当前消息已经被消费,可以将其从队列中删除。
如果在消息处理过程中发生了异常,可以调用channel.basicNack()
方法来拒绝当前消息并将其重新放回队列中。此外,还可以使用channel.basicReject()
方法将消息拒绝并将其丢弃。
上面这三种方法需要记住后面详细说明。
手动应答的优点是能够保证消息的可靠处理,可以避免由于消费者处理失败而导致消息丢失的问题。
同时,手动应答可以根据实际情况自行控制消息的处理方式。
对于手动应答还有一个好处就是可以使用批量应答,在批量应答中,消费者可以一次性确认多个消息的处理结果,以提高消息确认的效率。
消费者可以使用basicAck
方法的multiple
参数来进行批量应答,例如:
channel.basicAck(deliveryTag, true)
其中deliveryTag
表示消息的唯一标识,第二个参数决定是否批量确认多条消息。true
表示批量处理消息。
这样,消费者就可以一次性确认多个消息的处理结果了。
对于第二个参数为true
与false
的区别:
例如,当调用channel.basicAck(10, true)
时,会确认 Delivery Tag
从 1 到 10 的所有消息。
而当调用 channel.basicAck(10, false)
时,只会确认Delivery Tag
为 10 的这条消息。
手动应答的缺点是增加了代码的复杂度和实现的难度,需要开发人员自己处理消息的确认和拒绝操作。
手动应答也可能会导致消息处理的延迟,因为需要等待消费者确认消息后才能将其从队列中删除。
4.2 消息手动应答
4.2.1 消息重新入队
如果消息出现上面所说的没有被正确处理掉,需要将消息重新放入消息队列中让其他消费者来消费,从而保证消息的准确性,如下图所示:
4.2.2 消息手动应答代码实现
现在编写代码,用一个生产者和两个消费者来实现消息手动应答不丢失,然后重新入队被消费。
生产者代码
/**
* @author jektong
* @date 2023年05月13日 20:15
*/
public class Task2 {
// 队列名称
private static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws Exception{
Channel channel = RabbitMqUtils.getChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 发消息
Scanner sc = new Scanner(System.in);
while (sc.hasNext()){
String msg = sc.next();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("生产者发送消息:" + msg);
}
}
}
消费者Work03代码
/**
* @author jektong
* @date 2023年05月13日 20:23
*/
public class Work03 {
public static final String QUEUE_NAME = "ack_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 接收消息
Channel channel = RabbitMqUtils.getChannel();
System.out.println("消费者1处理消息时间较短");
// 接收消息后处理
DeliverCallback deliverCallback = (consumerTag, message)->{
try {
// 等待1s处理消息
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println(new String(message.getBody()));
// 手动应答
/**
* arg1:表示消息标识
* arg2:是否批量应答(之前详细说了此方法)
*/
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
};
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag->{
System.out.println("消息消费被中断");
};
// 手动应答为fasle
boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
}
}
对于消费者Work04
代码只需修改一下等待时间即可。
// 等待30s后处理消息
TimeUnit.SECONDS.sleep(30);
让Work04
在30秒中给它断开连接,达到让给它进行消费的消息会重新入队给消费者Work03
进行消费(请自行测试)。
五. RabbiMQ消息持久化
5.1 消息持久化概念
上面只是处理了消息不被丢失的情况,但如果要保障当RabbiMQ
服务停掉之后的消息不丢失,因为在默认的情况下,RabbiMQ
会忽略队列与消息。
如果将消息标记为持久化,那么当RabbitMQ关闭或重新启动时,该消息将仍然存在,消息的持久性标志需要同时设置队列和消息的标志。
5.2 如何持久化
5.2.1 队列持久化
上面创建的生产者并没有进行持久化,需要将要其进行持久化,需要标记为durable=true
// 声明队列
boolean durable = true
channel.queueDeclare(QUEUE_NAME,durable ,false,false,null);
需要注意,若之前队列未进行持久化需要将之前的队列进行删除,否则会出现错误。
打开消息管理界面证明队列已经被持久化:
5.2.2 消息持久化
要使发布的消息持久化,需要在消息属性中设置MessageProperties.PERSISTENT_TEXT_PLAIN
属性,修改上述生产者的代码:
// 将消息保存到磁盘上
channel.basicPublish("",QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());
注意,将消息标记为持久化并不能保证它们会永久保存,因为RabbitMQ仍然可以丢失刚要写入磁盘,但是还未完全写入磁盘的消息。因此,要确保消息不会丢失,还需要使用备份和复制策略(后面会说)。
5.2.3 不公平分发
在某些情况下,某个消费者的处理速度比其他消费者慢,这时就需要采用不公平分发的方式,即使某些消费者处于忙碌状态,也将消息发送给它们。
在不公平分发中,RabbitMQ
仍然会将每个消息发送给所有的消费者,但是会将消息发送给第一个处于空闲状态的消费者。
因此,快速处理消息的消费者将会更快地获得更多的消息,而处理较慢的消费者将会逐渐减少接收到的消息数量。
不公平分发的实现方法与公平分发相同,只需不使用basicQos
方法设置prefetchCount
即可,将Work03
与Work04
加入以下代码:
int prefetchCount = 1;
// 使用不公平分发
channel.basicQos(prefetchCount);
5.2.4 预取值
当消费者连接到队列并开始接收消息时,RabbitMQ会按照预取值设置来决定一次性发送给消费者的消息数量。
预取值的设置是在消费者端生效的,而不是在队列端。每个消费者可以独立设置自己的预取值。
因此不同的消费者可以根据自身的处理能力和需求来设置合适的预取值。
比如一开始有7条消息,通过设置预取值给消费者1与2分别发送2条与5条。使用通过channel.basicQos(prefetchCount)
设置预取值。
将Work03
修改以下代码:
// 设置预取值
int prefetchCount = 2;
channel.basicQos(prefetchCount);
将Work04
修改以下代码:
// 设置预取值
int prefetchCount = 5;
channel.basicQos(prefetchCount);
channel.basicQos(prefetchCount)
此方法参数值若为0则是轮询分发,1是不公平分发,其它值都是设置预取值。
六. 发布确认
6.1 发布确认概述
发布确认的原理是基于AMQP协议中的信道(Channel)级别的确认机制。
当生产者发送一条消息到RabbitMQ时,会在信道上启用发布确认模式。一旦启用了发布确认模式,每次发送消息时,生产者都会为该消息分配一个唯一的传递标签(Delivery Tag)。
RabbitMQ在接收到消息后,会发送一个确认消息(ACK)给生产者,通知生产者消息已成功接收。确认消息中包含了相应消息的传递标签。
生产者可以通过三种方式进行发布确认的处理:单个确认发布,批量确认发布与异步确认发布。
6.2 单个确认发布
一种简单的确认模式,使用同步确认发布的方式,单个消息确认的基本流程如下:
- 生产者发送消息到
RabbitMQ
。 - 生产者等待
RabbitMQ
的确认消息。 - 如果在指定的超时时间内收到了确认消息,表示消息已成功接收,生产者可以继续发送下一条消息。
- 如果超时时间内未收到确认消息,生产者可以根据需求进行相应的处理,例如重发消息、记录日志、执行补偿逻辑等。
缺点就是发布速度很慢,下面用代码实现此种方式并查看这种方式发送消息的时间。
public static void publishMessageSingle() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 队列声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
// 开启发布确认
channel.confirmSelect();
// 开始时间
long startTime = System.currentTimeMillis();
// 批量发送消息
for (int i = 0; i < 1000; i++) {
String msg = i + "";
channel.basicPublish("",queueName,null,msg.getBytes());
// 单个消息发布确认
boolean flag = channel.waitForConfirms();
if(flag){
System.out.println("消息发送成功");
}
}
// 结束时间
long endTime = System.currentTimeMillis();
System.out.println("发布"+1000+"个单独确认消息耗时"+(endTime-startTime)+"ms");
}
运行代码,发现耗时·410ms
:
6.3 批量确认发布
批量消息确认模式下,生产者可以一次性发送多条消息,并在所有消息都被成功接收后进行确认。
生产者会设置一个确认窗口(Confirm Window
),窗口大小决定了可以未确认的消息数量。
当窗口中的所有消息都被确认后,生产者会收到一个批量确认消息(Batch Ack
)。
批量消息确认的基本流程如下:
- 生产者发送多条消息到
RabbitMQ
。 - 生产者设置一个确认窗口大小。
- 当发送的消息数量达到确认窗口大小时,生产者等待RabbitMQ的批量确认消息。
- 如果收到批量确认消息,表示窗口中的所有消息都已成功接收,生产者可以继续发送下一批消息。
- 如果超时时间内未收到批量确认消息,生产者可以根据需求进行相应的处理,例如重发消息、记录日志、执行补偿逻辑等。
通过批量确认消息,生产者可以确保一批消息的完整性,适用于对消息完整性要求不那么严格的场景。
但是如果出现了问题,就并不知道哪个消息是否出现了问题。
下面是批量发布消息确认的实现:
/**
* 批量消息确认
* @throws Exception
*/
public static void publishMessageBatch() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 队列声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
// 开启发布确认
channel.confirmSelect();
// 开始时间
long startTime = System.currentTimeMillis();
// 批量确认消息的大小
int batchSize = 100;
// 批量发送消息
for (int i = 0; i < 1000; i++) {
String msg = i + "";
channel.basicPublish("",queueName,null,msg.getBytes());
if(i % batchSize == 0){
// 消息发布确认
channel.waitForConfirms();
System.out.println("消息发送成功");
}
}
// 结束时间
long endTime = System.currentTimeMillis();
System.out.println("发布"+1000+"个单独确认消息耗时"+(endTime-startTime)+"ms");
}
运行代码,发现耗时34ms
:
6.4 异步确认发布
异步确认的性价比比上面两种方式都要高,原因就是生产者发送消息后不会立即等待确认消息,而是继续发送下一条消息。
同时,生产者会通过一个异步回调(Callback)函数来处理确认消息的回调操作,来确认消息是否发送成功。
异步消息确认的基本流程如下:
- 生产者发送消息到
RabbitMQ
。 - 生产者不会立即等待确认消息,而是继续发送下一条消息。
- 生产者注册一个异步回调函数,用于处理确认消息。
- 当RabbitMQ接收到消息并完成处理后,会异步发送确认消息给生产者。
- 一旦生产者收到确认消息,就会触发回调函数执行相应的逻辑,比如记录日志、更新状态等。
下面是消息异步确认的实现:
/**
* 异步消息确认
* @throws Exception
*/
public static void publishMessageAsync() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 队列声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
// 开启发布确认
channel.confirmSelect();
// 开始时间
long startTime = System.currentTimeMillis();
// 消息处理成功回调
ConfirmCallback ackCallback = (var1,var2)->{
System.out.println("未确认的消息" + var1);
};
// 消息未处理成功回调
ConfirmCallback nackCallback = (var1,var2)->{
System.out.println("消息发送成功了" + var1);
};
// 消息监听器
channel.addConfirmListener(ackCallback,nackCallback);
// 批量发送消息
for (int i = 0; i < 1000; i++) {
String msg = i + "";
channel.basicPublish("",queueName,null,msg.getBytes());
}
// 结束时间
long endTime = System.currentTimeMillis();
System.out.println("发布"+1000+"个异步确认消息耗时"+(endTime-startTime)+"ms");
}
运行代码,发现耗时18ms
:
6.5 异步未确认消息处理
对于异步确认中未确认消息的处理,有一个方案就是将未确认的消息放到一个基于内存的能被发布的线程访问的队列中。
比如使用ConcurrentLinkedQeque
在多个线程之间进行消息传递。多个线程可以同时发送消息与接收消息,实现消息的并发传递。
在发送的消息时,记录发送过的消息,在回调函数删除已经确认成功的消息,代码实现如下:
/**
* 异步消息确认
* @throws Exception
*/
public static void publishMessageAsync() throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 队列声明
String queueName = UUID.randomUUID().toString();
channel.queueDeclare(queueName,true,false,false,null);
// 开启发布确认
channel.confirmSelect();
/**
* 安全的线程有序的哈希表,就是一个容器,适用于高并发
* 1.将序号与消息关联
* 2.轻松批量删除
* 3.支持高并发
*/
ConcurrentSkipListMap<Long,String> concurrentSkipListMap = new ConcurrentSkipListMap<>();
// 开始时间
long startTime = System.currentTimeMillis();
// 消息处理成功回调
// var1: 消息序列号
// var2: 是否批量
ConfirmCallback ackCallback = (var1,var2)->{
if(var2){
// 删除已经确认的消息,剩下的就是未确认的消息
ConcurrentNavigableMap<Long, String> confirmed =
concurrentSkipListMap.headMap(var1);
confirmed.clear();
}else{
concurrentSkipListMap.remove(var1);
}
System.out.println("确认的消息" + var1);
};
// 消息未处理成功回调
ConfirmCallback nackCallback = (var1,var2)->{
String unConfirm = concurrentSkipListMap.get(var1);
System.out.println("未确认的消息是:"+unConfirm+",消息发送失败了失败标记:" + var1);
};
// 消息监听器
channel.addConfirmListener(ackCallback,nackCallback);
// 批量发送消息
for (int i = 0; i < 1000; i++) {
String msg = i + "";
channel.basicPublish("",queueName,null,msg.getBytes());
concurrentSkipListMap.put(channel.getNextPublishSeqNo(),msg);
}
// 结束时间
long endTime = System.currentTimeMillis();
System.out.println("发布"+1000+"个异步确认消息耗时"+(endTime-startTime)+"ms");
}
ConcurrentSkipListMap
是一个线程安全的有序哈希表,适用于高并发环境。它可以将消息的序列号与消息内容关联起来,并支持高并发的读写操作。
用它来实现通过回调函数处理消息的确认和未确认情况。
七. 交换机
7.1 交换机是什么
交换机(Exchange
)是消息的路由中心,负责接收生产者发送的消息,并根据一定的路由规则将消息路由到一个或多个队列中,决定消息从生产者到达队列的路径。
在RabbitMQ
中有这几个常见的路由规则:直接模式,主题模式,头部模式和Fanout模式等之后细说。在RabbitMQ
提供的管理界面可以看到:
交换机主要有以下几个作用:
- 接收消息:交换机接收来自生产者的消息,并负责将消息发送到合适的队列。
- 路由消息:交换机根据预定义的路由规则将消息路由到一个或多个队列。
- 分发消息:如果一个交换机路由消息到多个队列,那么交换机会将消息复制到所有符合路由规则的队列中,实现消息的广播或者多播。
- 支持不同的路由模式:交换机可以根据不同的路由模式来决定如何路由消息,例如直接交换、扇形交换、主题交换等
对于交换机还有和它相关的一些概念例如绑定(bindings
),很好理解,就是交换机与队列之间可以通过一个RoutingKey
将两者绑定,这样可以将想要的消息发送至指定的队列中。
#pic_center
7.2 fanout交换机
在fanout
模式下,交换机会将消息广播到所有与之绑定的队列,无论消息的路由键是什么。
fanout
模式的特点如下:
- 广播消息:Fanout交换机会将消息复制到所有与之绑定的队列中,实现消息的广播。每个消费者都会收到相同的消息副本。
- 忽略路由键:Fanout交换机忽略消息的路由键,它只关注与之绑定的队列。
- 适用于发布/订阅模式:Fanout模式常用于发布/订阅模式,其中一个生产者发送消息,多个消费者接收并处理消息。
下面用代码来测试一下fanout
模式:
消费者代码:
package com.javadouluo.abbitmq.five;
import com.javadouluo.abbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author jektong
* @date 2023年05月24日 22:23
*/
public class ReceiveLogs01 {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
// 声明一个交换机logs,类型是fanout
channel.exchangeDeclare("logs","fanout");
// 声明一个临时队列,名称是随机的
// 当消费者断开与队列的连接时,队列自动删除
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机与对列
channel.queueBind(queueName,"logs","");
System.out.println("将消息打印到控制台上......");
// 接收消息后处理
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("01接收的消息是:"+ new String(message.getBody()));
};
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag->{
System.out.println("消息消费被中断");
};
channel.basicConsume(queueName,true,deliverCallback,cancelCallback);
}
}
生产者代码:
package com.javadouluo.abbitmq.five;
import com.javadouluo.abbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/**
* @author jektong
* @date 2023年05月27日 10:12
*/
public class EmitLog {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
// 声明一个交换机logs,类型是fanout
channel.exchangeDeclare("logs","fanout");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String msg = scanner.next();
// 发送消息
channel.basicPublish("logs","",null,msg.getBytes());
System.out.println("生产者发出消息:" + msg);
}
}
}
将上面的消费者复制两份,然后启动生产者与消费者,通过生产者发送消息,发现两个消费者都收到了消息,这就是fanout
模式下的广播消息的特点:
同时在管理平台上也可以看到创建的交换机:
7.3 direct交换机
直接交换机(direct)主要特点就是绑定的路由键是不一样的,它还有一个功能就是实现多重绑定。
多重绑定就是直接交换机可以有多个路由键来绑定一个交换机,如下图所示:
下面用代码实现上述功能:
消费者DirectReceiveLogs01代码:
package com.javadouluo.abbitmq.six;
import com.javadouluo.abbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author jektong
* @date 2023年06月28日 0:58
*/
public class DirectReceiveLogs01 {
// 队列名称
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
// 声明一个交换机logs,类型是direct
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
// 声明一个队列
channel.queueDeclare("console",false,false,false,null);
// 绑定交换机与对列
channel.queueBind("console",EXCHANGE_NAME,"info");
channel.queueBind("console",EXCHANGE_NAME,"warn");
// 接收消息后处理
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("DirectReceiveLogs01接收的消息是:"+ new String(message.getBody()));
};
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag->{
System.out.println("消息消费被中断");
};
channel.basicConsume("console",true,deliverCallback,cancelCallback);
}
}
消费者DirectReceiveLogs02代码:
package com.javadouluo.abbitmq.six;
import com.javadouluo.abbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author jektong
* @date 2023年06月28日 0:58
*/
public class DirectReceiveLogs02 {
// 队列名称
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
// 声明一个交换机logs,类型是direct
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
// 声明一个队列
channel.queueDeclare("disk",false,false,false,null);
// 绑定交换机与对列
channel.queueBind("disk",EXCHANGE_NAME,"error");
// 接收消息后处理
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("DirectReceiveLogs02接收的消息是:"+ new String(message.getBody()));
};
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag->{
System.out.println("消息消费被中断");
};
channel.basicConsume("disk",true,deliverCallback,cancelCallback);
}
}
生产者DirectLog代码:
package com.javadouluo.abbitmq.six;
import com.javadouluo.abbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
/**
* @author jektong
* @date 2023年06月28日 1:15
*/
public class DirectLog {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String msg = scanner.next();
// 发送消息
channel.basicPublish("direct_logs","info",null,msg.getBytes());
System.out.println("生产者发出消息:" + msg);
}
}
}
测试方法与Fanout交换机一致请自行测试,最后实现的绑定的方式是这样的:
7.4 topic交换机
topic
交换机也叫做主题交换机,看之前直接交换机的那张图,如果现在要通过某种规则同时通过路由键error
路由到队列disk
,通过warn
路由到队列console
,这时就需要使用topic
交换机来进行消息的路由。
主题交换机的好处就是使消息路由到队列的方式更加的灵活。
对于topic
交换机的路由键需要遵循下面的规则:路由键(RoutingKey)必须是单词并且用点隔开,比如aa.route.bb
,还有注意的是星号*
代表一个单词,#
代表一个或者两个单词。
所以上图中的*.orange.*
代表中间单词是orange
的三个单词,*.*.rabbit
代表最后一个单词是rabbit
的三个单词,lazy.#
代表的是第一个单词是lazy
的多个单词。
现在根据上图的队列绑定关系,举例说明它们的数据接收情况:
路由键 | 说明 |
---|---|
quick.orange.rabbit | 被Q1,Q2接收 |
lazy.orange.elephant | 被Q1,Q2接收 |
quick.orange.fox | 被Q1接收 |
lazy.brown.fox | 被Q2接收 |
lazy.pink.rabbit | 满足两个绑定但是会被Q2接收一次 |
quick.brown.fox | 不匹配不会被任何队列接收,会被丢弃 |
quick.orange.male.rabbit | 不匹配不会被任何队列接收,会被丢弃 |
lazy.orange.male.rabbit | 四个单词,但是会匹配Q2 |
接下来通过代码来实现topic
交换机的路由消息的模式。
消费者ReceiveLogsTopic01代码
package com.javadouluo.abbitmq.seven;
import com.javadouluo.abbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.sql.SQLOutput;
import java.util.concurrent.TimeoutException;
/**
* 主题交换机
* @author jektong
* @date 2023年06月29日 0:35
*/
public class ReceiveLogsTopic01 {
// 交换机名称
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
// 声明一个交换机logs,类型是topic
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
// 声明一个队列
channel.queueDeclare("Q1",false,false,false,null);
// 路由键绑定队列
channel.queueBind("Q1",EXCHANGE_NAME,"*.orange.*");
System.out.println("等待接收消息");
// 接收消息后处理
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println(new String(message.getBody(),"UTF-8"));
System.out.println("接收队列:" + "Q1"+" 绑定键:" + message.getEnvelope().getRoutingKey());
};
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag->{
System.out.println("消息消费被中断");
};
channel.basicConsume("Q1",true,deliverCallback,cancelCallback);
}
}
消费者ReceiveLogsTopic02代码
package com.javadouluo.abbitmq.seven;
import com.javadouluo.abbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 主题交换机
* @author jektong
* @date 2023年06月29日 0:35
*/
public class ReceiveLogsTopic02 {
// 交换机名称
public static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
// 声明一个交换机logs,类型是topic
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
// 声明一个队列
channel.queueDeclare("Q2",false,false,false,null);
// 路由键绑定队列
channel.queueBind("Q2",EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind("Q2",EXCHANGE_NAME,"lazy.#");
System.out.println("等待接收消息");
// 接收消息后处理
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println(new String(message.getBody(),"UTF-8"));
System.out.println("接收队列:" + "Q2"+" 绑定键:" + message.getEnvelope().getRoutingKey());
};
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag->{
System.out.println("消息消费被中断");
};
channel.basicConsume("Q2",true,deliverCallback,cancelCallback);
}
}
生产者TopicLog代码
package com.javadouluo.abbitmq.seven;
import com.javadouluo.abbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* @author jektong
* @date 2023年06月29日 0:46
*/
public class TopicLog {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
Map<String,String> map = new HashMap<>();
map.put("quick.orange.rabbit","被Q1,Q2接收");
map.put("lazy.orange.elephant","被Q1,Q2接收");
map.put("quick.orange.fox","被Q1接收");
map.put("lazy.brown.fox","被Q2接收");
map.put("lazy.pink.rabbit","满足两个绑定但是会被Q2接收一次");
map.put("quick.brown.fox","不匹配不会被任何队列接收,会被丢弃");
map.put("quick.orange.male.rabbi","不匹配不会被任何队列接收,会被丢弃");
map.put("lazy.orange.male.rabbit","不匹配不会被任何队列接收,会被丢弃");
for(Map.Entry<String,String> maps:map.entrySet()){
// 路由匹配键
String key = maps.getKey();
// 路由匹配值
String value = maps.getValue();
channel.basicPublish("topic_logs",key,null,value.getBytes("UTF-8"));
System.out.println("生产者发送消息:" + value);
}
}
}
请自行测试,最后在管理平台中可以发现,实现的绑定的方式是这样的:
八. 死信队列
8.1 死信概念与来源
某些特殊的情况下,某些消息无法被正常消费,并且之后也没有后续的处理,消息队列通常提供了一个特殊的队列,即死信队列。
使用死信队列的好处是,它提供了一种容错机制,确保无法被正常消费的消息不会丢失。同时,它也提供了故障排查和问题处理的机会。
产生死信消息的来源可能有下列一些原因:
-
队列达到最大长度:队列已满,无法添加消息。
-
消费者发生异常错误:消费者在处理消息时可能发生错误,导致消息处理失败。
-
消息TTL过期:某些消息可能具有处理时限,如果消费者无法在规定的时间内处理消息,则消息可能被视为无法正常消费,这种我们称为消息TTL过期。
8.2 死信实战
在写代码之前,先看一张图来了解死信队列在RabbitMQ中是如何交互的:
简单解释一下上图,生产者发送消息到MQ中通过直接交换机normal_exchange
通过路由键zhangsan
绑定队列normal_queue
当处在其中的消息无法被C1消费者所消费,此时此消息为死信消息,将通过直接交换机dead_exchange
通过路由键lisi
绑定的dead_queue
放入该队列交给C2消费。
此过程最重要的就是死信消息如何可以通过normal_queue
来与直接交换机dead_exchange
所交互然后放入死信队列的dead_queue
,下面就来实现上图的代码实现。
消费者Consumer01代码
package com.javadouluo.abbitmq.eight;
import com.javadouluo.abbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* @author jektong
* @date 2023年06月30日 0:02
*/
public class Consumer01 {
// 直接交换机normal_exchange
public static final String NORMAL_EXCHANGE = "normal_exchange";
// 直接交换机dead_exchange
public static final String DEAD_EXCHANGE = "dead_exchange";
// 队列normal_queue
public static final String NORMAL_QUEUE = "normal_queue";
// 队列dead_queue
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
// 声明一个普通交换机
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明一个死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
// 声明普通与死信队列
Map<String,Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
arguments.put("x-dead-letter-routing-key","lisi");
// arguments就是用来绑定死信交换机的
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
// 普通交换机与普通队列绑定
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
// 死信交换机与死信队列绑定
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
// 接收消息后处理
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("Consumer01接收的消息是: " + new String(message.getBody(),"UTF-8"));
};
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag->{
System.out.println("消息消费被中断");
};
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);
}
}
生产者Product代码
package com.javadouluo.abbitmq.eight;
import com.javadouluo.abbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author jektong
* @date 2023年06月30日 0:27
*/
public class Product {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
// 死信消息设置过期时间10s
AMQP.BasicProperties properties = new AMQP.BasicProperties()
.builder().expiration("10000").build();
// 向普通队列发送消息
for (int i = 0; i < 11; i++) {
String msg = "info" + i;
channel.basicPublish("normal_exchange","zhangsan",properties,msg.getBytes());
}
}
}
先启动消费者Consumer01
然后将其停止,启动生产者Product
发送消息,消息会超时发送至死信队列中:
消费者Consumer02代码
package com.javadouluo.abbitmq.eight;
import com.javadouluo.abbitmq.utils.RabbitMqUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;
/**
* @author jektong
* @date 2023年06月30日 0:02
*/
public class Consumer02 {
// 队列dead_queue
public static final String DEAD_QUEUE = "dead_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
// 接收消息后处理
DeliverCallback deliverCallback = (consumerTag, message)->{
System.out.println("Consumer02接收的消息是: " + new String(message.getBody(),"UTF-8"));
};
// 取消消息时的回调
CancelCallback cancelCallback = consumerTag->{
System.out.println("消息消费被中断");
};
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
}
}
先启动消费者Consumer02
,之前的消息会被其消费: