文章目录
- 1.什么是MQ
- 1.1 特点
- 1.2 MQ产品分类
- 2.RabbitMQ
- 2.1.RabbitMQ介绍
- 2.2.使用Docker安装RabbitMQ
- 3.SpringBoot中使用RabbitMQ
- 3.1.SpringAMQP
- 3.2使用步骤
1.什么是MQ
RabbitMQ官方文档
消息队列(Message Queue,简称MQ):是在消息的传输过程中保存消息的容器
。用于分布式系统之间进行异步通信
1.1 特点
解耦
异步
削峰
1.2 MQ产品分类
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司社区 | Rabbit | Apache | Apache 阿里 | Apache |
开发语言 | Erlang | Java | Java | Scala&Java |
协议 | AMQP | OpenWire、AUTO、Stomp、MQTT | 自定义 | 自定义 |
单机吞吐量 | 万级 | 万级(最差) | 十万级 | 十万级 |
消息延迟 | 微妙级 | 毫秒级 | 毫秒级 | 毫秒以内 |
特性 | 并发能力很强,延时很低 | 老牌产品,文档较多 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,毕竟是为大数据领域准备的。 |
- 中小型软件公司,建议选 RabbitMQ. 一方面,erlang 语言天生具备高并发的特性,RabbitMQ 的社区十分活跃,可以解决开发过程中遇到的bug,这点对于中小型公司来说十分重要。不考虑 RocketMQ 和 kafka 的原因是中小型软件公司不如互联网公司,数据量没那么大,选消息中间件,应首选功能比较完备的,所以 kafka 和 RocketMQ 排除。
- 大型软件公司,根据具体使用在 RocketMQ 和 kafka 之间二选一。一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量。针对 RocketMQ,大型软件公司也可以抽出人手对 RocketMQ 进行定制化开发,毕竟国内有能力改JAVA源码的人,还是相当多的。至于 kafka ,根据业务场景选择,如果有日志采集功能,肯定是首选 kafka 了。
2.RabbitMQ
2.1.RabbitMQ介绍
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。
2007年,Rabbit 技术公司基于 AMQP 标准采用 Erlang 语言开发的 RabbitMQ1.0 发布。
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。类似HTTP。
- 生产者(Publisher):发送消息的应用
- 消费者(Consumer):接收消息的应用
- 队列(Queue):存储消息的缓存,一个队列queue存多条消息
- 消息(Message):由生产者通过RabbitMQ发送给消费者的信息,消息存储在消息队列queue中
- 连接(Connection):连接RabbitMQ和应用服务器的TCP连接
- 通道(Channel):连接里的一个虚拟通道。发送或接收消息都是通过通道进行的
- 交换机(Exchange):生产者发送消息先发送到交换机,再由交换机发送到队列中并保存在队列里。(相当于中转站)
- 绑定(Binding):绑定是队列和交换机的一个关联连接。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
- 路由键(Routing key):路由键是供交换机查看并根据键来决定如何分发消息到队列的一个键。(可以说是消息的目的地址)
- 用户(Users):在RabbitMQ中,可以通过指定的用户名和密码进来连接。每个用户可以分配不同的权限,例如读权限,写权限以及在实例里进行配置的权限。
- 服务器实体(Broker):表示消息队列服务器实体。
- 虚拟主机(Virtual Host):虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
2.2.使用Docker安装RabbitMQ
# 1.拉取RabbitMQ镜像
~]$ docker pull rabbitmq:3.10.6
# 2.创建容器并运行
~]$ docker run \
-e RABBITMQ_DEFAULT_USER=rabbitmq \ #登录用户名
-e RABBITMQ_DEFAULT_PASS=123456 \ #登录密码
--name rabbitmq \
--hostname rabbitmq \
--restart=always \ #Docker启动容器启动
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.10.6
# 3.进入容器内部
~]$ docker exec -it rabbitmq /bin/bash
rabbitmq-plugins enable rabbitmq_management # 开启WEB界面管理插件
#修改 management_agent.disable_metrics_collector = false
cd /etc/rabbitmq/conf.d/
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
#退出容器
exit
#重启rabbitmq容器
~]$ docker restart rabbitmq
3.SpringBoot中使用RabbitMQ
3.1.SpringAMQP
SpringBoot中提供了一个spring-boot-starter-amqp依赖用于简化RabbitMQ 在Spring中开发应用。
SpringAMQP 是基于 AMQP 协议定义的一套API,提供了模板来发送和接收消息。包含两部分,spring-amqp 是基础抽象,spring-rabbit 是底层默认实现。 SpringAMQP 提供了三个功能:
- 自动声明队列、交换机及其绑定关系
- 基于注解的监听器模式,异步接收消息 (消费者)
- 封装了 RabbitTemplate 工具,用于发送消息 (生产者)
SpringAMQP 是基于 RabbitMQ 封装的一套模板,配合 SpringBoot 让 RabbitMQ 使用变得非常简单。
SpringAMQP 的官方地址:https://spring.io/projects/spring-amqp
3.2使用步骤
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
创建生产者测试类
@SpringBootTest
public class ProducerTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数
factory.setHost("192.168.6.131");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("rabbitmq");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "basic.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
创建消费者测试类
public class ConsumerTest {
@SneakyThrows
public static void main(String[] args) throws IOException{
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数
factory.setHost("192.168.6.131");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("rabbitmq");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "basic.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}