目录
6.4.1 RabbitMQ概述
6.4.1.1 什么是RabbitMQ
6.4.1.2 Erlang和AMQP
6.4.1.3 RabbitMQ
6.4.1.3.1 RabbitMQ优点
6.4.1.3.2 应用解耦
6.4.1.3.3 异步处理
6.4.1.3.4 流量削峰
6.4.2 RabbitMQ安装
6.4.3 RabbitMQ架构
6.4.3.1 架构图
6.4.3.1.1 术语
6.4.3.1.2 简单架构图
6.4.3.1.3 完整架构图
6.4.3.2 消息模式
6.4.4 SpringBoot整合RabbitMQ
6.4.4.1 简单模式
6.4.4.1.1 原生态方式
6.4.4.1.2 SpringBoot整合RabbitMQ方式
6.4.4.2 工作队列模式
6.4.4.2.1 说明
6.4.4.2.2 代码
6.4.4.3 发布订阅模式
6.4.4.3.1 说明
6.4.4.3.2 代码
6.4.4.4 路由模式
6.4.4.4.1 说明
6.4.4.4.2 代码
6.4.4.5 主题模式
6.4.4.5.1 说明
6.4.4.5.2 代码
6.4.1 RabbitMQ概述
6.4.1.1 什么是RabbitMQ
假设有个注册动作,之后的流程如下图所示
耗时1.5有点长,能不能把后面的邮箱验证,短信验证在用户提交注册后,和添加数据库一起执行
6.4.1.2 Erlang和AMQP
Erlang是一门动态类型的函数式编程语言,它也是一门解释型语言,由Erlang虚拟机解释执行。从语言模型上说,Erlang是基于Actor模型的实现。在Actor模型里面,万物皆Actor,每个Actor都封装着内部状态,Actor相互之间只能通过消息传递这一种方式来进行通信。对应到Erlang里,每个Actor对应着一个Erlang进程,进程之间通过消息传递进行通信。相比共享内存,进程间通过消息传递来通信带来的直接好处就是消除了直接的锁开销(不考虑Erlang虚拟机底层实现中的锁应用)。
AMQP(Advanced Message Queue Protocol)定义了一种消息系统规范。这个规范描述了在一个分布式的系统中各个子系统如何通过消息交互。而RabbitMQ则是AMQP的一种基于erlang的实现。AMQP将分布式系统中各个子系统隔离开来,子系统之间不再有依赖。子系统仅依赖于消息。子系统不关心消息的发送者,也不关心消息的接受者。
6.4.1.3 RabbitMQ
6.4.1.3.1 RabbitMQ优点
常见的MQ:ActiveMQ,RocketMQ,Kafka,RabbitMQ。
-
语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多门语言,RabbitMQ支持多种语言。
-
效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。
-
消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。
-
学习成本:RabbitMQ非常简单。
RabbitMQ是用Erlang实现的一个高并发高可靠AMQP消息队列服务器。支持消息的持久化、事务、拥塞控制、负载均衡等特性,使得RabbitMQ拥有更加广泛的应用场景。
优点
- 解耦:降低系统模块的耦合度
- 提高系统响应时间
- 异步消息
- 过载保护,流量削峰
6.4.1.3.2 应用解耦
用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口.
这种做法有一个缺点:
- 当库存系统出现故障时,订单就会失败.
- 订单系统和库存系统高耦合
引入消息队列
- 订单系统: 用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。
- 库存系统: 订阅下单的消息,获取下单消息,进行库操作。 就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失
6.4.1.3.3 异步处理
前面讲的用户注册案例
1. 串行的方式
2. 并行的方式
三个任务同时处理,返回给客户端,并行的方式能提高处理的时间
6.4.1.3.4 流量削峰
因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
6.4.2 RabbitMQ安装
RabbitMQ在window和linux下均可安装。以下仅展示Linux下的RabbitMQ安装
docker-compose.yml
version: "3.1"
services:
rabbitmq:
image: daocloud.io/library/rabbitmq:management
restart: always
container_name: rabbitmq
ports:
- 5672:5672 # 服务器端口
- 15672:15672 # 客户端访问服务器的端口
volumes:
- ./data:/var/lib/rabbitmq
[root@iz8vbdmrir2n6xqzrbd93hz /]# cd /opt
# 创建文件夹
[root@iz8vbdmrir2n6xqzrbd93hz opt]# mkdir docker_rabbitmq
# 进入文件夹
[root@iz8vbdmrir2n6xqzrbd93hz opt]# cd docker_rabbitmq/
# 添加docker-compose.yml 将上面的配置信息粘贴进来
[root@iz8vbdmrir2n6xqzrbd93hz docker_rabbitmq]# vim docker-compose.yml
[root@iz8vbdmrir2n6xqzrbd93hz docker_rabbitmq]# docker-compose-Linux-x86_64 up -d
# 查看所有进程
[root@192 docker_rabbitmq]# docker ps -a
浏览器访问:http://自己的ip:15672
云服务器记得开放 15672 和 5672 端口
用户名和密码默认都是:guest
访问
RabbitMQ的图形化管理界面
切换到:docker_rabbitmq 下
# down 停止服务并删除
docker-compose-Linux-x86_64 down
# down 停止服务
docker-compose-Linux-x86_64 stop
systemctl stop firewalld
systemctl disable firewalld
systemctl restart docker
# 启动服务
docker-compose-Linux-x86_64 up -d
6.4.3 RabbitMQ架构
6.4.3.1 架构图
6.4.3.1.1 术语
- Publisher - 生产者:发布消息到RabbitMQ中的Exchange
- Consumer - 消费者:监听RabbitMQ中的Queue中的消息
- Exchange - 交换机:和生产者建立连接并接收生产者的消息
- Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互
- Routes - 路由:交换机以什么样的策略将消息发布到Queue
6.4.3.1.2 简单架构图
Publisher生产消息,将消息放到Exchange上,根据Routes将消息分发到Queue,Consumer处理消息
6.4.3.1.3 完整架构图
生产者和虚拟机建立连接,建立完连接后创建一个Channel,通过Channel把消息发送到Exchange上
6.4.3.2 消息模式
官网【RabbitMQ Tutorials — RabbitMQ】
常见的五种模式
- 简单模式
- Work Queues 工作队列模式
- Publish/Subscribe发布订阅模式
- Routing 路由模式
- Topics 主题模式
6.4.4 SpringBoot整合RabbitMQ
6.4.4.1 简单模式
6.4.4.1.1 原生态方式
原生态方式实现简单五种消息模型中的Simple-简单模型。
由于所有的功能都要自己手动引入依赖,手动写代码,所以会比较繁琐
以后用springboot整合后会更简便,springboot会优化代码,同时隐藏一些代码
利用SpringBoot自动生成项目结构
如果不知道如何自动生成点击【6.2 微服务-SpringBoot_老李头喽的博客-CSDN博客】
导入依赖
<!-- amqp依赖 原生方式-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
之后开始创建生产者,消费者
创建Publisher发布者
//生产者
public class Publisher {
public static void main(String[] args) throws Exception{
/*
* 1.创建连接工厂
* 2.获取连接对象
* 3.创建频道
* 4.声明队列
* 5.发布消息
* 6.释放资源
*/
System.out.println("Publisher...");
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//配置连接参数
factory.setHost("192.168.72.129");
//连接服务器端口5672,而不是在浏览器客户端访问端口15672
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//2.获取连接对象
Connection connection = factory.newConnection();
//3.创建频道
Channel channel = connection.createChannel();
//4.声明队列
//配置队列参数
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化,值为true时表示持久化,rabbitmq宕机或重启后,队列依然在
//参数3:exclusive - 当前队列是否为排他队列,值为true时表示与当前连接(connection)绑定,连接关闭,队列消失
//参数4:autoDelete - 当前队列是否自动删除,值为true时表示队列中的消息一旦被消费,该队列会消失
//参数5:arguments - 指定当前队列的相关参数
channel.queueDeclare("helloworldQueue",false,false,false,null);
//5.发布消息
//发布消息到exchange,同时指定路由的规则
// 参数1:指定exchange,使用"",简单模式没有交换机
// 参数2:指定路由的规则,使用具体的队列名称
// 参数3:指定传递的消息所携带的properties,使用null
// 参数4:指定发布的具体消息,byte[]类型
channel.basicPublish("","helloworldQueue",null,"发布者发布,helloworld".getBytes());
//6.释放资源
//注意顺序
// channel.close();
// connection.close();
}
}
创建Consumer消费者
//消费者
public class Consumer {
public static void main(String[] args)throws Exception {
System.out.println("Consumer...");
//配置连接参数
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.72.129");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
//获取连接
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//要与生产者中的该方法一致(注:方法中的参数值必须保持一致)
channel.queueDeclare("helloworldQueue",false,false,false,null);
// 回调函数:消费(处理业务)的过程
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
//重写DefaultConsumer里的handleDelivery方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者接收到消息:"+new String(body));
}
};
//消费消息
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true表示接收到消息后,会立即告知RabbitMQ,false表示不告知)
//参数3:consumer - 指定消费回调函数
channel.basicConsume("helloworldQueue",true,defaultConsumer);
}
}
启动Docker
分别启动生产者和消费者进行测试
运行生产者,Queues中会有一条消息,当运行消费者后,消息会被处理,Queues中消息为0
由于没有设置持久化,服务器宕机后,消息队列会清空
在声明队列时需要配置队列参数
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化,值为true时表示持久化,rabbitmq宕机或重启后,队列依然在
//参数3:exclusive - 当前队列是否为排他队列,值为true时表示与当前连接(connection)绑定,连接关闭后队列也会随着消失
//参数4:autoDelete - 当前队列是否自动删除,值为true时表示队列中的消息一旦被消费,该队列会消失
//参数5:arguments - 指定当前队列的相关参数
6.4.4.1.2 SpringBoot整合RabbitMQ方式
为了区别上面的原生态方式,新建包
引入启动器
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.properties中增加配置
# 应用名称
spring.application.name=rabbitmq
# 应用服务 WEB 访问端口
server.port=8080
#rabbitMQ配置
spring.rabbitmq.host=192.168.72.129
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
SimpleConfig
//配置类
@Configuration
public class SimpleConfig {
@Bean
public Queue getQueue() {
return new Queue("simpleQueue", true, false, false, null);
}
}
注意Queue包
Publisher
//生产者
@Component
public class Publisher {
@Autowired
RabbitTemplate rabbitTemplate;
public void send(){
System.out.println("生产者:发布消息");
rabbitTemplate.convertAndSend("","simpleQueue","消息:有一个消息");
}
}
Consumer
//消费者
@Component
@RabbitListener(queues = "simpleQueue") //时刻监听着指定的队列
public class Consumer {
@RabbitHandler //@RabbitListener监听消息队列中是否有消息,有消息就调用@RabbitHandler标识的方法处理
public void receive(String msg) {
System.out.println("接收到消息:" + msg);
}
}
消费者需要用到@RabbitListener和@RabbitHandler注解
测试类
@SpringBootTest
public class RabbitmqApplication2 {
@Autowired
Publisher publisher;
@Test
public void test(){
publisher.send();
}
}
由于消费者有监听机制,所以只需要运行测试类里的publisher中的send方法即可,当消息队列中有消息,监听机制会自动使消费者处理消息
6.4.4.2 工作队列模式
6.4.4.2.1 说明
工作队列模式不需要交换机
一个生产者,一个队列,多个消费者,默认采用公平分发
6.4.4.2.2 代码
WorkConfig配置类
@Configuration
public class WorkConfig {
//工作模式:一个队列,多个消费者
@Bean
public Queue workQueue(){
System.out.println("workqueue模式创建队列");
return new Queue("workQueue",true);
}
}
Publisher生产者
//生产者
@Component
public class Publisher {
@Autowired
RabbitTemplate rabbitTemplate;
public void send(){
System.out.println("生产者:发布消息");
rabbitTemplate.convertAndSend("","workQueue","workqueue");
}
}
Consumer消费者1
//消费者
@Component
@RabbitListener(queues = "workQueue") //时刻监听着指定的队列
public class Consumer {
@RabbitHandler //@RabbitListener监听消息队列中是否有消息,有消息就调用@RabbitHandler标识的方法处理
public void receive(String msg) {
System.out.println("Consumer1 接收到消息:" + msg);
}
}
Consumer2 消费者2
//消费者
@Component
@RabbitListener(queues = "workQueue") //时刻监听着指定的队列
public class Consumer2 {
@RabbitHandler //@RabbitListener监听消息队列中是否有消息,有消息就调用@RabbitHandler标识的方法处理
public void receive(String msg) {
System.out.println("Consumer2 接收到消息:" + msg);
}
}
6.4.4.3 发布订阅模式
6.4.4.3.1 说明
使用该模式需要借助交换机,生产者将消息发送到交换机,再通过交换机到达队列。
交换机分四种:
- Direct:直接交换机,通过消息上的路由键(Routing key)直接对消息进行分发,相当于精确匹配,一对一
- Topic:交换机会将路由键和绑定上的模式进行通配符匹配,相当于模糊匹配,一对多
- Headers:消息头交换机,使用消息头的属性进行消息路由,相当于模糊匹配,一对多
- Fanout:扇出交换机,会将消息发送到所有和它进行绑定的队列上,广播,群发
默认交换机是Direct,发布订阅模式使用Fanout类型交换器
6.4.4.3.2 代码
FanoutConfig
@Configuration
public class FanoutConfig {
//发布订阅模式 一个生产者 一个交换机(FanoutExchange) 两个队列(队列需要绑定到交换机) 两个消费者
//创建扇出交换机
@Bean
public FanoutExchange fanoutExchange(){
System.out.println("创建交换机");
return new FanoutExchange("fanoutExchange",true,false);
}
//创建队列
@Bean
public Queue fanoutqueue1(){
return new Queue("fanoutqueue1",true);
}
@Bean
public Queue fanoutqueue2(){
return new Queue("fanoutqueue2",true);
}
//将队列绑定到交换机
//通过方法获得对象
@Bean
public Binding bindingqueque1(){
//通过BindingBuilder的bind方法将队列和交换机绑定在一起
return BindingBuilder.bind(fanoutqueue1()).to(fanoutExchange());
}
@Bean
public Binding bindingqueque2(){
return BindingBuilder.bind(fanoutqueue2()).to(fanoutExchange());
}
//通过形参获得对象
}
//生产者
@Component
public class Publisher {
@Autowired
RabbitTemplate rabbitTemplate;
public void send(){
System.out.println("生产者:发布消息");
rabbitTemplate.convertAndSend("fanoutExchange","","生产者:发布订阅模式");
}
}
//消费者
@Component
@RabbitListener(queues = "fanoutqueue1") //时刻监听着指定的队列
public class Consumer1 {
@RabbitHandler //@RabbitListener监听消息队列中是否有消息,有消息就调用@RabbitHandler标识的方法处理
public void receive(String msg) {
System.out.println("Consumer1接收到消息:" + msg);
}
}
//消费者
@Component
@RabbitListener(queues = "fanoutqueue2") //时刻监听着指定的队列
public class Consumer2 {
@RabbitHandler //@RabbitListener监听消息队列中是否有消息,有消息就调用@RabbitHandler标识的方法处理
public void receive(String msg) {
System.out.println("Consumer2接收到消息:" + msg);
}
}
6.4.4.4 路由模式
6.4.4.4.1 说明
一个生产者,一个交换机,多个队列,多个消费者
生产者将消息发送到Direct交换机(路由模式需要借助直连交换机实现),在绑定队列和交换机的时候有一个路由键(Routing key),生产者发送的消息会指定一个路由键,那么消息只会发送到相应key相同的队列,接着监听该队列的消费者消费消息。消费者可以选择性的接收消息
6.4.4.4.2 代码
RoutingConfig
//路由模式 一个生产者 一个交换机(直连交换机) 两个队列(路由key) 两个消费者
@Configuration
public class RoutingConfig {
//创建直连交换机
@Bean
public DirectExchange directExchange(){
return new DirectExchange("directExchange");
}
//队列
@Bean
public Queue routingQueue1(){
return new Queue("routingQueue1");
}
@Bean
public Queue routingQueue2(){
return new Queue("routingQueue2");
}
//绑定
@Bean
public Binding bingroutingQueue1(){
//直连交换机必须带routingkey
return BindingBuilder.bind(routingQueue1()).to(directExchange()).with("1");
}
@Bean
public Binding bingroutingQueue2(){
//直连交换机必须带routingkey
return BindingBuilder.bind(routingQueue2()).to(directExchange()).with("2");
}
}
注意: 路由模式,绑定队列和交换机时需要指定路由键
RoutingPublisher
//生产者
@Component
public class RoutingPublisher {
@Autowired
RabbitTemplate rabbitTemplate;
public void send(){
System.out.println("生产者:发布消息");
//设置routingkey
rabbitTemplate.convertAndSend("directExchange","1","生产者:消息 1");
rabbitTemplate.convertAndSend("directExchange","2","生产者:消息 2");
rabbitTemplate.convertAndSend("directExchange","3","生产者:消息 3");
}
}
//消费者
@Component
@RabbitListener(queues = "routingQueue1") //时刻监听着指定的队列
public class RoutingConsumer1 {
@RabbitHandler //@RabbitListener监听消息队列中是否有消息,有消息就调用@RabbitHandler标识的方法处理
public void receive(String msg) {
System.out.println("Consumer1接收到消息:" + msg);
}
}
//消费者
@Component
@RabbitListener(queues = "routingQueue2") //时刻监听着指定的队列
public class RoutingConsumer2 {
@RabbitHandler //@RabbitListener监听消息队列中是否有消息,有消息就调用@RabbitHandler标识的方法处理
public void receive(String msg) {
System.out.println("Consumer2接收到消息:" + msg);
}
}
6.4.4.5 主题模式
6.4.4.5.1 说明
一个生产者,一个交换机,多个队列,多个消费者
主题模式又称通配符模式
使用直连交换机可以改善我们的系统,但是它仍有局限性,它不能实现多重条件的路由。
在消息系统中,我们不仅想要订阅基于路由键的队列,还想订阅基于生产消息的源。这时候可以使用Topic交换机。
使用主题交换机时不能采用任意写法的路由键,路由键的形式应该是由点分割的有意义的单词。例如"goods.stock.info"等。路由key最多255字节。
- * :代表一个单词
- # :代表0个或多个单词
6.4.4.5.2 代码
@Configuration
public class TopicConfig {
@Bean
public Queue topicQueue1(){
return new Queue("topicQueue1");
}
@Bean
public Queue topicQueue2(){
return new Queue("topicQueue2");
}
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}
@Bean
public Binding bingTopicQueue1(){
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("China.*");
}
@Bean
public Binding bingTopicQueue2(){
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("America.#");
}
}
@Component
public class TopicPublisher {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private TopicExchange topicExchange;
public void send() {
System.out.println("TopicPublisher");
rabbitTemplate.convertAndSend(topicExchange.getName(), "China.beijing", "中国北京");
rabbitTemplate.convertAndSend(topicExchange.getName(), "America.Texas.Houston", "美国德克萨斯休斯顿");
}
}
@Component
@RabbitListener(queues = "topicQueue1")
public class TopicCustomer1 {
@RabbitHandler
public void receive(String msg) {
System.out.println("TopicCustomer1:" + msg);
}
}
@Component
@RabbitListener(queues = "topicQueue2")
public class TopicCustomer2 {
@RabbitHandler
public void receive(String msg) {
System.out.println("TopicCustomer2:" + msg);
}
}
但是如果将 TopicPublisher 中修改为
rabbitTemplate.convertAndSend(topicExchange.getName(), "China.beijing.haidian", "中国北京海淀区");
结果 TopicCustomer1就接收不到消息中国北京海淀
原因就是
- * :代表一个单词
- # :代表0个或多个单词