关于 RabbitMQ
消息队列(Message Queuing,简写为 MQ)最初是为了解决金融行业的特定业务需求而产生的。慢慢的,MQ 被应用到了更多的领域,然而商业 MQ 高昂的价格让很多初创公司望而却步,于是 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)应运而生。
随着 AMQP 草案的发布,两个月后,RabbitMQ 1.0 就发布了。
RabbitMQ 的架构模型可以分为客户端和服务端两部分,客户端包括生产者和消费者,服务端包括虚拟主机、交换器和队列。
整体的流程非常简单,生产者将消息发送到服务端,消费者从服务端获取对应的消息。
生产者在发送消息前需要先确认发送给哪个虚拟主机的哪个交换器,再由交换器通过路由键将消息转发给与之绑定的队列。
最后,消费者到指定的队列中获取自己的消息进行消费。
客户端
生产者和消费者都属于客户端。
生产者:消息的发送方,将要发送的消息封装成一定的格式,发送给服务端。消息包括消息体和标签。
消费者:消息的接收方,负责消费消息体。
服务端
虚拟主机、交换机、队列都属于服务端。
虚拟主机:用来对交换器和队列进行逻辑隔离,在同一个虚拟主机下,交换器和队列的名称不能重复。有点类似 Java 中的包,同一个包下,不能有相同名称的类或者接口。
交换器:负责接收生产者发来的消息,并根据规则分配给对应的队列,不生产消息,只是消息的搬运工。
队列:负责存储消息,生产者发送的消息会放在这里,消费者从这里取。
连接和信道
连接和信道是两个不同的概念,连接的英文叫 connection,信道叫 channel。
连接里包含了多条信道,连接用的是 TCP 连接,因为 AMQP 就是用 TCP 实现的。
为什么不直接使用连接,而要在连接的基础上新建信道呢?
因为 TCP 连接是比较昂贵的,新建需要三次握手,销毁需要四次挥手,所以如果每个线程在想 RabbitMQ 服务端发送/接收消息的时候都新建一个 TCP 连接,就会非常的消耗资源,于是就有了信道。
信道是线程私有的,连接是线程共享的。
信道+连接的模式,既保证了线程之间的私密性,又减少了系统开销。
业务场景
消息队列的主要功能有三种:
- 异步处理,比如说在做电商业务的时候,提交订单的动作可能涉及到创建订单、扣除库存、增加用户积分、发送订单邮件等。它们并不是一个串行的操作,可以把发送订单邮件和增加用户积分交给消息队列去做。
- 系统解耦,消息队列可以作为不同系统之间的桥梁,且不受系统技术栈的约束。
- 缓冲削峰,消息队列可以将大量的请求放到队列中,然后再按照一定的顺序规则交给业务服务器处理。
工作模式
RabbitMQ 支持 7 种工作模式:
- 简单模式
- 工作队列模式
- 广播模式
- 路由模式
- 动态路由模式
- 远程模式
- 生产者确认模式
我们这里只演示前三种,
简单模式
简单模式真的超级简单,生产者将消息发送给队列,消费者从队列中获取消息队列即可。
生活中就类似于 快递员将包裹放到快递柜,然后给取件人发一个取件码,取件人通过取件码去快递柜里取包裹📦。
工作队列模式
工作队列模式在本质上只比简单模式对了一个队列,消费者从一个变成了多个。生产者将消息放入到队列中,多个消费者会一次进行消费。
比如说有 3 个消费者,生产者向队列发送 3 条消息,3 个消费者会没人消费一条消息,有点雨露均沾的意味。
当然了,也可以通过配置,将其改成能者多劳的模式。
广播模式
与工作队列模式不同,广播模式就有交换器参与了。在广播模式下,即使生产者只生产了一条消息,它对应的所有消费者都能全部接收,真正做到了公平公正公开。
安装配置 RabbitMQ
RabbitMQ 的安装方式可以参考官方:
Installing RabbitMQ | RabbitMQ
- 服务器数据统计——消息投递情况,以及连接、信道、交换器、队列、消费者的数量
- RabbitMQ 节点信息——erlang 进程、内存、磁盘空间等
- 端口和 Web 信息
- 。。。
启动RabbitMQ服务。
rabbitmq-server.bat
我们点击 admin 面板 点击虚拟主机新建一个 codingmore 的虚拟主机。
并新建一个用户 admin:
并设置它的权限。
整合 RabbitMQ
第一步,在 pom.xml 文件中添加 RabbitMQ 的 starter 依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
第二步,在 application.yml 文件中添加 RabbitMQ 的配置。
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: admin
password: admin
virtual-host: codingmore
简单模式
第三步,新建 RabbitMQController 控制器类,添加 sendSimple 生产者接口。
@RestController
@Api(tags = "整合 RabbitMQ")
@RequestMapping("/rabbitmq")
public class RabbitMQController {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostMapping("/sendSimple")
@ApiOperation("简单模式")
public ResultObject sendSimple(String routingKey, String message) {
rabbitTemplate.convertAndSend(routingKey, message);
return ResultObject.success("ok");
}
}
RabbitTemplate 是 Spring Boot 为我们封装好的操作 RabbitMQ 的工具类。
第四步,新建 SimpleConsumer 类,添加简单模式的消费者。
@Slf4j
@Component
@RabbitListener(queuesToDeclare = @Queue("simple"))
public class SimpleConsumer {
@RabbitHandler
public void receive(String message) {
log.info("简单模式:{}", message);
}
}
启动服务,在浏览器地址栏访问 http://localhost:8080/doc.html
打开 Swagger。
输入参数,点击发送。
在Intellij IDEA 中可以看到输出信息。
这就表示我们完成了 RabbitMQ 的简单模式。
工作队列模式
在 RabbitMQController 控制器中添加 sendWork 工作队列接口:
@PostMapping("/sendWork")
@ApiOperation("工作队列模式")
public ResultObject sendWork(String routingKey, String message) {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend(routingKey, "第" + i + "消息:" + message);
}
return ResultObject.success("ok");
}
新建 WorkConsumer 类,添加工作队列模式的消费者。
@Slf4j
@Component
public class WorkConsumer {
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receiveOne(String message) {
log.info("工作队列模式 receiveOne:{}", message);
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receiveTwo(String message) {
log.info("工作队列模式 receiveTwo:{}", message);
}
}
build 服务,在浏览器地址栏打开 http://localhost:8080/doc.html
刷新 Swagger。
输入参数,点击发送。
在Intellij IDEA 中可以看到输出信息。
这就表示我们完成了 RabbitMQ 的工作队列模式。
广播模式
在 RabbitMQController 控制器中添加 sendBroadcast 广播接口:
@PostMapping("/sendBroadcast")
@ApiOperation("广播模式")
public ResultObject sendBroadcast(String exchange, String message) {
rabbitTemplate.convertAndSend(exchange, "",message);
return ResultObject.success("ok");
}
新建 BroadcastConsumer 类,添加广播模式的消费者。
@Slf4j
@Component
public class BroadcastConsumer {
@RabbitListener(bindings = @QueueBinding(value = @Queue,
exchange = @Exchange(name = "fanout", type = "fanout")))
public void receiveOne(String message) {
log.info("广播模式 receiveOne:{}", message);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue,
exchange = @Exchange(name = "fanout", type = "fanout")))
public void receiveTwo(String message) {
log.info("广播模式 receiveTwo:{}", message);
}
}
注意这里的 Exchange(交换器)名字要是 fanout,它是 RabbitMQ 默认的一种交换器。
Fanout模式不需要处理路由键(所以我们在 sendBroadcast 接口中,convertAndSend 方法中传递的 routingKey 是空的),我们只需要简单的将队列绑定到exchange上,发送到exchange的每一个消息都会被转发到与该exchange绑定的所有队列上。
Fanout类型的Exchange转发消息是最快的。除此之外,还有 Direct Exchange、Topic Exchange,大家可以去了解一下。
build 服务,在浏览器地址栏打开 http://localhost:8080/doc.html
刷新 Swagger。
输入参数,点击发送。
在Intellij IDEA 中可以看到输出信息。
可以看到两个消费者都消费了消息,这就表示我们完成了 RabbitMQ 的广播模式。