一、MQ与RabbitMQ概述
1. MQ简述
MQ(Message Queue)消息队列,是基础数据结构中 “先进先出” 的一种数据结构,也是在消息的传输过程中保存消息的容器(中间件),多用于分布式系统之间进行通信。
一般MQ用来解决系统耦合、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性架构。(AP架构)
分布式系统有两种通信方式:直接远程调用 和 借助第三方(MQ)完成间接通信。(发送方称为生产者,接收方称为消费者)
2. MQ的优势与劣势
2.1 MQ的优势
MQ的优势:(应用解耦、异步、削峰)
- 应用解耦:提高系统容错性和可维护性;
- 异步提速:提升用户体验和系统吞吐量;
- 削峰填谷:提高系统稳定性。
1、应用解耦
2、异步提速
3、削峰填谷
填谷:
使用了MQ之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”,从而提升系统的稳定性。
2.2 MQ的劣势
引入MQ会遇到下列问题:
- 消息可靠性问题(如何确保发送的消息至少被消费者消费一次,避免消息丢失问题)
- 延迟消息问题 (如何实现消息的延迟投递,解决方案:使用延时队列、TTL、延迟队列插件实现)
- 高可用问题(如何避免单点MQ故障而导致的不可用问题,解决方案:搭建MQ集群)
- 消息堆积问题(如何解决数百万消息堆积,无法及时消费的问题)
3. 常见的MQ产品
市面上有很多MQ产品,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用Rdis充当消息队列的场景。在实际技术选型时,需要结合自身需求及MQ产品特征来综合考虑。
几种常见MQ的对比:
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang(二郎神,高并发语言) | Java | Java | Scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议,社区封装了http协议支持 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般(万级) | 差 | 高(十万级) | 非常高(十万级) |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
功能特性 | 并发能力强,性能极其好,延迟低,社区活跃,管理界面丰富 | 老牌产品,成熟度高,文档较多 | MQ功能比较完备,扩展性佳 | 只支持主要的MQ功能,毕竟是为大数据领域准备的。 |
-
追求可用性(高->低):Kafka、 RocketMQ 、RabbitMQ;
-
追求可靠性:RabbitMQ、RocketMQ;
-
追求吞吐能力:RocketMQ、Kafka;
-
追求消息低延迟:RabbitMQ、Kafka。
4. RabbitMQ简述
RabbitMQ官网地址:http://www.rabbitmq.com/
RabbitMQ是基于AMQP协议使用Erlang语言开发的一款消息队列产品。
AMQP (全称Advanced Message Queuing Protocol,表示高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。类比HTTP。(同类的协议还有MQTT用于物联网场景下)
RabbitMQ中的一些角色:(AMQP协议消息中间件类似)
- publisher:生产者;
- consumer:消费者;
- exchange :交换机,负责消息路由;
- queue:队列,存储消息;
- virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离。
RabbitMQ工作模式:
RabbitMQ提供了6种工作模型,但是我们常用的只有5种:简单队列模型、工作队列模型、发布订阅模型(广播、路由、主题)。(第六种RPC远程调用不属于mq)
- 官网对应模式介绍:https://www.rabbitmq.com/getstarted.html
JMS(JavaMessage Service)
- JMS,Java消息服务应用程序接口,即Java操作消息中间件的API;
- JMS是JavaEE规范的一种,类比JDBC;
- 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ官方没有提供JMS的实现包,但是开源社区有。
二、RabbitMQ安装与配置
1. 基于docker快速安装RabbitMQ
扩展:docker-compose安装rabbitmq:https://gitee.com/aopmin/docker-compose/blob/master/Linux/RabbitMQ/docker-compose.yml
1、拉取镜像
docker pull rabbitmq:3.8-management
2、运行容器
docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123456 \
-v /docker/rabbitmq/plugins:/plugins \
--name rabbitmq \
--hostname my-rabbit \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3.8-management
- \ 代表换行
- -e 指定环境变量
- -e RABBITMQ_DEFAULT_USER=admin用户名
- -e RABBITMQ_DEFAULT_PASS=123456密码
- -v 挂载目录或文件 (数据卷)
- -p 15672:15672 用于web管理页面使用的端口 (管理员页面)
- -p 5672:5672 用于生产和消费端使用的端口(通信端口,也就是在代码里使用)
- -d 后台运行
- –name rabbitmq 容器名字
- –hostname my-rabbit(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名);
3、启动xxx插件
# 进入容器
docker exec -it rabbitmq /bin/bash
# 启动xxx插件
rabbitmq-plugins enable xxx
RabbitMQ管理端:
管理端访问地址:http://ip:15672/
2. 创建用户和虚拟机
1、添加一个新用户:
添加成功后列表会显示该用户,但是这个用户没有操作权限,需要为他创建一个虚拟机:
2、创建虚拟机
为指定用户授权:
最后该用户就可以操作这个虚拟机了:
三、RabbitMQ快速入门
使用简单模型中的基本模式完成消息传递:
官方的HelloWorld示例是基于简单消息队列模型来实现的,其中包括三个角色:
- publisher:消息发布者,将消息发送到队列queue;
- queue:消息队列,负责接受并缓存消息;
- consumer:订阅队列,处理队列中的消息。
1. 基础环境搭建
1、创建父工程mq-demo,并在pom文件中导入如下依赖:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.9.RELEASE</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--SpringAMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
2、创建子模块publisher、consumer,并编写启动类和yml配置文件:
# 日志输出格式配置
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
2. publisher消息发布者实现
消息收发流程:Connection连接、Channel通道、queue队列、exchange 交换机。
publisher消息发布者实现思路:
- 建立连接
- 创建Channel
- 声明队列
- 发送消息
- 关闭连接和channel
1、编写publisher测试代码:
package com.baidou.mq.test;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者
* @author 白豆五
* @version 2023/04/27
* @since JDK8
*/
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.200.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
2、在建立连接处打断点,并以debug方式启动(方便观察每个组件的创建)
查看连接信息:
继续按F8,查看通道信息:
继续按F8,查看队列信息:
最后直接放行程序,查看队列中的消息:
3. consumer消费者实现
consumer消费者实现思路:
- 建立连接
- 创建Channel
- 声明队列
- 订阅消息
1、编写消费者代码
package com.baidou.mq.test;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
* @author 白豆五
* @version 2023/04/27
* @since JDK8
*/
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.200.128");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
2、测试(启动程序后会一直执行,不用的时候将程序结束即可)
四、SpringAMQP与RabbitMQ工作模型
1. SpringAMQP概述
AMQP是消息中间件收发消息的协议(规范),具体实现由各个消息中间厂商实现;(例如 RabbitMQ)
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAMQP的官方地址:https://spring.io/projects/spring-amqp
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系;
- 基于注解的监听器模式,异步接收消息;
- 封装了RabbitTemplate工具,用于发送消息 。
RabbitMQ工作模型:简单队列模型、工作队列模型、发布订阅模型(广播、路由、主题)。
2. BasicQueue 简单队列模型
使用SpringAMQP实现基础消息队列功能:
1、在父工程中引入spring-amqp起步依赖:
<!--SpringAMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、消息发送
2.1、在publisher服务的application.yml中添加rabbitmq配置:
# 配置rabbitmq
spring:
rabbitmq:
host: 192.168.200.128 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: admin # 用户名
password: 123456 # 密码
# 配置日志格式
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
2.2、在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:
package com.baidou.mq.test;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* 使用SpringAMQP实现简单队列模型的消息发送
*
* @author 白豆五
* @version 2023/04/27
* @since JDK8
*/
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
// 操作RabbitMQ的模板类
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 测试简单队列模型
*/
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
3、消息接收
3.1、在consumer服务的application.yml中添加rabbitmq配置:
# 配置rabbitmq
spring:
rabbitmq:
host: 192.168.200.128 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: admin # 用户名
password: 123456 # 密码
# 配置日志格式
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
3.2、在consumer服务的com.baidou.mq.listener
包中创建SpringRabbitListener类:
package com.baidou.mq.listener;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 消息监听类
*
* @author 白豆五
* @version 2023/04/27
* @since JDK8
*/
@Component
public class SpringRabbitListener {
/**
* 订阅消息
*
* @param msg
* @throws InterruptedException
*/
@RabbitListener(queues = "simple.queue") // 指定监听的队列名称为simple.queue
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
4、测试
先启动consumer服务(启动类),然后在publisher服务中运行测试代码,发送MQ消息。