前言
本文章将介绍RabbitMQ的Work模式,其中这个模式又细分为轮询分发和公平分发,本文将会用Java代码结合RabbitMQ的web管理界面进行实操演示。
官网文档地址:https://rabbitmq.com/getstarted.html
什么是Work模式
RabbitMQ的Work模式是一种简单的消息队列模式,也叫做“竞争消费者模式”或“任务分发模式”。在这种模式下,多个消费者同时监听同一个队列,当队列中有消息时,多个消费者之间会进行竞争,只有一个消费者能够获得这个消息并进行处理。其他消费者则需要等待下一个消息的到来。
工作原理是这样的:生产者向队列中发送任务消息,多个消费者同时监听队列,当队列中有消息时,RabbitMQ会将消息分发给其中的一个消费者。每个消费者都会独立处理自己分配到的任务,消费者之间的工作是平等的,不会区分谁优先谁没有优先,直到队列中的任务被消费完或者没有消费者在处理为止。
在工作模式中,RabbitMQ允许多个消费者同时监听同一个队列,并且每个消费者只能接收一条消息,这使得消息在执行过程中可以分布到多个消费者中,并且每个消费者可以执行自己的任务。这种模式广泛应用于分布式系统中的任务调度或者并行处理等场景中。
Work模式中的轮询分发
在消息队列中,轮询是指多个消费者从消息队列中获取消息的方式。消费者将以轮询的方式获取消息,也就是每个消费者按照顺序逐个接收消息。当一个消费者接收到一条消息时,它会对这个消息进行处理,然后确认已经处理完成,随后再尝试获取下一条消息,如果当前消费者在处理消息时,消息队列对其它消费者进行了轮询派发,则消息队列会将该消息重新分配给其他消费者。
比如说,消息队列中有消息 A、B、C、D、E 五条消息,同时有三个消费者(消费者 1、消费者 2 和消费者 3)在等待消息,那么轮询的过程大致如下:
- 消费者 1 获取消息 A,并进行处理,确认已经处理完毕,然后尝试获取下一条消息;
- 消费者 2 获取消息 B,并进行处理,确认已经处理完毕,然后尝试获取下一条消息;
- 消费者 3 获取消息 C,并进行处理,确认已经处理完毕,然后尝试获取下一条消息;
- 消费者 1 尝试获取下一条消息,但此时没有消息可供获取;
- 消费者 2 尝试获取下一条消息,但此时没有消息可供获取;
- 消费者 3 尝试获取下一条消息,此时消息队列轮询派发了消息 D 给消费者 3,因此消费者 3 获取消息 D 并进行处理;
- 消费者 1 尝试获取下一条消息,此时消息队列轮询派发了消息 E 给消费者 1,因此消费者 1 获取消息 E 并进行处理;
- 消费者 2 尝试获取下一条消息,但此时没有消息可供获取。
这样一直轮询下去,直到所有的消息都被处理完毕。
代码实现
创建一个maven项目,导入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!--RabbitMQ依赖-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
</dependencies>
创建生产者,代码如下:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("ip地址");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 6: 准备发送消息的内容
//===============================end topic模式==================================
for (int i = 1; i <= 20; i++) {
//消息的内容
String msg = "学习:" + i + "号";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routingkey
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish("", "queue1", null, msg.getBytes());
}
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
对于消费者,我们需要创建两个来进行演示轮询分发,并且这两个消费者还是指向同一个消息队列:
//work1
import com.rabbitmq.client.*;
import java.io.IOException;
public class Work1 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("ip地址");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work1");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
// channel.queueDeclare("queue1", false, false, false, null);
// 同一时刻,服务器只会推送一条消息给消费者
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(2000);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work1-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
public class Work2 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("ip地址");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work2");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
//channel.queueDeclare("queue1", false, true, false, null);
// 同一时刻,服务器只会推送一条消息给消费者
//channel.basicQos(1);
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(200);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work2-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
注意:执行上面三段代码的开始执行顺序,work1和work2先开启,后面在开始producer分发任务。可以想想为什么要这样(doge)
我们需要注意到的是,work1和work2的处理速度是不一样的,work1每次处理消息后,会使线程睡眠2秒,而work2的处理速度是0.2秒,当我们执行开始后,你会发现,work2已经结束了消息的接收,但是work1还在工作,work2不会继续到消息队列中获取消息,它是忙完就下班了!但是最终结果一定是均分的,结果如下:
在这些work1 和 work2 的两端代码中,有一个代码是值得我们了解一下的
Channel finalChannel = channel;
finalChannel.basicQos(1);
这段代码是用来设置RabbitMQ消费者的最大并发处理量(max prefetch count),即同时从队列中预取的消息的个数。通过这个属性的设置,我们可以让RabbitMQ在可控的范围内平衡系统的负载,避免所有消息都在一瞬间被一台消费端处理完而造成其他消费端处于闲置状态的情况。
一般来讲,如果没有显式的设置 prefetch count,RabbitMQ会默认设置一个 prefetch count 值为 0,这时候它将会帮助你处理尽可能多的消息。如果你需要其他值,可以使用channel.basicQos 方法进行设置。在多个消费者队列的情况下,不同队列可能分配不同的 max prefetch count 值。
在上面的代码中,可以看到我们设置了 prefetch count 值为 1。这意味着在当前消费者从队列中预取的消息达到了 1 条之后,它需要等待确认(acknowledge)之前的消息处理完后才能预取下一条消息。这样可以避免当前消费者占用过多的 RabbitMQ 资源,也可以保证程序的稳定性。
Work模式中的公平分发
与轮询分发不同的是,公平分发是一种类似于能力越大责任越大的机制,这个分发模式下,如果某个消费者的带宽大处理速度快,处理的消息数量就多,反之亦然。简而言之,按劳分配
代码
生产者代码与轮询模式的一样,就不在放出来
消费者代码如下:
import com.rabbitmq.client.*;
import java.io.IOException;
public class Work1 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("43.139.42.244");
connectionFactory.setPort(5678);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work1");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
// channel.queueDeclare("queue1", false, false, false, null);
// 同一时刻,服务器只会推送一条消息给消费者
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(2000);
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work1-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
public class Work2 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("43.139.42.244");
connectionFactory.setPort(5678);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work2");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
//channel.queueDeclare("queue1", false, true, false, null);
// 同一时刻,服务器只会推送一条消息给消费者
//channel.basicQos(1);
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(200);
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work2-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
公平分发的代码与轮询代码不同的地方如下两张图对比
※公平分发
※轮询分发
这个不同地方的参数为:**autoAck,它表示的意义为是否开启自动确认模式。**如果为true,表示一旦消费端成功收到消息,就立即确认消息,RabbitMQ就会将其从队列中删除。如果为false,则需要手动确认消息。
这个参数的设置意义就是实现消息的手动确认,处理不同线程处理任务不同的耗时而导致一个线程忙死一个线程闲死的资源没有充分利用问题。使用这个手动确认,我们就可以做到“按劳分配”的机制
按照work1+work2->producer的先后,执行代码,结果如下:
因为work1的线程睡眠为2秒,work2为0.2秒,所以work2 能力大,处理的消息就越多。
总结
(1)当队列里消息较多时,我们通常会开启多个消费者处理消息;公平分发和轮询分发都是我们经常使用的模式。
(2)轮询分发的主要思想是“按均分配”,不考虑消费者的处理能力,所有消费者均分;这种情况下,处理能力弱的服务器,一直都在处理消息,而处理能力强的服务器,在处理完消息后,处于空闲状态;
(3)公平分发的主要思想是”能者多劳”,按需分配,能力强的干的多。