消息队列(MQ)—RabbitMQ
一 初识MQ
1.同步通信与异步通信
1.同步通信的问题
同步调用的优点在于时效性高,可以立即得到结果
微服务之间基于Feign的调用属于同步方式,存在一些问题
- 耦合性:业务较多时,扩展维护不方便
- 性能下降:一次性调用多个Feign,会导致耗时增加,性能和吞吐量下降
- 资源浪费:调用Feign请求响应的过程中,CPU只能等待,无法做其他操作,造成系统资源的浪费
- 级联失败:当有一个服务宕机时,很容易造成后面的服务阻塞,从而导致级联失败
2.异步调用方案
异步调用常见的实现就是 事件驱动模式
- 服务解耦:新增或减少业务员,通过增加或取消事件订阅即可,无需再去修改上级服务
- 性能提升:上级服务只需发送事件,不再关注其他业务逻辑
- 消除级联:服务没有强依赖,不担心级联失败的问题
- 流量削峰:Broker 起到缓冲作用,根据实际情况分发业务,保护订阅服务。
异步通信的缺点:
- 过分依赖于Broker的可靠性,安全性,吞吐能力
- 业务复杂,业务没有明显的流程线,不好追踪管理
2.MQ介绍
MQ(MessageQueue),中文含义为消息队列,用来存放消息,也就是事件驱动模式中的 Broker
常见的MQ技术包含一下四种:
- RabbitMQ:现阶段使用最多的MQ技术,优点在于消息的低延迟和可靠性
- ActiveMQ:开发语言基于Java,可以进行深度定制
- RocketMQ:开发语言基于Java,可以进行深度定制,阿里巴巴开发
- Kafka:单机吞吐量很高,但消息可靠性不高。
二 RabbitMQ的使用
1.安装与运行
RabbitMQ基于Erlang语言开发的开源消息中间件。
RabbitMQ官方地址:Messaging that just works — RabbitMQ
在DockerHub上拉取RabbitMQ的镜像,然后运行。
#1.拉取RabbitMQ
docker pull rabbitmq:3
#2.运行RabbitMQ镜像,(15672 为管理UI界面的端口,5672为后期通信接口)
docker run \
-e RABBITMQ_DEFAULT_USER=shawn \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name rabbitmq3 \
--hostname myrabbit \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3
- RABBITMQ_DEFAULT_USER :设置默认账户名
- RABBITMQ_DEFAULT_PASS :设置默认密码
安装完成之后不能直接进行访问,需要先进入容器内部开启插件才可以访问
#1.进入 rabbitmq 容器内
docker exec -it myrabbitmq bash
#2.开启插件
rabbitmq-plugins enable rabbitmq_management
#3.如果直接拉取 management 版本的镜像,则无需以上步骤
docker pull rabbitmq:3-management
#4.运行 management 版本的rabbit
docker run \
-e RABBITMQ_DEFAULT_USER=shawn \
-e RABBITMQ_DEFAULT_PASS=123456 \
--name rabbitmq3 \
--hostname myrabbit \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
完成以后即可在浏览器中输入服务器地址+端口号访问了。
踩坑点:Rabbit的Web UI 中Channel模块无法打开并且提示 Stats in management UI are disabled on this node
是因为默认情况下Rabbit是禁止的。
The reason is that the default image disables metrics collector in the management_agent plugin
原因是默认图像禁用management_agent插件中的度量收集器
# 1.进入容器内部
docker exec -it myrabbitmq bash
# 2.切换至配置文件目录下
cd /etc/rabbitmq/conf.d/
# 3.将 management_agent.disable_metrics_collector.conf 文件中的 management_agent.disable_metrics_collector 的值修改为 false
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
# 4.重启 rabbit 容器
docker restart rabbitmq3
2.RabbitMQ概述
RabbitMQ通过 VirtualHost 进行隔离,相互不可见。
RabbitMQ中的相关概念
- channel:操作MQ的工具
- exchange:路由消息到队列中,将消息发送给exchange,由exchange交给队列
- queue:缓存消息
- virtual host:虚拟主机,是对queue,exchange等资源的逻辑分组,不同用户可以访问不同的虚拟主机
三 RabbitMQ消息模型
1.简单队列(Hello World)
1.创建项目引入依赖
创建两个项目分别为发送者和订阅者,引入RabbitMQ的maven坐标
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
或者在初始化项目时直接勾选RabbitMQ
2.编写测试代码示例
消息发送者示例代码:publisher
@Test
void publisher() throws IOException, TimeoutException {
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("192.168.119.101");
factory.setUsername("shawn");
factory.setPassword("123456");
factory.setPort(5672);
factory.setVirtualHost("/");
//建立连接
Connection connection=factory.newConnection();
//创建通道
Channel channel=connection.createChannel();
//创建队列
String queueName="simple.queue";
String message="贾君鹏,你妈喊你回家吃饭!";
channel.basicPublish("",queueName,null,message.getBytes());
System.out.println("发送消息:"+message);
channel.close();
connection.close();
}
消息接受者示例代码:consumer
@Test
void consumer() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.119.101");
factory.setUsername("shawn");
factory.setPassword("123456");
factory.setPort(5672);
factory.setVirtualHost("/");
//创建连接和通道
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
//订阅消息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 处理消息
String message=new String(body,"UTF-8");
System.out.println("接受消息:"+message);
}
});
System.out.println("等待接受消息,退出请按 CTRL+C");
}
3.基本消息队列的发送流程
- 建立 Connection
- 创建 Channel
- 使用 Channel 声明队列
- 使用 Channel 向队列发送消息 / 使用 consumer 的消费行为订阅消息
2.SpringAMQP
官方文档:https://spring.io/projects/spring-amqp
AMQP是消息接受与发送的协议或者标准,与语言和平台无关
1.RabbitTemplate
RabbitTemplate是一个Spring封装的用来发送消息的工具类,类似 RedisCache,RestTemplate,可以简单,高效,优雅的实现消息的发送和接收
引入 spring-boot-starter-amqp 依赖,然后在相关项目(publisher 和 consumer )的配置文件中配置相关参数。
spring:
application:
name: publisher
rabbitmq:
host: 192.168.119.101
# 默认端口为5672,使用默认端口时可不写
port: 5672
username: shawn
password: 123456
virtual-host: /
在 publisher 项目中编写测试代码,发送消息
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void publisher() {
String queueName="hello.world";
String message="Hello,RabbitTemplate";
# 这里使用 convertAndSend 进行消息的处理和发送
rabbitTemplate.convertAndSend(queueName,message);
System.out.println("消息发送完成");
}
在 consumer 项目中编写代码,订阅并且接受消息
/**
* 使用 Component 将当前类申明为一个 bean
* 定义一个类,使用 RabbitListener 注解订阅要接受消息的队列
*/
@Component
public class SimpleMessageRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleMessage(String message) {
System.out.println("接受消息:" + message);
}
}
注意:消息一旦消费,就会从消息队列中删除,RabbitMQ没有消息回溯
3.WorkQueue(工作队列)
工作队列模型,多个消费者绑定到一个队列,同一条消息只会被一个消费者处理。
工作队列,可以提高消息处理速度,避免消息堆积
@RabbitListener(queues = "simple.queue")
public void listenWorkMessage1(String message) throws InterruptedException {
System.out.println("消费者【1】接受消息:" + message + "-" + LocalDateTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkMessage2(String message) throws InterruptedException {
System.out.println("消费者【2】接受消息:" + message + "-" + LocalDateTime.now());
Thread.sleep(200);
}
定义两个执行效率不同的消费者,模拟一个简单的工作队列模型,发现消费者并不会因为执行效率的高低自动的增加或处理消息的处理量,而是平均分配。
可以通过消息的欲取机制来控制消息的处理量
欲取机制,即默认情况下,有多少消息就拿多少消息,并不会考虑有多少个消费者。
通过调整 prefetch 属性的值,来控制消费者的消息预取能力
spring:
application:
name: consumer
rabbitmq:
host: 192.168.119.101
port: 5672
username: shawn
password: 123456
virtual-host: /
listener:
simple:
# 通过调整 prefetch 属性的值,来控制消费者的消息预取能力
prefetch: 1
4.发布订阅模式
发布订阅模式(Publish-Subscribe)的核心是,允许将一个消息发给多个消费者。具体的实现方式是加入了exchange(交换机)。
注意:exchange只负责消息的转发,而不是存储。路由失败则消息丢失
1.Exchange(交换机)
交换机的作用:
- 接受 Publisher 发送的消息
- 将消息按照规则路由到与之绑定的队列上
- 不存储消息,只负责消息的转发。路由失败,消息丢失
- FanoutExchange会将消息路由到每一个和它绑定的队列上
需要使用到的 Bean:FanoutExchange Queue Binding
2.FanoutExchange
Fanout Exchange 会将接收到的消息路由到每一个跟其绑定的queue
在 Consumer 项目中定义队列(queue),交换机(exchange),并且将队列绑定到交换机上。
/**
示例: 声明一个交换机和两个队列,并且将队列与交换机进行绑定
*/
@Configuration
public class FanoutExchangeConfig {
/**
*声明一个FanoutExchange对象,并且添加到bean
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("shawn.exchange");
}
/**
*声明一个Queue对象,并且添加到bean
*/
@Bean
public Queue fanoutQueue1() {
return new Queue("shawn.queue1");
}
/**
* 将队列和Exchange进行绑定
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
// 使用 BindingBuilder 提供的方法进行绑定,最后返回Binding对象
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
public Queue fanoutQueue2() {
return new Queue("shawn.queue2");
}
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
接受消息的监听器示例代码:
@Component
public class SimpleMessageRabbitListener {
@RabbitListener(queues = "shawn.queue1")
public void listenWorkMessage1(String message) throws InterruptedException {
System.out.println("消费者【1】接受消息:" + message + "-" + LocalDateTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "shawn.queue2")
public void listenWorkMessage2(String message) throws InterruptedException {
System.out.println("消费者【2】接受消息:" + message + "-" + LocalDateTime.now());
Thread.sleep(200);
}
}
在 Publisher 项目中编写消息发送的测试代码:
@Test
void testSendExchange() throws InterruptedException {
String exChangeName = "shawn.exchange";
String message = "贾君鹏,你妈喊你回家吃饭";
// 向指定名称的Exchage(交换机)发送消息
rabbitTemplate.convertAndSend(exChangeName, "", message);
}
测试结果:两个队列均能收到消息
3.DirectExchange
DirectExchange会将接受到的消息根据规则路由到指定的Queue,因此成为路由模式(routes)。
- 每一个Queue都与DirectExchange设置一个BindingKey
- 发布者发布消息时,指定消息的RoutingKey
- DirectExchange会将消息路由到BindingKey与消息RoutingKey一致的队列
- 可以为队列(queue)设置多个BindingKey,且一个BindingKey也可以设置给多个队列
当多个队列使用一个BindingKey时,DirectExchange会将消息发送给所有使用了这个BindingKey的队列
这种情况下,DirectExchang与FanoutExchange相同,也属于广播模式
因此可以认为 DirectExchange 可以模拟 FanoutExchang, 且比 FanoutExchange 灵活
示例:基于RabbitListener 实现 DirectExchange
编写 Consumer 项目的代码:
/**
* 使用 RabbitListener 声明要绑定的队列、BindingKey、交换机和交换机类型,BindingKey可以设置多个
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "shawn.queue1"),
exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenWorkMessage1(String message) throws InterruptedException {
System.out.println("消费者【1】接受消息:" + message + "-" + LocalDateTime.now());
Thread.sleep(20);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "shawn.queue2"),
exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenWorkMessage2(String message) throws InterruptedException {
System.out.println("消费者【2】接受消息:" + message + "-" + LocalDateTime.now());
Thread.sleep(200);
}
在 Publuisher 项目中编写测试代码:
@Test
void testSendDirectExchange() throws InterruptedException {
String exChangeName = "direct.exchange";
String message = "贾君鹏,你妈喊你回家吃饭";
String routingKey="blue";
rabbitTemplate.convertAndSend(exChangeName,routingKey, message);
}
此时,只有包含指定的 RoutingKey 的队列,才能收到消息。
4.TopicExchange
TopicExchange 与 DirectExchange 的区别在于,routingKey必须是多个单词的列表,并且使用英文句号( . )分割
Queue 与 Exchange 进行绑定时支持通配符:
- (#)表示 0个或多个单词
- (*)表示 一个单词
示例:使用 TopicExchange 实现消息发送和接受
在 Consumer 项目中编写示例代码
@Component
public class TopicExchangeListener {
/**
* listenWorkMessage1 接收和 china.# 有关的消息
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "shawn.queue1"),
exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenWorkMessage1(String message) throws InterruptedException {
System.out.println("消费者【1】接受消息:" + message + "-" + LocalDateTime.now());
Thread.sleep(20);
}
/**
* listenWorkMessage2 接收和 #.news 有关的消息
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "shawn.queue2"),
exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenWorkMessage2(String message) throws InterruptedException {
System.out.println("消费者【2】接受消息:" + message + "-" + LocalDateTime.now());
Thread.sleep(200);
}
}
在 Publisher 项目中编写测试方法
@Test
void testSendTopicExchange() throws InterruptedException {
String exChangeName = "topic.exchange";
String message = "贾君鹏,你妈喊你回家吃饭";
// 此处的 routingKey 需要按照TopicExchange的规则编写
String routingKey = "china.food";
rabbitTemplate.convertAndSend(exChangeName, routingKey, message);
}
定义的交换机和队列信息,均可以在 RabbitMQ Web UI 中看到
四 消息转换器
在SpringAMQP中,接受消息的类型时Object,也就是说,我们可以发送任何类型的对象给消费者,SpringAMQP会帮助我们进行序列化成字节后发送。
Spring中对消息对象的处理是由 SpringAMQP中的一个名为MessageConvert来处理的。默认实现是SimpleMessageConvert,基于JDK的ObjectOutputStream来完成。
通过重新定义一个 MessageConverter 的 Bean 来修改序列化方式。
推荐使用 JSON 方式序列化,消息体将会更加短小精悍,传输速度更快。
引入 jackson-dataformat-xml 依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.14.1</version>
</dependency>
定义 Bean
@Configuration
public class MessageConverterConfig {
@Bean
public Jackson2JsonMessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
编写一个传输对象的简单队列测试示例
@Test
void testSendObjectMessage() {
String queueName = "object.queue";
SystemLog log = new SystemLog();
log.setAddress("陕西省西安市");
log.setAge(29);
log.setName("shawn");
log.setId(1L);
log.setPassword("123456");
rabbitTemplate.convertAndSend("", queueName, log);
}
此时消费者接受到的消息类型将会是Json格式,将Json信息转换对应的对象就可以拿到对象消息了。
注意:发送消息和接收消息时注意使用相同的 MessageConverter。可直接将消息转换为发送时的对象(自定义类型需手动转换)
完结撒花。