文章目录
- 1. 补充知识:同步通讯和异步通讯
- 1.1 同步通讯
- 1.2 异步通讯
- 2. 同步调用的缺点
- 2.1 业务耦合
- 2.2 性能较差
- 2.3 级联失败
- 3. 什么情况下使用同步调用
- 4. 异步调用
- 5. 异步调用的优点和缺点
- 5.1 异步调用的优点
- 5.1.1 解除耦合,拓展性强
- 5.1.2 无需等待,性能好
- 5.1.3 故障隔离
- 5.1.4 削峰填谷
- 5.2 异步调用的缺点
- 5.2.1 不能得到调用结果
- 5.2.2 不确定下游业务执行是否成功
- 5.2.3 业务安全依赖于消息代理的可靠性
- 6. MQ 的技术选型
- 7. 安装 RabbitMQ 并启动 RabbitMQ
- 7.1 搜索 RabbitMQ 镜像
- 7.2 下载 RabbitMQ 镜像
- 7.3 启动 RabbitMQ
- 7.4 访问 RabbitMQ 的管理页面
- 7.5 可能遇到的问题
- 7.5.1 安全组和防火墙未开放端口
- 7.5.2 RabbitMQ 没有安装 Web 插件
- 8. RabbitMQ 的整体架构和核心概念
- 9. RabbitMQ 快速入门
- 9.1 新建队列
- 9.2 绑定队列与交换机
- 9.3 发送消息
- 9.4 可能遇到的问题
- 10. 数据隔离
- 10.1 新建用户
- 10.2 为新用户创建一个 VirtualHost
- 10.3 测试不同 VirtualHost 之间是否有数据隔离
- 11. 在 SpringBoot 项目中集成 RabbitMQ
- 11.1 AMQP 和 SpringAMQP
- 11.2 快速入门
- 11.2.1 引入 Maven 依赖
- 11.2.2 编写与 RabbitMQ 有关的配置信息
- 11.3 完成一个简单的案例
- 11.3.1 创建队列
- 11.3.2 发送消息
- 11.3.3 接收消息
- 12. Work Queues 模型
- 12.1 Work Queues 的概念
- 12.2 Work Queues 模型的消息推送机制
- 13. 交换机
- 13.1 Fanout 交换机
- 13.1.1 Fanout 交换机的概念
- 13.1.2 快速上手
- 13.2 Direct 交换机
- 13.2.1 Direct 交换机的概念
- 13.2.2 快速上手
- 13.3 Topic 交换机(推荐使用)
- 13.3.1 Topic 交换机的概念
- 13.3.2 快速上手
- 14. 在 SpringBoot 项目中声明队列和交换机的方式
- 14.1 编程式声明
- 14.1.1 SpringAQMP提供的创建队列和交换机的类
- 14.1.2 快速上手
- 14.1.3 编程式声明的缺点
- 14.2 注解式声明(推荐使用)
- 15. 消息转换器
- 15.1 默认的消息转换器
- 15.2 自定义消息转换器
1. 补充知识:同步通讯和异步通讯
1.1 同步通讯
同步通讯是指发送方在发送消息后,会等待接收方的回应,直到收到回应后才会继续执行后续操作
同步通讯的特点是:
- 阻塞:发送方在等待回应期间会被阻塞,无法进行其他操作
- 顺序执行:消息的处理是按照发送和接收的顺序进行的,确保了消息的时序性
- 实时反馈:发送方可以立即得到接收方的回应,适用于需要立即确认的场景
- 占用资源:由于需要等待,可能会造成资源的浪费,如线程阻塞
打电话就是一个典型的同步通讯例子,通话双方必须实时交流,一方说话时,另一方必须等待
1.2 异步通讯
异步通讯是指发送方在发送消息后,不需要等待接收方的立即回应,就可以继续执行其他操作。接收方在处理完消息后,可能会在未来的某个时间点给出回应
异步通讯的特点是:
- 非阻塞:发送方在发送消息后可以立即继续其他工作,不会因为等待回应而被阻塞
- 解耦:发送方和接收方在时间上解耦,可以独立处理各自的任务
- 灵活:异步通讯可以处理更复杂的通信模式,如消息队列、事件驱动等
- 资源利用率高:更高效地利用资源,因为不需要等待,可以提高系统的吞吐量
电子邮件是一个异步通讯的例子,你可以发送一封邮件后继续做其他事情,收件人可以在任何时间回复邮件(微信聊天也是一个异步通讯的例子)
2. 同步调用的缺点
我们以支付业务
为例分析同步调用的缺点
支付业务采用的是同步调用的方式,因为我们在执行更新支付状态
操作和更新订单状态
之前,需要先知道扣减余额
操作的结果,这种同步调用方式存在几个问题
2.1 业务耦合
第一个问题就是业务耦合的问题,对于支付服务来说,最重要的一件事就是扣减用户的余额,然后更新支付状态
后续的更新订单状态
操作跟支付服务是没什么关系的,但是支付成功之后确实需要更新订单状态,所以支付服务不得不调用交易服务来更新订单状态
那有同学就说了,我在支付服务里面加一行代码不就可以调用交易服务了吗,听起来没什么问题,但是业务是会变化的,产品经理的脑洞你也是想象不到的
想象一下,产品经理提了一个新的需求,用户支付成功之后要发一个短信通知用户,产品经理一提需求,我们就要更改源代码
某一天,产品经理又提了一个新需求,用户支付成功之后,要为用户增加一定的积分
这种同步调用的方式拓展性比较差,不符合面向对象编程中的开闭原则
2.2 性能较差
如果采用同步调用的方式,支付服务需要等待其它所有服务完成操作,耗时会大大增长,十分影响用户的体验
2.3 级联失败
想象一下,交易服务出现故障了,而这个故障迟迟没有得到解决,最终就很有可能拖垮支付服务,导致支付服务的资源被耗尽,也出现故障,出现级联失败的情况
3. 什么情况下使用同步调用
经过上面的分析,有同学可能会有这样的疑问:既然同步调用有这么多问题,为什么我们还要用同步调用呢,什么情况下使用同步调用呢
一般来说,使用同步调用的场景都有一个特点:下一步操作依赖于上一步操作的结果
以上面的支付业务为例,交易服务、通知服务、积分服务都依赖于支付服务的结果
当支付服务成功扣减用户余额并成功更新支付状态之后,交易服务、通知服务、积分服务就可以开始执行相应的操作了
然而,通知服务不依赖于交易服务,积分服务也不依赖于通知服务
在成功扣减用户余额并成功更新支付状态之后,支付业务就已经完成了
所以说,支付服务完成了之后,只需要通知交易服务、通知服务、积分服务执行相应的操作,而不需要等待交易服务、通知服务、积分服务都完成之后再返回结果
4. 异步调用
异步调用基于消息通知,一般包含三个角色消息
- 发送者:投递消息的人
- 消息代理:管理、暂存、转发消息的人
- 消息接收者:接收和处理消息的人
改为异步调用之后,支付服务不再同步调用与支付业务关联度低的服务,而是发送消息通知于支付业务关联度低的服务
5. 异步调用的优点和缺点
5.1 异步调用的优点
5.1.1 解除耦合,拓展性强
即使以后有新业务拓充,支付服务只需要发送一条消息给消息代理,让消息代理通知新业务,拓展性强
5.1.2 无需等待,性能好
支付服务完成之后只需要发送消息给消息代理,让消息代理通知其它服务
5.1.3 故障隔离
即使交易服务出现了故障,也不会影响到支付服务
5.1.4 削峰填谷
假如支付服务正在面临着很大的压力,流量时高时低(呈波浪形)。如果采用同步调用的方式,当流量很高的时候,交易服务、通知服务、积分服务可能扛不住
但如果采用异步调用的方式,就很少会出现交易服务、通知服务、积分服务扛不住的情况。为什么呢,因为消息代理容量很大。在高并发的情况下,用户每成功支付一次,支付服务只需要发送一条消息给消息代理,这些像洪水一般的消息都会被消息代理拦住
消息代理会保存这些消息,后续服务可以根据自己的处理速度,从消息代理中一条一条地取出信息并处理。这样一来,后续服务承受的压力将会变得很平缓
5.2 异步调用的缺点
5.2.1 不能得到调用结果
异步调用一般是通知对方执行某个操作,无法知道对方执行操作后的结果
5.2.2 不确定下游业务执行是否成功
异步调用通知下游业务后,无法知道下游业务是否执行成功
5.2.3 业务安全依赖于消息代理的可靠性
下游业务的执行依赖于消息代理的可靠性,一旦消息代理出现故障,下游业务将无法执行
6. MQ 的技术选型
MQ:Message Queue,消息队列
以下是当前主流的消息队列
在这里重点提一下 Kafka ,Kafka 的吞吐量非常高,适合大规模日志场景
目前国内大部分公司采用的都是 RabbitMQ
7. 安装 RabbitMQ 并启动 RabbitMQ
RabbitMQ是基于 Erlang 语言开发的开源消息通信中间件(官网:RabbitMQ)
我们基于 docker 安装 RabbitMQ
7.1 搜索 RabbitMQ 镜像
sudo docker search rabbitmq
7.2 下载 RabbitMQ 镜像
sudo docker pull rabbitmq
检查 RabbitMQ 镜像是否下载成功
sudo docker images
7.3 启动 RabbitMQ
sudo docker run \
-e RABBITMQ_DEFAULT_USER=wuyanzu \
-e RABBITMQ_DEFAULT_PASS=bhoLdSvpd0UAOysh \
-v rabbitmq-plugins:/plugins \
--name rabbitmq \
--hostname rabbitmq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:latest
指令说明:
-
sudo docker run
: 基本的Docker命令,用于启动一个新的容器实例 -
-e RABBITMQ_DEFAULT_USER=wuyanzu
: 设置RabbitMQ服务的默认用户名为wuyanzu
-
-e RABBITMQ_DEFAULT_PASS=kZoeSW$$xS5i^Cum
: 设置RabbitMQ服务的默认密码为bhoLdSvpd0UAOysh
-
-v rabbitmq-plugins:/plugins
: 将一个名为rabbitmq-plugins
的卷映射到容器的/plugins
目录,用于存放RabbitMQ的插件。这里的rabbitmq-plugins
是一个卷的名称,而不是宿主机的路径 -
--name rabbitmq
: 指定容器的名称为rabbitmq
-
--hostname rabbitmq
: 设置容器的主机名为rabbitmq
-
-p 15672:15672
: 将宿主机的端口15672
映射到容器的端口15672
,这是RabbitMQ管理界面的默认端口 -
-p 5672:5672
: 将宿主机的端口5672
映射到容器的端口5672
,这是RabbitMQ用于AMQP协议通信的默认端口 -
-d
: 在后台运行容器(守护进程) -
rabbitmq:latest
: 使用最新的RabbitMQ官方镜像来创建容器
7.4 访问 RabbitMQ 的管理页面
接下来进入 RabbitMQ 的管理界面,在浏览器输入以下地址(将 IP 地址换成你的虚拟机的 IP 地址)
http://127.0.0.1:15672/
输入用户名和密码后进入到 RabbitMQ 的管理页面
7.5 可能遇到的问题
7.5.1 安全组和防火墙未开放端口
如果无法进入RabbitMQ的管理界面,记得先在安全组和防火墙中开放 15672 和 5672 端口
在 Ubuntu 中开放15672 和 5672 端口
sudo ufw allow 15672
sudo ufw allow 5672
sudo ufw reload
在 CentOS 中开放15672 和 5672 端口
sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
sudo firewall-cmd --reload
7.5.2 RabbitMQ 没有安装 Web 插件
如果开放防火墙端口后还是无法访问 RabbitMQ 的管理界面,可能是安装 RabbitMQ 没有安装 Web 插件
以下是 RabbitMQ 安装 Web 插件的方法
第一步:进入容器内部
sudo docker exec -it rabbitmq bash
第二步:安装 Web 插件
rabbitmq-plugins enable rabbitmq_management
安装插件后退出容器内部
exit
8. RabbitMQ 的整体架构和核心概念
RabbitMQ 有几个核心概念:
- Publisher:消息发送者
- Consumer:消息的消费者
- Queue:消息队列,存储消息
- Exchange:交换机,负责路由消息
- VirtualHost:虚拟主机,用于数据隔离
RabbitMQ 的整体架构
9. RabbitMQ 快速入门
注意事项:交换机只能路由和转发消息,不能存储消息
9.1 新建队列
创建一个名为 hello.queue 的队列
9.2 绑定队列与交换机
我们将刚才新创建的 hello.queue 队列与 amq.fanout 交换机绑定(fanout意为扇出)
绑定成功后的界面
9.3 发送消息
我们在 amq.fanout 交换机中发送一条消息,消息的内容为 Hello, RabbitMQ!
发送消息后,查看交换机的总览信息
查看队列中的消息数
查看消息的具体内容
9.4 可能遇到的问题
如果你发现
- 交换机的 overview 页面没有折线图
- Queues 页面也没有与消息相关的信息
- 点击
channels
后出现Stats in management UI are disabled on this node
信息
需要先修改 RabbitMQ的 配置
第一步:进入容器内部
sudo docker exec -it rabbitmq bash
第二步:修改配置
cd /etc/rabbitmq/conf.d/
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
第三步:重启容器
先退出容器内部
exit
再重启容器
sudo docker restart rabbitmq
10. 数据隔离
10.1 新建用户
新建一个名为 CaiXuKun 的用户,密码为 T1rhFXMGXIOYCoyi ,角色指定为 admin
可以看到,新用户对任意一个 VirtualHost 都是没有访问权限的
用新用户的账号登录管理台,虽然能看到所有 VirtualHost 的信息,但是无法对任意一个 VirtualHost 进行操作
10.2 为新用户创建一个 VirtualHost
用新用户的账号登录管理台,创建一个名为 /blog 的 VirtualHost
10.3 测试不同 VirtualHost 之间是否有数据隔离
可以看到,不同的 VirtualHost 之间有不同的交换机
对某一个 VirtualHost 操作不会影响到另一个 VirtualHost
11. 在 SpringBoot 项目中集成 RabbitMQ
后端环境:
- SpringBoot:3.0.2
- JDK:17.0.7
11.1 AMQP 和 SpringAMQP
SpringAMQP 的官网:Spring AMQP
11.2 快速入门
新建一个 SpringBoot 项目,并创建 consumer 和 publisher 两个子模块,项目的整体结构如下
11.2.1 引入 Maven 依赖
在父工程中引入 SpringAMQP 的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
11.2.2 编写与 RabbitMQ 有关的配置信息
在 consumer 和 publisher 模块的 application.yml 中分别编写与 RabbitMQ 有关的配置信息
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /blog
username: CaiXuKun
password: T1rhFXMGXIOYCoyi
11.3 完成一个简单的案例
案例要求如下:
- 在 RabbitMQ 的控制台中创建名为 simple.queue 的队列(队列归属的 VirtualHost 为 /blog)
- 在 publisher 模块中,利用 SpringAMQP 直接向 simple.queue 队列发送消息
- 在 consumer 服务中,利用 SpringAMQP 编写消费者,监听 simple.queue 队列
11.3.1 创建队列
11.3.2 发送消息
在 publisher 模块中编写测试类,用户向队列发送消息
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest(classes = PublisherApplication.class)
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSendMessageToQueue() {
String queueName = "simple.queue";
String msg = "Hello, SpringAMQP!";
rabbitTemplate.convertAndSend(queueName, msg);
}
}
在 RabbitMQ 的控制台可以看到,消息成功发送
11.3.3 接收消息
SpringAMQP 提供了声明式的消息监听,我们只需要通过@RabbitListener
注解在方法上声明要监听的队列名称,将来 SpringAMQP 就会把消息传递给使用了@RabbitListener
注解的方法
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String message) {
System.out.println("消费者收到了 simple.queue 的消息:【" + message + "】");
}
}
启动 consumer 模块的启动类之后,就可以看到消息的内容
12. Work Queues 模型
12.1 Work Queues 的概念
Work Queues,简单地来说,就是让多个消费者绑定到一个队列,共同消费队列中的消息
虽然有多个消费者绑定同一个队列,但是队列中的某一条消息只会被一个消费者消费
我们实现一个小案例,模拟 Work Queues,实现一个队列绑定多个消费者
案例要求如下:
- 在RabbitMQ的控制台创建一个队列,名为 work.queue
- 在 publisher 服务中定义测试方法,在 1 秒内产生 50 条消息,发送到work.queue
- 在 consumer 服务中定义两个消息监听者,都监听 work.queue 队列
- 消费者 1 每秒处理 50 条消息,消费者 2 每秒处理 5 条消息
在 publisher服务的 SpringAmqp 测试类中添加以下方法,该方法可以在 1 秒内产生 50 条消息
@Test
void testWorkQueue() throws InterruptedException {
String queueName = "work.queue";
for (int i = 1; i <= 50; i++) {
String message = "Hello, work queues_" + i;
rabbitTemplate.convertAndSend(queueName, message);
Thread.sleep(20);
}
}
在 consumer 服务的 RabbitMQListener 类中添加以下方法,监听 work.queue 队列
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message) {
System.out.println("消费者1 收到了 work.queue的消息:【" + message + "】");
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String message) {
System.err.println("消费者2 收到了 work.queue的消息...... :【" + message + "】");
}
12.2 Work Queues 模型的消息推送机制
如果有两个或两个以上的消费者监听同一个队列,默认情况下 RabbitMQ 会采用轮询的方法将消息分配给每个队列
但每个消费者的消费能力可能是不一样的,我们给两个消费者中的代码设置不同的休眠时间,模拟消费能力的不同
@RabbitListener(queues = "work.queue")
public void listenWorkQueue1(String message) throws InterruptedException {
System.out.println("消费者1 收到了 work.queue的消息:【" + message + "】");
Thread.sleep(20);
}
@RabbitListener(queues = "work.queue")
public void listenWorkQueue2(String message) throws InterruptedException {
System.err.println("消费者2 收到了 work.queue的消息...... :【" + message + "】");
Thread.sleep(100);
}
经过测试可以发现,即使两个队列的消费能力不一样,默认情况下 RabbitMQ 还是会采用轮询的方法将消息分配给每个队列,也就是平均分配
但这不是我们想要的效果,我们想要的效果是消费能力强的消费者处理更多的消息,甚至能够帮助消费能力弱的消费者
怎么样才能达到这样的效果呢,只需要在配置文件中添加以下信息
spring:
rabbitmq:
listener:
simple:
prefetch: 1
这个配置信息相当于告诉消费者要一条一条地从队列中取出消息,只有处理完一条消息才能取出下一条
这样一来,就可以充分利用每一台机器的性能,让消费能力强的消费者处理更多的消息,同时还可以避免消息在消费能力较弱的消费者上发生堆积的情况
13. 交换机
真正的生产环境都会经过交换机来发送消息,而不是直接发送到队列
交换机的作用:
- 接收 publisher 发送的消息
- 将消息按照规则路由到与交换机绑定的队列
交换机的类型有以下三种:
- Fanout:广播
- Direct:定向
- Topic:话题
注意事项:交换机只能路由和转发消息,不能存储消息
13.1 Fanout 交换机
13.1.1 Fanout 交换机的概念
Fanout 交换机会将接收到的消息广播到每一个跟其绑定的 queue ,所以也叫广播模式
13.1.2 快速上手
我们做一个小案例来体验 Fanout 交换机的效果,案例要求如下:
- 在 RabbitMQ 控制台中,声明队列 fanout.queue1 和 fanout.queue2
- 在 RabbitMQ 控制台中,声明交换机 blog.fanout,将两个队列与其绑定
- 在 consumer 服务中,编写两个消费者方法,分别监听 fanout.queue1 和 fanout.queue2 队列
- 在 publisher 服务中编写测试方法,向 blog.fanout 交换机发送消息
声明交换机
在 consumer 服务的 RabbitMQListener 类中添加以下方法,分别监听 fanout.queue1 和 fanout.queue2 队列
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String message) {
System.out.println("消费者1 收到了 fanout.queue1的消息:【" + message + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String message) {
System.err.println("消费者2 收到了 fanout.queue2的消息...... :【" + message + "】");
}
在 publisher 服务的 SpringAmqp 测试类中添加以下方法,向 blog.fanout 交换机发送消息
@Test
void testSendFanout() {
String exchangeName = "blog.fanout";
String message = "Hello, fanout exchange";
rabbitTemplate.convertAndSend(exchangeName, null, message);
}
13.2 Direct 交换机
13.2.1 Direct 交换机的概念
Direct 交换机会将接收到的消息根据规则路由到指定的队列,被称为定向路由
- 每一个 Queue 都与 Exchange 设置一个 bindingKey
- 发布者发送消息时,指定消息的 RoutingKey
- Exchange 将消息路由到 bindingKey 与消息 routingKey 一致的队列
需要注意的是:同一个队列可以绑定多个 bindingKey ,如果有多个队列绑定了同一个 bindingKey ,就可以实现类似于 Fanout 交换机的效果。由此可以看出,Direct 交换机的功能比 Fanout 交换机更强大
13.2.2 快速上手
我们做一个小案例来体验 Direct 交换机的效果,案例要求如下:
- 在 RabbitMQ 控制台中,声明队列 direct.queue1 和 direct.queue2
- 在 RabbitMQ 控制台中,声明交换机 blog.direct ,将上面创建的两个队列与其绑定
- 在 consumer 服务中,编写两个消费者方法,分别监听 direct.queue1 和 direct.queue2
- 在 publisher 服务中编写测试方法,利用不同的 RoutingKey 向 blog.direct 交换机发送消息
为 direct.queue1队列 和 direct.queue2 队列分别指定 bindingKey
在 consumer 服务的 RabbitMQListener 类中添加以下方法,分别监听 direct.queue1 和 direct.queue2 队列
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String message) {
System.out.println("消费者1 收到了 direct.queue1的消息:【" + message + "】");
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String message) {
System.err.println("消费者2 收到了 direct.queue2的消息...... :【" + message + "】");
}
在 publisher 服务的 SpringAmqp 测试类中添加以下方法,向 blog.direct交换机发送消息
@Test
void testSendDirect() {
String exchangeName = "blog.direct";
String blueMessage = "蓝色通知,警报解除,哥斯拉放的是气球";
rabbitTemplate.convertAndSend(exchangeName, "blue", blueMessage);
String redMessage = "红色警报,由于日本排放核污水,惊现哥斯拉!";
rabbitTemplate.convertAndSend(exchangeName, "red", redMessage);
String yellowMessage = "黄色通知,哥斯拉来了,快跑!";
rabbitTemplate.convertAndSend(exchangeName, "yellow", yellowMessage);
}
13.3 Topic 交换机(推荐使用)
13.3.1 Topic 交换机的概念
Topic Exchange 与 Direct Exchange类似,区别在于 Topic Exchange 的 routingKey 可以是多个单词的列表(多个 routingKey 之间以.
分割)
Queue 与 Exchange 指定 bindingKey 时可以使用通配符
- #:代指 0 个或多个单词
- *:代指 1 个单词
- Topic 交换机能实现的功能 Direct 交换机也能实现,不过用 Topic 交换机实现起来更加方便
- 如果某条消息的 topic 符合多个 queue 的 bindingKey ,该条消息会发送给符合条件的所有 queue ,实现类似于 Fanout 交换机的效果
13.3.2 快速上手
我们做一个小案例来体验 Topic 交换机的效果,案例要求如下:
- 在 RabbitMQ 控制台中,声明队列 topic.queue1 和 topic.queue2
- 在 RabbitMQ 控制台中,声明交换机 blog.topic ,将两个队列与其绑定
- 在 consumer 服务中编写两个消费者方法,分别监听 topic.queue1 和 topic.queue2
- 在 publisher 服务中编写测试方法,利用不同的 routingKey 向 blog.topic 发送消息
为 topic.queue1 和 topic.queue2 队列分别指定 bindingKey
在 consumer 服务的 RabbitMQListener 类中添加以下方法,分别监听 direct.queue1 和 direct.queue2 队列
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String message) {
System.out.println("消费者1 收到了 topic.queue1的消息:【" + message + "】");
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String message) {
System.err.println("消费者2 收到了 topic.queue2的消息...... :【" + message + "】");
}
在 publisher 服务的 SpringAmqp 测试类中添加以下方法,向 blog.direct交换机发送消息
@Test
void testSendTopic() {
String exchangeName = "blog.topic";
String weatherMessage = "今天天气挺不错,我的心情的挺好的";
rabbitTemplate.convertAndSend(exchangeName, "china.weather", weatherMessage);
String newsMessage = "蓝色通知,警报解除,哥斯拉放的是气球";
rabbitTemplate.convertAndSend(exchangeName, "china.news", newsMessage);
}
14. 在 SpringBoot 项目中声明队列和交换机的方式
我们之前创建队列和交换机都是在 RabbitMQ 的控制台页面中创建的,不仅十分繁琐,还有可能打错队列和交换机的名。而且,不同的环境(开发环境、测试环境、生产环境)可能会有不同的队列和交换机,手动创建队列和交换机效率十分低下
接下来为大家介绍两种在 SpringBoot 项目中声明队列和交换机的方式
14.1 编程式声明
14.1.1 SpringAQMP提供的创建队列和交换机的类
SpringAMQP 提供了几个类,用来声明队列、交换机及其绑定关系:
- Queue:用于声明队列,可以用工厂类 QueueBuilder 构建
- Exchange:用于声明交换机,可以用工厂类 ExchangeBuilder 构建
- Binding:用于声明队列和交换机的绑定关系,可以用工厂类 BindingBuilder 构建
14.1.2 快速上手
我们创建一个 Fanout 类型的交换机,并且创建队列与这个交换机绑定
在 consumer 服务中编写 FanoutConfiguration 配置类
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfiguration {
@Bean
public FanoutExchange fanoutExchange3() {
return ExchangeBuilder.fanoutExchange("blog.fanout3").build();
}
@Bean
public FanoutExchange fanoutExchange4() {
return new FanoutExchange("blog.fanout4");
}
@Bean
public Queue fanoutQueue3() {
return new Queue("fanout.queue3");
}
@Bean
public Queue fanoutQueue4() {
return QueueBuilder.durable("fanout.queue4").build();
}
@Bean
public Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange3) {
return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange3);
}
@Bean
public Binding fanoutBinding4() {
return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange4());
}
}
启动 consumer 的启动类之后,队列、交换机、队列和交换机之间的关系就会自动创建
创建 Queue 时,如果没有指定 durable 属性,则 durable 属性默认为 true
14.1.3 编程式声明的缺点
编程式声明有一个缺点,当队列和交换机之间绑定的 routingKey 有很多个时,编码将会变得十分麻烦
以下是一个队列与 Direct 类型交换机绑定两个 routingKey 时的代码
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectConfiguration {
@Bean
public DirectExchange directExchange3() {
return new DirectExchange("blog.direct3");
}
@Bean
public Queue directQueue3() {
return new Queue("direct.queue3");
}
@Bean
public Queue directQueue4() {
return new Queue("direct.queue4");
}
@Bean
public Binding directQueue3BindingRed(Queue directQueue3, DirectExchange directExchange3) {
return BindingBuilder.bind(directQueue3).to(directExchange3).with("red");
}
@Bean
public Binding directQueue3BindingBlue(Queue directQueue3, DirectExchange directExchange3) {
return BindingBuilder.bind(directQueue3).to(directExchange3).with("blue");
}
@Bean
public Binding directQueue4BindingRed(Queue directQueue4, DirectExchange directExchange3) {
return BindingBuilder.bind(directQueue4).to(directExchange3).with("red");
}
@Bean
public Binding directQueue4BindingBlue(Queue directQueue4, DirectExchange directExchange3) {
return BindingBuilder.bind(directQueue4).to(directExchange3).with("yellow");
}
}
14.2 注解式声明(推荐使用)
SpringAMOP 提供了基于@RabbitListener
注解声明队列和交换机的方式
我们先在 RabbitMQ 的控制台删除 blog.direct 交换机、 direct.queue1 队列和 direct.queue2 队列
再改造 consumer 服务的 RabbitMQListener 类的监听 direct.queue1 队列和 direct.queue2 队列的方法
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "blog.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String message) {
System.out.println("消费者1 收到了 direct.queue1的消息:【" + message + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "blog.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String message) {
System.out.println("消费者2 收到了 direct.queue2的消息:【" + message + "】");
}
15. 消息转换器
在了解消息转换器之前,我们先来做一个小案例,案例的内容是利用 SpringAMQP 发送一条消息,消息的内容为一个 Java 对象
案例要求如下:
- 在 RabbitMQ 控制台创建一个队列,名为 object.queue
- 编写单元测试,向该队列中直接发送一条消息,消息的内容为 Map
- 在控制台查看消息
在 publisher 服务的 SpringAmqpTests 测试类中新增 testSendObject 方法
@Test
void testSendObject() {
Map<String, Object> hashMap = new HashMap<>(2);
hashMap.put("name", "Tom");
hashMap.put("age", 21);
rabbitTemplate.convertAndSend("object.queue", hashMap);
}
成功发送消息后,我们在 RabbitMQ 的控制台查看消息的具体内容
可以发现,消息的内容类型为 application/x-java-serialized-object
,并且消息的内容也变成一堆乱码
我们本来是想发送一个简单的仅含有姓名和年龄两个字段的简短信息,但是消息却变成了一堆乱码,不仅可读性大大下降,而且占用的空间也大大地增加了,这显然不是我们想要的效果
15.1 默认的消息转换器
Spring 处理对象类型的消息是由 org.springframework.amap.support.converter.MessageConverter
接口来处理的,该接口默认实现是 SimpleMessageConverter
SimpleMessageConverter
类是基于 JDK 提供的 ObjectOutputStream
来类完成序列化的,这种序列化方式存在以下问题:
- 使用 JDK 序列化有安全风险(如果序列化后的消息被恶意篡改,在反序列化的过程中可能会执行一些高危的代码)
- 经过 JDK 序列化的消息占用空间很大
- 经过 JDK 序列化的消息可读性很差
15.2 自定义消息转换器
一般建议采用 JSON 序列化代替默认的 JDK 序列化
要使用 JSON 序列化,需要先引入 jackson 依赖(在项目的父工程中引入)
如果是 Web 项目,无需引入该依赖,因为 spring-boot-starter-web 依赖中已包含该依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
接着在 consumer 服务和 publisher 服务中配置 MessageConverter
@Bean
public MessageConverter jacksonMessageConvertor(){
return new Jackson2JsonMessageConverter();
}
再次发送对象类型的消息,可以看到消息已经成功转换成 JSON 类型的字符串
我们也可以在 consumer 服务的 RabbitMQListener 类中添加对 object.queue 队列的监听(用什么类型发,就用什么类型接收)
@RabbitListener(queues = "object.queue")
public void listenObjectQueue(Map<String, Object> hashMap) {
System.out.println("消费者收到了 object.queue的消息:【" + hashMap + "】");
}
启动 consumer 服务的启动类之后,在控制台中可以看到被转换成 JSON 格式的消息
在控制台中会看到报错信息,因为之前有一条用 JDK 序列化的消息,现在改用了 jackson 序列化,序列化和反序列化用的序列化器不一样,肯定会报错
报错后,消息就没了,出现了消息丢失的现象,该怎么解决呢,可以参考我的另一篇博文:RabbitMQ 高级篇