RabbitMQ是一个基于AMQP协议的消息队列系统,支持多种消息传递模式,包括点对点(P2P)、发布/订阅(Pub/Sub)和路由模式。RabbitMQ 的设计目标是提供高可用性、可扩展性和可靠性,适用于企业级应用集成、任务调度、异步处理等场景。
1、特点
- 多语言支持:RabbitMQ支持多种编程语言的客户端库,如Java、Python、C++、Go等。
- 丰富的消息传递模式:支持P2P、Pub/Sub和路由模式,满足不同的业务需求。
- 持久化和事务支持:RabbitMQ支持消息的持久化存储和事务处理,确保消息不会丢失。
- 插件机制:RabbitMQ提供了丰富的插件机制,可以扩展其功能,例如监控、管理、安全等。
- 高可用性和集群支持:RabbitMQ支持集群部署,提供高可用性和水平扩展能力。
- 可视化管理界面:RabbitMQ提供了一个Web管理界面,方便用户管理和监控队列、交换机、消费者等。
2、核心概念
(1)、生产者(Producer)
- 定义:生产者是向RabbitMQ发送消息的应用程序。它可以将消息发送到交换机(Exchange),也可以直接发送到队列(Queue)。
- 职责:负责生成和发送消息,通常不需要关心消息的路由和消费情况。
(2)、消费者(Consumer)
- 定义:消费者是从RabbitMQ队列中接收消息的应用程序。它可以订阅一个或多个队列,并处理接收到的消息。
- 职责:负责从队列中拉取消息并进行处理,确保消息被正确处理后确认(ACK)。
(3)、交换机(Exchange)
- 定义:交换机是RabbitMQ中的核心组件之一,负责接收生产者发送的消息,并根据路由规则将消息转发到一个或多个队列。
- 类型:
- Direct Exchange:根据精确的路由键(Routing Key)将消息转发到指定的队列。
- Fanout Exchange:将消息广播到所有绑定的队列,忽略路由键。
- Topic Exchange:根据路由键的模式匹配(如通配符)将消息转发到符合条件的队列。
- Headers Exchange:根据消息头中的属性进行路由,而不是路由键。
(4)、队列(Queue)
- 定义:队列是存储消息的地方。消息会按照先进先出(FIFO)的原则从队列中取出。
- 特性:
- 持久化:可以选择是否将队列和消息持久化到磁盘,确保在服务器重启后消息不会丢失。
- 排他性:可以创建排他队列,只有创建该队列的连接可以访问它。
- 自动删除:可以在没有消费者时自动删除队列。
(5)、绑定(Binding)
- 定义:绑定是将队列与交换机关联起来的机制。通过绑定,交换机可以知道将消息发送到哪些队列。
- 作用:定义了交换机和队列之间的关系,决定了消息如何从交换机路由到队列。
(6)、消息(Message)
- 定义:消息是生产者发送给消费者的单位数据。每个消息包含两部分:消息体(Body)和消息属性(Properties)。
- 属性:
- Delivery Mode:决定消息是否持久化(1表示非持久化,2表示持久化)。
- Content Type:指定消息的内容类型(如application/json、text/plain等)。
- Headers:可以包含自定义的键值对,用于传递额外的元数据。
- Routing Key:用于指定消息的目标队列或交换机。
(7)、确认机制(Acknowledgment)
- 定义:当消费者成功处理完一条消息后,必须向RabbitMQ发送确认(ACK)。只有在收到确认后,RabbitMQ才会将该消息从队列中移除。
- 确认方式:
- 手动确认:消费者显式调用ack()方法确认消息。
- 自动确认:消费者在接收到消息后立即自动确认,适合简单场景,但可能会导致消息丢失。
3、架构示意图
RabbitMQ的结构大致如下图,对于Publisher,Consumer,Exchange,Queue几个概念在上面已经做了解释。
Virtual Host:虚拟主机,是对exchange和queue等资源的逻辑分组。
Channel:每次连接建立的通道。可以理解为如图Publisher,Consumer与RabbitMQ之间的连线。
示例图:
工作流程:
第一步:Publisher建立与RabbitMQ的通道Channel,通过该Channel把Message发送给消息队列。
第二步:消息队列的exchange和queue都可以接收Publisher发送的消息。exchange可以理解为是路由器,之后会把消息转发到具体queue上,在被消费。
- queue直接接收消息一般用于点对点模式的场景。
关系:Publisher–>queue–>Consumer - exchange接收消息,在将消息转发给对应的queue,一般用于发布订阅的场景。
关系:Publisher–>exchange–>queue–>Consumer
第三步:Consumer建立与RabbitMQ的通道Channel,通过该Channel消费指定queue的消息。
4、Exchange分类
Exchange交换机是RabbitMQ中的核心组件之一,负责接收生产者发送的消息,并根据路由规则将消息转发到一个或多个队列。
类型:
- Fanout Exchange:将消息广播到所有绑定的队列,忽略路由键。
- Direct Exchange:根据精确的路由键(Routing Key)将消息转发到指定的队列。
- Topic Exchange:根据路由键的模式匹配(如通配符)将消息转发到符合条件的队列。
(1)、Fanout广播(单提供,多消费)
模型:
交换机和队列需要绑定关系。生产者的消息发给交换机,而不是直接发给队列,交换机会把消息转发给绑定的队列中,在进行广播消费。
代码示例:
配置类:
配置1个广播(Fanout)交换机和两个队列(指定名称),并将交换机和两个队列关系绑定。
消费者类:
监听两个队列。
提供者类:
消息发布到交换机上,由交换机的绑定关系转发到指定的队列中。
(2)、Direct路由(广播增强版)
广播模式下,只要消费者订阅了队列,就能消费队列的所有消息。
路由模式相对广播模式,增加了key的概念(可以理解为主题)。发送者发送消息时指定消息的key。消费者可以仅配置对应key的消息进行消费,而无需全部都接收。
示例理解:
某个新闻APP可以推送所有的体育新闻。用户A希望仅关注篮球的新闻,用户B希望仅关注羽毛球的新闻,用于C希望能看到篮球和羽毛球两种类型的新闻。那么分别把这几种类型的新闻转发到不同key的队列中,用户根据自己的意愿分别订阅不同key的新闻就可以了。
示例图:
代码示例:
消费者示例:
需要指定监听的队列,指定监听的交换机为路由模式,指定key。
生产者示例:
需要指定交换机和发送消息的key。
(3)、Topic主题(路由的增强版本)
Topic主题在路由的基础上,增加了通配符去标识一类key,简化了key的配置。
如:china.#,可以匹配china. china.news, china.weather…
示例图:
代码示例:
消费者示例:
指定监听的队列,监听交换机及模式topic,指定key(可加通配符)。
生产者示例:
指定key如果符合的队列的key,队列就会收到消息。
5、消息转换器
因为消息队列不仅可以传string对象的消息,还可以传Object等任意数据类型的消息。
如果传Object对象消息时,会序列化后保存到队列中。默认序列化是MessageConverter实现,底层是jdk的序列化,这样在消息队列中我们不仅看不懂消息,且占用的消息体会很大,浪费资源。所以可以使用类型转换器进行转换。
默认序列化后的消息如下:
解决方法:
配置JSON序列化器替代默认的序列化器。
代码示例:(生产者)
导入依赖:
注入json的序列化器:
注入的JSON转换器会替换默认JDK的MessageConverter转换器。
再次调用生产者生产消息:
可以看到很短,能看懂的消息。
消费者示例:
如果生产者自定义了消息序列化器,那么消费者也要注入相同的序列化器,不然解析会乱码。
导入依赖:
注入json的序列化:
需要和发送者序列化器一致,否则会造成消息乱码。
注意,消费者的入参消息需要和生产者发送的消息数据类型也要保持一致。建议:可以都使用Map<String,Object>对象。
6、RabbitMQ部署–docker
(1)、加载镜像
如果没有压缩过的tar镜像,可以直接使用pull命令拉取指定版本的镜像。
(2)、查看镜像
(3)、创建容器,指定用户名admin,密码123321,15672为可视化端口,5672为服务端口
命令示例:
docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
运行结果:
(4)、查看启动信息
(5)、打开浏览器查看
如上的启动命令中,15672为web访问端口,5672为业务端口。
(6)、输入启动时的用户名和密码即可登录
7、应用场景
- 企业级应用集成:RabbitMQ适合用于不同系统之间的解耦和异步通信。
- 任务调度和异步处理:可以将任务放入队列中,由后台进程逐步处理,避免阻塞主线程。
- 事件驱动架构:支持Pub/Sub模式,适用于构建事件驱动的应用程序。
乘风破浪会有时,直挂云帆济沧海!!!