一 RabbitMQ概念
1 MQ
消息队列
MQ全称Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于系统之间的异步通信。
- 同步通信相当于两个人当面对话,你一言我一语。必须及时回复
- 异步通信相当于通过第三方转述对话,可能有消息的延迟,但不需要二人时刻保持联系。
消息
两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。
队列
数据结构中概念。在队列中,数据先进先出,后进后出。
2 MQ的优势
应用解耦
在电商平台中,用户下订单需要调用订单系统,此时订单系统还需要调用库存系统、支付系统、物流系统完成业务。此时会产生两个问题:
- 如果库存系统出现故障,会造成整个订单系统崩溃。
- 如果需求修改,新增了一个X系统,此时必须修改订单系统的代码。
如果在系统中引入MQ,即订单系统将消息先发送到MQ中,MQ再转发到其他系统,则会解决以下问题:
-
由于订单系统只发消息给MQ,不直接对接其他系统,如果库存系统出现故障,不影响整个订单。
-
如果需求修改,新增了一个X系统,此时无需修改订单系统的代码,只需修改MQ将消息发送给X系统即可。
异步提速
如果订单系统同步访问每个系统,则用户下单等待时长如下:
如果引入MQ,则用户下单等待时长如下:
削峰填谷
假设我们的系统每秒只能承载1000请求,如果请求瞬间增多到每秒 5000,则会造成系统崩溃。此时引入mq即可解决该问题
使用了MQ之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。
3 MQ的劣势
- 系统可用性降低: 系统引入的外部依赖越多,系统稳定性越差。一旦MQ宕机,就会对业务造成影响。
- 系统复杂度提高: MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用。
- 一致性问题: A系统处理完业务,通过MQ给B、C、D三个系统发消息数据,如果B系统、C系统处理成功,D系统处理失败,则会造成数据处理的不一致。
4 MQ应用场景
-
抢红包、秒杀活动、抢火车票等
这些业务场景都是短时间内需要处理大量请求,如果直接连接系统处理业务,会耗费大量资源,有可能造成系统瘫痪。
而使用MQ后,可以先让用户将请求发送到MQ中,MQ会先保存请求消息,不会占用系统资源,且MQ会进行消息排序,先请求的秒杀成功,后请求的秒杀失败。
-
消息分发
如电商网站要推送促销信息,该业务耗费时间较多,但对时效性要求不高,可以使用MQ做消息分发。
-
数据同步
假如我们需要将数据保存到数据库之外,还需要一段时间将数据同步到缓存(如Redis)、搜索引擎(如Elasticsearch)中。此时可以将数据库的数据作为消息发送到MQ中,并同步到缓存、 搜索引擎中。
-
异步处理
在电商系统中,订单完成后,需要及时的通知子系统(进销存系 统发货,用户服务积分,发送短信)进行下一步操作。为了保证订单系统的高性能,应该直接返回订单结果,之后让MQ通知子系统做其他非实时的业务操作。这样能保证核心业务的高效及时。
-
离线处理
在银行系统中,如果要查询近十年的历史账单,这是非常耗时的操作。如果发送同步请求,则会花费大量时间等待响应。此时使用MQ发送异步请求,等到查询出结果后获取结果即可。
5 AMQP
RabbitMQ是由Erlang语言编写的基于AMQP的MQ产品。
AMQP
即Advanced Message Queuing Protocol(高级消息队列协议),是 一个网络协议,专门为消息中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受不同中间件产品,不同开发语言等条件的限制。2006年AMQP规范发布,类比HTTP。
AMQP工作过程
生产者(Publisher)将消息发布到交换机(Exchange),交换机根据规则将消息分发给交换机绑定的队列(Queue),队列再将消息投递给订阅了此队列的消费者。
6 RabbitMQ工作原理
Producer
消息的生产者。也是一个向交换机发布消息的客户端应用程序。
Connection
连接。生产者/消费者和RabbitMQ服务器之间建立的TCP连接。
Channel
信道。是TCP里面的虚拟连接。例如:Connection相当于电缆, Channel相当于独立光纤束,一条TCP连接中可以创建多条信道,增加连接效率。无论是发布消息、接收消息、订阅队列都是通过信道完成的。
Broker
消息队列服务器实体。即RabbitMQ服务器
Virtual host
虚拟主机。出于多租户和安全因素设计的,把AMQP的基本组件划分到一个虚拟的分组中。每个vhost本质上就是一个mini版的 RabbitMQ服务器,拥有自己的队列、交换机、绑定和权限机制。当多个不同的用户使用同一个RabbitMQ服务器时,可以划分出多个虚拟主机。RabbitMQ默认的虚拟主机路径是/
Exchange
交换机。用来接收生产者发送的消息,并根据分发规则,将这些消息分发给服务器中的队列中。不同的交换机有不同的分发规则。
Queue
消息队列。用来保存消息直到发送给消费者。它是消息的容器, 也是消息的终点。消息一直在队列里面,等待消费者链接到这个队列将其取走。
Binding
消息队列和交换机之间的虚拟连接,绑定中包含路由规则,绑定信息保存到交换机的路由表中,作为消息的分发依据。
Consumer
消息的消费者。表示一个从消息队列中取得消息的客户端应用程序。
RabbitMQ为什么使用信道而不直接使用TCP连接通信?
TCP连接的创建和销毁开销特别大。创建需要3次握手,销毁需 要4次分手。高峰时每秒成千上万条TCP连接的创建会造成资源 巨大的浪费。而且操作系统每秒处理TCP连接数也是有限制的, 会造成性能瓶颈。而如果一条线程使用一条信道,一条TCP链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。
二 RabbitMQ安装
1 安装Erlang
RabbitMQ是使用Erlang语言编写的,所以在安装RabbitMQ前需要 先安装Erlang环境
安装Erlang所需的依赖
yum install -y epel-release
添加存储库条目
wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm
rpm -Uvh erlang-solutions-1.0-1.noarch.rpm
安装Erlang
yum install erlang-24.2.1
查看Erlang是否安装成功
erl -version
2 安装RabbitMQ
为了外部能够正常访问RabbitMQ服务,先关闭防火墙
# 关闭运行的防火墙
systemctl stop firewalld.service
# 禁止防火墙自启动
systemctl disable firewalld.service
RabbitMQ是通过主机名进行访问的,必须给服务器添加主机名
# 修改文件
vim /etc/sysconfig/network
# 添加如下内容
NETWORKING=yes
HOSTNAME=itbaizhan
# 修改文件
vim /etc/hosts
# 添加如下内容
服务器ip itbaizhan
使用xftp上传RabbitMQ压缩文件
安装RabbitMQ
# 解压RabbitMQ
tar xf rabbitmq-server-generic-unix-3.9.13.tar.xz
# 重命名:
mv rabbitmq_server-3.9.13 rabbitmq
# 移动文件夹:
mv rabbitmq /usr/local/
配置环境变量
# 编辑/etc/profile文件
vim /etc/profile
#添加如下内容
export PATH=$PATH:/usr/local/rabbitmq/sbin
# 运行文件,让修改内容生效
source /etc/profile
开启管控台插件
rabbitmq-plugins enable rabbitmq_management
后台运行
#启动rabbitmq
rabbitmq-server -detached
#停止rabbitmq
rabbitmqctl stop
通过管控台访问RabbitMQ
路径: http://ip地址:15672 ,用户名: guest ,密码: guest
此时会提示guest账户只允许本地使用,我们可以配置允许使用 guest远程访问
# 创建配置文件夹
mkdir -p /usr/local/rabbitmq/etc/rabbitmq
# 创建配置文件
vim /usr/local/rabbitmq/etc/rabbitmq/rabbitmq.conf
# 添加如下内容
loopback_users=none
# 重启RabbitMQ
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
3 账户管理
guest账户默认只允许本地使用,我们可以创建新账户远程访问 RabbitMQ
创建账户
# 创建账户
rabbitmqctl add_user 用户名 密码
给用户授予管理员角色
rabbitmqctl set_user_tags 用户名 administrator
给用户授权
# "/"表示虚拟机
# itbaizhan表示用户名
# ".*" ".*" ".*" 表示完整权限
rabbitmqctl set_permissions -p "/" itbaizhan ".*" ".*" ".*"
通过管控台访问rabbitmq
4 管控台
5 Docker安装
关闭RabbitMQ服务
rabbitmqctl stop
在Centos7中安装docker
# 安装Docker
yum install -y docker
# 启动docker
systemctl start docker
# 设置docker自启动
systemctl enable docker
# 测试docker是否安装成功
docker run hello-world
拉取镜像
docker pull rabbitmq
启动RabbitMQ容器
docker run -d --hostname itbaizhan --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
开启管控台插件
# 查询rabbitmq容器ID
docker ps
# 进入容器
docker exec -it 容器ID /bin/bash
# 开启管控台插件
rabbitmq-plugins enable rabbitmq_management
# 退出容器
ctrl+p+q
通过管控台访问rabbitmq
路径: http://ip地址:15672 ,用户名: guest ,密码: guest
关闭RabbitMQ容器
docker stop rabbit
三 RabbitMQ工作模式
1 简单模式
1.1 概念
RabbitMQ共有六种工作模式:简单模式(Simple)、工作队列模式(Work Queue)、发布订阅模式(Publish/Subscribe)、路由模式(Routing)、通配符模式(Topics)、远程调用模式(RPC, 不常用,该文档不对此模式进行讲解)
首先我们讲解简单模式:
特点:
- 一个生产者对应一个消费者,通过队列进行消息传递。
- 该模式使用direct交换机,direct交换机是RabbitMQ默认交换机。
1.2 项目搭建
接下来我们使用JAVA代码操作RabbitMQ,让其按照简单模式进行工作。
JMS
由于MQ产品很多,操作方式各有不同,于是JAVA提供了一套规则 ——JMS,用于操作消息中间件。JMS即Java消息服务 (Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件的API。JMS是JavaEE规范中的一种,类比JDBC。很多 MQ产品都实现了JMS规范,例如ActiveMQ。RabbitMQ官方并没有实现JMS规范,但是开源社区有JMS的实现包。
创建项目
启动RabbitMQ
# 开启管控台插件
rabbitmq-plugins enable rabbitmq_management
# 启动rabbitmq
rabbitmq-server -detached
创建普通maven项目,添加RabbitMQ依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.14.0</version>
</dependency>
1.3 编写生产者
接下来我们编写生产者代码创建消息:
//生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.创建队列,如果该队列已经存在,则使用该队列
/**
* 参数一:队列名
* 参数二:是否持久化,true表示mq重启后队列还在
* 参数三:是否私有化,false表示所有的消费者都可以访问,true表示只有第一次拥有它的消费者才能访问
* 参数四:是否自动删除,true表示不再使用该队列时自动删除队列
* 参数五:其他额外参数
*/
channel.queueDeclare("simple_queue", false, false, false, null);
// 5.发送消息
String message = "hello rabbitmq";
/**
* 参数一:交换机名,""表示默认交换机
* 参数二:是路由键,简单模式就是队列名
* 参数三:其他额外参数
* 参数四:要传递的消息字节数组
*/
channel.basicPublish("", "simple_queue", null, message.getBytes());
// 6.关闭信道和连接
channel.close();
connection.close();
System.out.println("发送成功");
}
}
运行生产者后,我们可以看到在RabbitMQ中创建了队列,队列中已经有了消息。
1.4 编写消费者
接下来我们编写消费者代码消费消息:
//消费者
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.监听队列
/**
* 参数一:监听的队列名
* 参数二:是否自动签收,如果设置为false,则需要手动确认消息已收到,否则mq会一直发送消息
* 参数三:Consumer的实现类,重写该类方法表示接收到的消息如何消费
*/
channel.basicConsume("simple_queue", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("接收消息,消息为:" + message);
}
});
}
}
运行消费者后,我们可以看到在RabbitMQ中的消息已经被消费。
2 工作队列模式
2.1 概念
与简单模式相比,工作队列模式(Work Queue)多了一些消费者,该模式也使用direct交换机,应用于处理消息较多的情况。特点如下:
- 一个队列对应多个消费者。
- 一条消息只会被一个消费者消费。
- 消息队列默认采用轮询的方式将消息平均发送给消费者。
2.2 编写生产者
接下来我们编写生产者代码创建大量消息:
//生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.创建队列,持久化队列
channel.queueDeclare("work_queue", true, false, false, null);
// 5.发送消息,参数三表示该消息为持久化消息,即除了保存到内存中还会保存到磁盘中
for (int i = 1; i <= 100; i++) {
channel.basicPublish("", "work_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
("这是今天的第" + i + "条消息").getBytes());
}
// 6.关闭资源
channel.close();
connection.close();
System.out.println("发送成功");
}
}
效果如下:
2.3 编写消费者
接下来我们编写三个消费者监听同一个队列:
//消费者1
public class Consumer1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.监听队列
channel.basicConsume("work_queue", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("消费者1消费消息,消息为:" + message);
}
});
}
}
//消费者2
public class Consumer2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.监听队列
channel.basicConsume("work_queue", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("消费者2消费消息,消息为:" + message);
}
});
}
}
//消费者3
public class Consumer3 {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.监听队列
channel.basicConsume("work_queue", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("消费者3消费消息,消息为:" + message);
}
});
}
}
效果如下:
3 发布订阅模式
3.1 概念
在开发过程中,有一些消息需要不同消费者进行不同的处理,如电商网站的同一条促销信息需要短信发送、邮件发送、站内信发送 等。此时可以使用发布订阅模式(Publish/Subscribe)
特点:
- 生产者将消息发送给交换机,交换机将消息转发到绑定此交换机的
每个队列中
。 - 工作队列模式的交换机只能将消息发送给一个队列,发布订阅模式的交换机能将消息发送给多个队列。发布订阅模式使用fanout交换机。
3.2 编写生产者
接下来我们编写发布订阅模式的生产者:
//生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.创建交换机
/**
* 参数一:交换机名
* 参数二:交换机类型
* 参数三:交换机持久化
*/
channel.exchangeDeclare("exchange_fanout", BuiltinExchangeType.FANOUT, true);
// 5.创建队列
channel.queueDeclare("SEND_MAIL", true, false, false, null);
channel.queueDeclare("SEND_MESSAGE", true, false, false, null);
channel.queueDeclare("SEND_STATION", true, false, false, null);
// 6.交换机绑定队列
/**
* 参数一:队列名
* 参数二:交换机名
* 参数三:路由关键词,发布订阅模式写""即可
*/
channel.queueBind("SEND_MAIL", "exchange_fanout", "");
channel.queueBind("SEND_MESSAGE", "exchange_fanout", "");
channel.queueBind("SEND_STATION", "exchange_fanout", "");
// 7.发送消息
for (int i = 1; i <= 10; i++) {
channel.basicPublish("exchange_fanout", "", null,
("您好,尊敬的用户,秒杀商品开始了!" + i).getBytes());
}
// 8.关闭资源
channel.close();
connection.close();
}
}
效果如下:
3.3 编写消费者
接下来编写三个消费者,分别监听各自的队列。
//邮件消费者
public class ConsumerMail {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.监听队列
channel.basicConsume("SEND_MAIL", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("发送邮件:" + message);
}
});
}
}
//站内信消费者
public class ConsumerStation {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.监听队列
channel.basicConsume("SEND_STATION", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("发送站内信:" + message);
}
});
}
}
//短信消费者
public class ConsumerMessage {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.监听队列
channel.basicConsume("SEND_MESSAGE", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("发送短信:" + message);
}
});
}
}
也可以使用工作队列+发布订阅模式同时使用,两个消费者同时监听 一个队列:
//短信消费者2
public class ConsumerMessage {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.监听队列
channel.basicConsume("SEND_MESSAGE", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("发送短信2:" + message);
}
});
}
}
效果如下:
4 路由模式
4.1 概念
使用发布订阅模式时,所有消息都会发送到绑定的队列中,但很多时候,不是所有消息都无差别的发布到所有队列中。比如电商网站的促销活动,双十一大促可能会发布到所有队列;而一些小的促销活动为了节约成本,只发布到站内信队列。此时需要使用路由模式 (Routing)完成这一需求。
特点:
- 每个队列绑定路由关键字RoutingKey
- 生产者将带有RoutingKey的消息发送给交换机,交换机根据RoutingKey转发到指定队列。路由模式使用direct交换机。
4.2 编写生产者
接下来我们编写路由模式的生产者:
//生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.创建交换机
channel.exchangeDeclare("exchange_routing", BuiltinExchangeType.DIRECT, true);
// 5.创建队列
channel.queueDeclare("SEND_MAIL2", true, false, false, null);
channel.queueDeclare("SEND_MESSAGE2", true, false, false, null);
channel.queueDeclare("SEND_STATION2", true, false, false, null);
// 6.交换机绑定队列
channel.queueBind("SEND_MAIL2", "exchange_routing", "import");
channel.queueBind("SEND_MESSAGE2", "exchange_routing", "import");
channel.queueBind("SEND_STATION2", "exchange_routing", "import");
channel.queueBind("SEND_STATION2", "exchange_routing", "normal");
// 7.发送消息
channel.basicPublish("exchange_routing","import",null,
"双十一大促活动".getBytes());
channel.basicPublish("exchange_routing", "normal", null,
"小型促销活动".getBytes());
// 8.关闭资源
channel.close();
connection.close();
}
}
4.3 编写消费者
接下来我们编写路由模式的消费者:
//邮件消费者
public class ConsumerMail {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.监听队列
channel.basicConsume("SEND_MAIL2", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("发送邮件:" + message);
}
});
}
}
//短信消费者
public class ConsumerMessage {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.监听队列
channel.basicConsume("SEND_MESSAGE2", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("发送短信:" + message);
}
});
}
}
//站内信消费者
public class ConsumerStation {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.监听队列
channel.basicConsume("SEND_STATION2", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("发送站内信:" + message);
}
});
}
}
运行生产者和消费者,效果如下:
5 通配符模式
5.1 概念
通配符模式(Topic)是在路由模式的基础上,给队列绑定带通配符的路由关键字,只要消息的RoutingKey能实现通配符匹配,就会将消息转发到该队列。通配符模式比路由模式更灵活,使用topic交换机。
通配符规则:
- 消息设置RoutingKey时,RoutingKey由多个单词构成,中间以 . 分割。
- 队列设置RoutingKey时, # 可以匹配任意多个单词, * 可以匹配任意一个单词。
5.2 编写生产者
接下来我们编写通配符模式的生产者:
//生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.创建交换机
channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC, true);
// 5.创建队列
channel.queueDeclare("SEND_MAIL3", true, false, false, null);
channel.queueDeclare("SEND_MESSAGE3", true, false, false, null);
channel.queueDeclare("SEND_STATION3", true, false, false, null);
// 6.交换机绑定队列
channel.queueBind("SEND_MAIL3", "exchange_topic", "#.mail.#");
channel.queueBind("SEND_MESSAGE3", "exchange_topic", "#.message.#");
channel.queueBind("SEND_STATION3", "exchange_topic", "#.station.#");
// 7.发送消息
channel.basicPublish("exchange_topic", "mail.message.station",
null,
"双十一大促活动".getBytes());
channel.basicPublish("exchange_topic", "station",
null,
"小型促销活动".getBytes());
// 8.关闭资源
channel.close();
connection.close();
}
}
5.3 编写消费者
接下来我们编写通配符模式的消费者:
//邮件消费者
public class ConsumerMail {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.监听队列
channel.basicConsume("SEND_MAIL3", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("发送邮件:" + message);
}
});
}
}
//短信消费者
public class ConsumerMessage {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.监听队列
channel.basicConsume("SEND_MESSAGE3", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("发送短信:" + message);
}
});
}
}
//站内信消费者
public class ConsumerStation {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.66.100");
connectionFactory.setPort(5672);
connectionFactory.setUsername("itbaizhan");
connectionFactory.setPassword("itbaizhan");
connectionFactory.setVirtualHost("/");
// 2.创建连接
Connection connection = connectionFactory.newConnection();
// 3.建立信道
Channel channel = connection.createChannel();
// 4.监听队列
channel.basicConsume("SEND_STATION3", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("发送站内信:" + message);
}
});
}
}
四 SpringBoot整合RabbitMQ
1 项目搭建
之前我们使用原生JAVA操作RabbitMQ较为繁琐,接下来我们使用 SpringBoot整合RabbitMQ,简化代码编写。
创建SpringBoot项目,引入RabbitMQ起步依赖
<!-- RabbitMQ起步依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
编写配置文件
spring:
rabbitmq:
host: 192.168.66.100
port: 5672
username: itbaizhan
password: itbaizhan
virtual-host: /
#日志格式
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
2 创建对列和交换机
SpringBoot整合RabbitMQ时,需要在配置类创建队列和交换机, 写法如下:
@Configuration
public class RabbitConfig {
private final String EXCHANGE_NAME = "boot_topic_exchange";
private final String QUEUE_NAME = "boot_queue";
// 创建交换机
@Bean("bootExchange")
public Exchange getExchange() {
return ExchangeBuilder
.topicExchange(EXCHANGE_NAME) // 交换机类型
.durable(true) // 是否持久化
.build();
}
// 创建队列
@Bean("bootQueue")
public Queue getMessageQueue() {
return new Queue(QUEUE_NAME); // 队列名
}
// 交换机绑定队列
@Bean
public Binding bindMessageQueue(
@Qualifier("bootExchange") Exchange exchange,
@Qualifier("bootQueue") Queue queue) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("#.message.#")
.noargs();
}
}
3 编写生产者
SpringBoot整合RabbitMQ时,提供了工具类RabbitTemplate发送 消息,编写生产者时只需要注入RabbitTemplate即可发送消息。
@SpringBootTest
public class TestProducer {
// 注入RabbitTemplate工具类
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() {
/**
* 发送消息
* 参数1:交换机
* 参数2:路由key
* 参数3:要发送的消息
*/
rabbitTemplate.convertAndSend(
"boot_topic_exchange",
"message",
"双十一开始了!");
}
}
运行生产者代码,即可看到消息发送到了RabbitMQ中:
4 编写消费者
我们编写另一个SpringBoot项目作为RabbitMQ的消费者
创建SpringBoot项目,引入RabbitMQ起步依赖
<!-- rabbitmq起步依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
编写配置文件
spring:
rabbitmq:
host: 192.168.66.100
port: 5672
username: itbaizhan
password: itbaizhan
virtual-host: /
#日志格式
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
编写消费者,监听队列
@Component
public class Consumer {
// 监听队列
@RabbitListener(queues = "boot_queue")
public void listen_message(String message) {
System.out.println("发送短信:" + message);
}
}
启动项目,可以看到消费者会消费队列中的消息
五 消息的可靠性投递
1 概念
RabbitMQ消息投递的路径为:
生产者
—> 交换机
—> 队列
—> 消费者
在RabbitMQ工作的过程中,每个环节消息都可能传递失败,那么 RabbitMQ是如何监听消息是否成功投递的呢?
- 确认模式(confirm)可以监听消息是否从生产者成功传递到交换机。
- 退回模式(return)可以监听消息是否从交换机成功传递到队列。
- 消费者消息确认(Consumer Ack)可以监听消费者是否成功处理消息。
首先我们准备两个SpringBoot项目,分别代表生产者和消费者,配置文件如下:
spring:
rabbitmq:
host: 192.168.66.100
port: 5672
username: itbaizhan
password: itbaizhan
virtual-host: /
#日志格式
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
在生产者的配置类创建交换机和队列
@Configuration
public class RabbitConfig {
private final String EXCHANGE_NAME="my_topic_exchange";
private final String QUEUE_NAME="my_queue";
// 1.创建交换机
@Bean("bootExchange")
public Exchange getExchange(){
return ExchangeBuilder
.topicExchange(EXCHANGE_NAME) // 交换机类型
.durable(true) // 是否持久化
.build();
}
// 2.创建队列
@Bean("bootQueue")
public Queue getMessageQueue(){
return QueueBuilder
.durable(QUEUE_NAME) // 队列持久化
.build();
}
// 3.将队列绑定到交换机
@Bean
public Binding bindMessageQueue(
@Qualifier("bootExchange") Exchange exchange,
@Qualifier("bootQueue") Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("my_routing")
.noargs();
}
}
2 确认模式
确认模式(confirm)可以监听消息是否从生产者成功传递到交换 机,使用方法如下:
生产者配置文件开启确认模式
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: itbaizhan
password: itbaizhan
virtual-host: /
# 开启确认模式
publisher-confirm-type: correlated
生产者定义确认模式的回调方法
@SpringBootTest
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testConfirm(){
// 定义确认模式的回调方法,消息向交换机发送后会调用confirm方法
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 被调用的回调方法
* @param correlationData 相关配置信息
* @param ack 交换机是否成功收到了消息
* @param cause 失败原因
*/
@Override
public void confirm(CorrelationData correlationData,
boolean ack, String cause) {
if (ack){
System.out.println("confirm接受成功!");
}else{
System.out.println("confirm接受失败,原因为:"+cause);
// 做一些处理。
}
}
});
rabbitTemplate.convertAndSend("my_topic_exchange",
"my_routing","send message...");
}
}
3 退回模式
退回模式(return)可以监听消息是否从交换机成功传递到队列, 使用方法如下:
生产者配置文件开启退回模式
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: itbaizhan
password: itbaizhan
virtual-host: /
# 开启确认模式
publisher-confirm-type: correlated
# 开启回退模式
publisher-returns: true
生产者定义退回模式的回调方法
@SpringBootTest
public class ProducerTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testReturn(){
// 定义退回模式的回调方法。交换机发送到队列失败后才会执行returnedMessage方法
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
/**
* @param returned 失败后将失败信息封装到参数中
*/
@Override
public void returnedMessage(ReturnedMessage returned){
System.out.println("消息对象:"+returned.getMessage());
System.out.println("错误码:"+returned.getReplyCode());
System.out.println("错误信息:"+returned.getReplyText());
System.out.println("交换机:"+returned.getExchange());
System.out.println("路由键:"+returned.getRoutingKey());
// 处理消息...
}
});
rabbitTemplate.convertAndSend("my_topic_exchange",
"my_routing1","send message...");
}
}
4 Ack
在RabbitMQ中,消费者接收到消息后会向队列发送确认签收的消 息,只有确认签收的消息才会被移除队列。这种机制称为消费者消息确认(Consumer Acknowledge,简称Ack)。类似快递员派送快递也需要我们签收,否则一直存在于快递公司的系统中。
消息分为自动确认和手动确认。自动确认指消息只要被消费者接收到,无论是否成功处理消息,则自动签收,并将消息从队列中移除。但是在实际开发中,收到消息后可能业务处理出现异常,那么消息就会丢失。此时需要设置手动签收,即在业务处理成功再通知 签收消息,如果出现异常,则拒签消息,让消息依然保留在队列当 中。
自动确认:spring.rabbitmq.listener.simple.acknowledge=“none”
手动确认:spring.rabbitmq.listener.simple.acknowledge=“manual”
消费者配置开启手动签收
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: itbaizhan
password: itbaizhan
virtual-host: /
# 开启手动签收
listener:
simple:
acknowledge-mode: manual
消费者处理消息时定义手动签收和拒绝签收的情况
@Component
public class AckConsumer {
@RabbitListener(queues = "my_queue")
public void listenMessage(Message message, Channel channel) throws
IOException, InterruptedException {
// 消息投递序号,消息每次投递该值都会+1
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
int i = 1/0; //模拟处理消息出现bug
System.out.println("成功接受到消息:"+message);
// 签收消息
/**
* 参数1:消息投递序号
* 参数2:是否一次可以签收多条消息
*/
channel.basicAck(deliveryTag,true);
}catch (Exception e){
System.out.println("消息消费失败!");
Thread.sleep(2000);
// 拒签消息
/**
* 参数1:消息投递序号
* 参数2:是否一次可以拒签多条消息
* 参数3:拒签后消息是否重回队列
*/
channel.basicNack(deliveryTag,true,true);
}
}
}
六 RabbitMQ高级特性
1 消费端限流
之前我们讲过MQ可以对请求进行“削峰填谷”,即通过消费端限流的方式限制消息的拉取速度,达到保护消费端的目的。
消费端限流的写法如下:
生产者批量发送消息
@Test
public void testSendBatch() {
// 发送十条消息
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing",
"sendmessage..."+i);
}
}
消费端配置限流机制
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: itbaizhan
password: itbaizhan
virtual-host: /
listener:
simple:
# 限流机制必须开启手动签收
acknowledge-mode: manual
# 消费端最多拉取5条消息消费,签收后不满5条才会继续拉取消息。
prefetch: 5
消费者监听队列
@Component
public class QosConsumer{
@RabbitListener(queues = "my_queue")
public void listenMessage(Message message, Channel channel) throws
IOException, InterruptedException {
// 1.获取消息
System.out.println(newString(message.getBody()));
// 2.模拟业务处理
Thread.sleep(3000);
// 3.签收消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}
}
2 利用限流实现不公平分发
在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处 理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以采用不公平分发,即谁处理的快,谁处理的消息多。
使用方法如下:
生产者批量发送消息
@Test
public void testSendBatch() {
// 发送十条消息
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing",
"sendmessage..."+i);
}
}
消费端配置不公平分发
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: itbaizhan
password: itbaizhan
virtual-host: /
listener:
simple:
# 限流机制必须开启手动签收
acknowledge-mode: manual
# 消费端最多拉取1条消息消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发
prefetch: 1
编写两个消费者
@Component
public class UnfairConsumer {
// 消费者1
@RabbitListener(queues = "my_queue")
public void listenMessage1(Message message, Channel channel) throws Exception{
//1.获取消息
System.out.println("消费者1:"+new String(message.getBody(),"UTF-8"));
//2. 处理业务逻辑
Thread.sleep(500); // 消费者1处理快
//3. 手动签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
// 消费者2
@RabbitListener(queues = "my_queue")
public void listenMessage2(Message message, Channel channel) throws Exception{
//1.获取消息
System.out.println("消费者2:"+new String(message.getBody(),"UTF-8"));
//2. 处理业务逻辑
Thread.sleep(3000);// 消费者2处理慢
//3. 手动签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
3 消息存活时间
RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL), 当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ 可以对队列的所有消息设置存活时间,也可以对某条消息设置存活时间。
设置队列所有消息存活时间
在创建队列时设置其存活时间:
@Configuration
public class RabbitConfig2 {
private final String EXCHANGE_NAME="my_topic_exchange2";
private final String QUEUE_NAME="my_queue2";
// 1.创建交换机
@Bean("bootExchange2")
public Exchange getExchange2(){
return ExchangeBuilder
.topicExchange(EXCHANGE_NAME)
.durable(true)
.build();
}
// 2.创建队列
@Bean("bootQueue2")
public Queue getMessageQueue2(){
return QueueBuilder
.durable(QUEUE_NAME)
.ttl(10000) //队列的每条消息存活10s
.build();
}
// 3.将队列绑定到交换机
@Bean
public Binding bindMessageQueue2(
@Qualifier("bootExchange2") Exchange exchange,
@Qualifier("bootQueue2") Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("my_routing")
.noargs();
}
}
生产者批量生产消息,测试存活时间
@Test
public void testSendBatch2() throws
InterruptedException {
// 发送十条消息
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("my_topic_exchange2", "my_routing",
"sendmessage..."+i);
}
}
设置单条消息存活时间
@Test
public void testSendMessage() {
//设置消息属性
MessageProperties messageProperties = new MessageProperties();
//设置存活时间
messageProperties.setExpiration("10000");
// 创建消息对象
Message message = new Message("sendmessage...".getBytes(), messageProperties);
// 发送消息
rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", message);
}
注意:
- 如果设置了单条消息的存活时间,也设置了队列的存活时间,以时间短的为准。
- 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。
@Test
public void testSendMessage2() {
for (int i = 0; i < 10; i++) {
if (i == 5) {
// 1.创建消息属性
MessageProperties messageProperties = new MessageProperties();
// 2.设置存活时间
messageProperties.setExpiration("10000");
// 3.创建消息对象
Message message = new Message(("send message..." +i).getBytes(), messageProperties);
// 4.发送消息
rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing", message);
} else {
rabbitTemplate.convertAndSend("my_topic_exchange", "my_routing",
"sendmessage..." + i);
}
}
}
在以上案例中,i=5的消息才有过期时间,10s后消息并没有马上被移除,但该消息已经不会被消费了,当它到达队列顶端时会被移除。
4 优先级队列
假设在电商系统中有一个订单催付的场景,即客户在一段时间内未付款会给用户推送一条短信提醒,但是系统中分为大型商家和小型商家。比如像苹果,小米这样大商家一年能给我们创造很大的利润,所以在订单量大时,他们的订单必须得到优先处理,此时就需要为不同的消息设置不同的优先级,此时我们要使用优先级队列。
优先级队列用法如下:
@Configuration
public class RabbitConfig3 {
private final String EXCHANGE_NAME="priority_exchange";
private final String QUEUE_NAME="priority_queue";
// 1.创建交换机
@Bean(EXCHANGE_NAME)
public Exchange priorityExchange(){
return ExchangeBuilder
.topicExchange(EXCHANGE_NAME)
.durable(true)
.build();
}
// 2.创建队列
@Bean(QUEUE_NAME)
public Queue priorityQueue(){
return QueueBuilder
.durable(QUEUE_NAME)
//设置队列的最大优先级,最大可以设置到255,官网推荐不要超过10,
//如果设置太高比较浪费资源
.maxPriority(10)
.build();
}
// 3.将队列绑定到交换机
@Bean
public Binding bindPriority(
@Qualifier(EXCHANGE_NAME) Exchange exchange,
@Qualifier(QUEUE_NAME) Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("my_routing")
.noargs();
}
}
编写生产者
@Test
public void testPriority() {
for (int i = 0; i < 10; i++) {
if (i == 5) {
// i为5时消息的优先级较高
MessageProperties messageProperties = new MessageProperties();
messageProperties.setPriority(9);
Message message = new Message(("send message..." +i).getBytes(),
messageProperties);
rabbitTemplate.convertAndSend("priority_exchange", "my_routing", message);
} else {
rabbitTemplate.convertAndSend("priority_exchange", "my_routing",
"send message..."+ i);
}
}
}
编写消费者
@Component
public class PriorityConsumer {
@RabbitListener(queues ="priority_queue")
public void listenMessage(Message message, Channel channel) throws Exception{
//获取消息
System.out.println(newString(message.getBody()));
//手动签收
channel.basicAck(message.getMessageProperties().getDeliveryTag(),true);
}
}
七 RabbitMQ死信队列
1 概念
在MQ中,当消息成为死信(Dead message)后,消息中间件可以将其从当前队列发送到另一个队列中,这个队列就是死信队列。而在RabbitMQ中,由于有交换机的概念,实际是将死信发送给了死信交换机(Dead Letter Exchange,简称DLX)。死信交换机和死信队列和普通的没有区别。
消息成为死信的情况:
- 队列消息长度到达限制。
- 消费者拒签消息,并且不把消息重新放入原队列。
- 消息到达存活时间未被消费。
2 代码实现
创建死信队列
@Configuration
public class RabbitConfig4 {
private final String DEAD_EXCHANGE = "dead_exchange";
private final String DEAD_QUEUE = "dead_queue";
private final String NORMAL_EXCHANGE = "normal_exchange";
private final String NORMAL_QUEUE = "normal_queue";
// 死信交换机
@Bean(DEAD_EXCHANGE)
public Exchange deadExchange(){
return ExchangeBuilder
.topicExchange(DEAD_EXCHANGE)
.durable(true)
.build();
}
// 死信队列
@Bean(DEAD_QUEUE)
public Queue deadQueue(){
return QueueBuilder
.durable(DEAD_QUEUE)
.build();
}
// 死信交换机绑定死信队列
@Bean
public Binding bindDeadQueue(
@Qualifier(DEAD_EXCHANGE) Exchange exchange,
@Qualifier(DEAD_QUEUE)Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("dead_routing")
.noargs();
}
// 普通交换机
@Bean(NORMAL_EXCHANGE)
public Exchange normalExchange(){
return ExchangeBuilder
.topicExchange(NORMAL_EXCHANGE)
.durable(true)
.build();
}
// 普通队列
@Bean(NORMAL_QUEUE)
public Queue normalQueue(){
return QueueBuilder
.durable(NORMAL_QUEUE)
.deadLetterExchange(DEAD_EXCHANGE) // 绑定死信交换机
.deadLetterRoutingKey("dead_routing") // 死信队列路由关键字
.ttl(10000) // 消息存活10s
.maxLength(10) // 队列最大长度为10
.build();
}
// 普通交换机绑定普通队列
@Bean
public Binding bindNormalQueue(
@Qualifier(NORMAL_EXCHANGE) Exchange exchange,
@Qualifier(NORMAL_QUEUE) Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("my_routing")
.noargs();
}
}
测试死信队列
生产者发送消息
@Test
public void testDlx(){
// 1.存活时间过期后变成死信
// rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
// 2.超过队列长度后变成死信
// for (int i = 0; i < 20; i++){
// rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
// }
// 3.消息拒签但不返回原队列后变成死信
rabbitTemplate.convertAndSend("normal_exchange","my_routing","测试死信");
}
消费者拒收消息
@Component
public class DlxConsumer {
@RabbitListener(queues ="normal_queue")
public void listenMessage(Messagemessage, Channel channel) throws
IOException {
// 拒签消息
/**
* 参数1:消息投递序号
* 参数2:是否一次可以拒签多条消息
* 参数3:拒签后消息是否重回队列
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(),true,false);
}
}
八 RabbitMQ延迟队列
1 概念
延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。
例如:用户下单后,30分钟后查询订单状态,未支付则会取消订单。
但RabbitMQ中并未提供延迟队列功能,我们可以使用死信队列实现延迟队列的效果。
2 死信队列实现
接下来我们使用死信队列实现延迟队列
创建SpringBoot订单模块,添加SpringMVC、RabbitMQ、 lombok依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
编写配置文件
spring:
rabbitmq:
host: 192.168.0.162
port: 5672
username: itbaizhan
password: itbaizhan
virtual-host: /
# 日志格式
logging:
pattern:
console: '%d{HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n'
创建队列和交换机
@Configuration
public class RabbitConfig {
// 订单交换机和队列
private final String ORDER_EXCHANGE = "order_exchange";
private final String ORDER_QUEUE = "order_queue";
// 过期订单交换机和队列
private final String EXPIRE_EXCHANGE = "expire_exchange";
private final String EXPIRE_QUEUE = "expire_queue";
// 过期订单交换机
@Bean(EXPIRE_EXCHANGE)
public Exchange deadExchange(){
return ExchangeBuilder
.topicExchange(EXPIRE_EXCHANGE)
.durable(true)
.build();
}
// 过期订单队列
@Bean(EXPIRE_QUEUE)
public Queue deadQueue(){
return QueueBuilder
.durable(EXPIRE_QUEUE)
.build();
}
// 将过期订单队列绑定到交换机
@Bean
public Binding bindDeadQueue(
@Qualifier(EXPIRE_EXCHANGE) Exchange exchange,
@Qualifier(EXPIRE_QUEUE)Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("expire_routing")
.noargs();
}
// 订单交换机
@Bean(ORDER_EXCHANGE)
public Exchange normalExchange(){
return ExchangeBuilder
.topicExchange(ORDER_EXCHANGE)
.durable(true)
.build();
}
// 订单队列
@Bean(ORDER_QUEUE)
public Queue normalQueue(){
return QueueBuilder
.durable(ORDER_QUEUE)
.ttl(10000) // 存活时间为10s,模拟30min
.deadLetterExchange(EXPIRE_EXCHANGE) // 绑定死信交换机
.deadLetterRoutingKey("expire_routing") //死信交换机的路由关键字
.build();
}
// 将订单队列绑定到交换机
@Bean
public Binding bindNormalQueue(
@Qualifier(ORDER_EXCHANGE) Exchange exchange,
@Qualifier(ORDER_QUEUE) Queue queue){
return BindingBuilder
.bind(queue)
.to(exchange)
.with("order_routing")
.noargs();
}
}
编写下单的控制器方法,下单后向订单交换机发送消息
@RestController
public class OrderController {
@Autowired
private RabbitTemplate rabbitTemplate;
//下单
@GetMapping("/place/{orderId}")
public String placeOrder(@PathVariable String orderId){
System.out.println("处理订单数据...");
// 将订单id发送到订单队列
rabbitTemplate.convertAndSend("order_exchange", "order_routing", orderId);
return "下单成功,修改库存";
}
}
编写监听死信队列的消费者
// 过期订单消费者
@Component
public class ExpireOrderConsumer {
// 监听队列
@RabbitListener(queues ="expire_queue")
public void listenMessage(String orderId){
System.out.println("查询"+orderId+"号订单的状态,如果已支付则无需处理,如果未支付则需要回退库存");
}
}
下单测试:
访问http://localhost:8080/place/10001
3 插件实现
在使用死信队列实现延迟队列时,会遇到一个问题:RabbitMQ只 会移除队列顶端的过期消息,如果第一个消息的存活时长较长,而第二个消息的存活时长较短,则第二个消息并不会及时执行。
RabbitMQ虽然本身不能使用延迟队列,但官方提供了延迟队列插件,安装后可直接使用延迟队列。
安装延迟队列插件
使用xftp将插件上传至虚拟机
rabbitmq_delayed_message_exchange-3.9.0.ez
安装插件
# 将插件放入RabbitMQ插件目录中
mv rabbitmq_delayed_message_exchange3.9.0.ez /usr/local/rabbitmq/plugins/
# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启RabbitMQ服务
#停止rabbitmq
rabbitmqctl stop
#启动rabbitmq
rabbitmq-server restart -detached
此时登录管控台可以看到交换机类型多了延迟消息
使用延迟队列
创建延迟交换机和延迟队列
@Configuration
public class RabbitConfig2 {
private final String DELAYED_EXCHANGE = "delayed_exchange";
private final String DELAYED_QUEUE = "delayed_queue";
//1.延迟交换机
@Bean(DELAYED_EXCHANGE)
public Exchange delayedExchange() {
// 创建自定义交换机
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type","topic"); // topic类型的延迟交换机
return new CustomExchange(DELAYED_EXCHANGE, "xdelayed-message", true, false, args);
}
//2.延迟队列
@Bean(DELAYED_QUEUE)
public Queue delayedQueue() {
return QueueBuilder
.durable(DELAYED_QUEUE)
.build();
}
// 3.绑定
@Bean
public Binding bindingDelayedQueue(
@Qualifier(DELAYED_QUEUE) Queue queue,
@Qualifier(DELAYED_EXCHANGE) Exchange exchange) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with("order_routing")
.noargs();
}
}
编写下单的控制器方法
@GetMapping("/place2/{orderId}")
public String placeOrder2(@PathVariable String orderId) {
System.out.println("处理订单数据...");
// 设置消息延迟时间为10秒
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(10000);
eturn message;
}
};
// 将订单id发送到订单队列
rabbitTemplate.convertAndSend("delayed_exchange", "order_routing", orderId,
messagePostProcessor);
return "下单成功,修改库存";
}
编写延迟队列的消费者
@RabbitListener(queues = "delayed_queue")
public void listenMessage(String orderId){
System.out.println("查询"+orderId+"号订单的状态,如果已支付则无需处理,如果未支付则需要回退库 存");
}
下单测试:
访问http://localhost:8080/place/10002
九 RabbitMQ集群
1 集群搭建
在生产环境中,当单台RabbitMQ服务器无法满足消息的吞吐量及安全性要求时,需要搭建RabbitMQ集群。
设置两个RabbitMQ服务
# 关闭RabbitMQ服务
rabbitmqctl stop
# 设置服务一
RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=rabbit1 rabbitmq-server start -detached
# 设置服务二
RABBITMQ_NODE_PORT=5674 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener
[{port,15674}]" RABBITMQ_NODENAME=rabbit2 rabbitmq-server start -detached
将两个服务设置到同一集群中
# 关闭服务2
rabbitmqctl -n rabbit2 stop_app
# 重新设置服务2
rabbitmqctl -n rabbit2 re
# 将服务2加入服务1中
rabbitmqctl -n rabbit2 join_cluster rabbit1@localhost
# 启动服务2
rabbitmqctl -n rabbit2 start_app
2 镜像队列
搭建了集群后,虽然多个节点可以互相通信,但队列只保存在了一 个节点中,如果该节点故障,则整个集群都将丢失消息。
# 关闭服务1
rabbitmqctl -n rabbit1 stop_app
此时我们需要引入镜像队列机制,它可以将队列消息复制到集群中的其他节点上。如果一个节点失效了,另一个节点上的镜像可以保证服务的可用性。
在管控台点击 Admin —> Policies 设置镜像队列
此时某个节点故障则不会影响整个集群
3 负载均衡
无论是生产者还是消费者,只能连接一个RabbitMQ节点,而在我们使用RabbitMQ集群时,如果只连接一个RabbitMQ节点,会造成 该节点的压力过大。我们需要平均的向每个RabbitMQ节点发送请 求,此时需要一个负载均衡工具帮助我们分发请求,接下来使用 Haproxy做负载均衡:
安装Haproxy
yum -y install haproxy
配置Haproxy
vim /etc/haproxy/haproxy.cfg
添加如下内容:
# 以下为修改内容
defaults
# 修改为tcp
mode tcp
# 以下为添加内容
listen rabbitmq_cluster
# 对外暴露端口
bind 0.0.0.0:5672
mode tcp
balance roundrobin
# 代理RabbitMQ的端口
server node1 127.0.0.1:5673 check inter 5000 rise 2 fall 2
server node2 127.0.0.1:5674 check inter 5000 rise 2 fall 2
listen stats
# Haproxy控制台路径
bind 192.168.0.162:8100
mode http
option httplog
stats enable
stats uri /rabbitmq-stats
stats refresh 5s
启动Haproxy
haproxy -f /etc/haproxy/haproxy.cfg
访问Haproxy控制台:
http://192.168.0.162:8100/rabbitmq-stats
生产者连接Haproxy发送消息
// 生产者
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.0.162");
connectionFactory.setPort(5672);
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setVirtualHost("/");
Connection conn = connectionFactory.newConnection();
Channel channel =conn.createChannel();
channel.queueDeclare("simple_queue",false, false, false, null);
channel.basicPublish("","simple_queue", null,"hello!rabbitmq!".getBytes());
channel.close();
conn.close();
}
}