RabbitMQ工作队列模式
- 为什么要有工作队列模式
- 如何使用工作队列模式
- 轮询
- 消息确认
- 验证消息确认
- 消息持久化
- 公平调度
- 验证公平调度
- **现在将消费者1中的Thread.sleep(1000)改为Thread.sleep(3000);不添加公平调度相关代码进行测试。**
- 现在将消费者1中的Thread.sleep(1000)改为Thread.sleep(3000);添加公平调度相关代码进行测试。**
- 总结
- 如果博主的文章对您有所帮助,可以评论、点赞、收藏,支持一下博主!!!
为什么要有工作队列模式
1.rabbitmq所有模式都具备的优点:避免立即执行资源密集型型任务,并且不得不等待它完成。我们可以将这个资源密集型任务安排再以后完成。将任务封装为消息并将其发送到队列中,后台运行工作进行将从队列中获取任务,并执行任务。
2.在工作队列模式中可以有多个worker时,任务将在他们之间共享,可以并行处理任务。例如队列中一共有10个任务,有两个worker,他们将共同完成10个任务。可以是第一个worker完成4个任务,第二个worker完成6个任务。又或者是第一个worker完成5个任务,第二个worker完成5个任务。并行处理任务,大大缩短了时间,提高了执行效率。
如何使用工作队列模式
示例业务:一个生产者,生产10条消息(任务)并放入工作队列中。两个消费者(也可以理解为工人)共同执行消息(任务)。
RabbitMQUtils工具类,用于创建连接和信道
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : RabbitMQUtils
* @description : [rabbitmq工具类]
* @createTime : [2023/1/17 8:49]
* @updateUser : [WangWei]
* @updateTime : [2023/1/17 8:49]
* @updateRemark : [描述说明本次修改内容]
*/
public class RabbitMQUtils {
/*
* @version V1.0
* Title: getConnection
* @author Wangwei
* @description 创建rabbitmq连接
* @createTime 2023/1/17 8:52
* @param []
* @return com.rabbitmq.client.Connection
*/
public static Connection getConnection() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//RabbitMQ服务器ip地址
factory.setHost("ip");
//端口号 默认值为5672
factory.setPort(5672);
//为用户分配的虚拟主机 默认值为/
factory.setVirtualHost("/");
//用户名 默认为guest
factory.setUsername("用户名");
//密码 默认为guest
factory.setPassword("密码");
//创建连接
Connection connection=factory.newConnection();
return connection;
}
/*
* @version V1.0
* Title: getChannel
* @author Wangwei
* @description 创建信道
* @createTime 2023/1/17 8:55
* @param []
* @return com.rabbitmq.client.Channel
*/
public static Channel getChannel() throws IOException, TimeoutException {
Connection connection=getConnection();
Channel channel=connection.createChannel();
return channel;
}
}
Producer生产者:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : Producer
* @description : [生产者1]
* @createTime : [2023/1/17 8:48]
* @updateUser : [WangWei]
* @updateTime : [2023/1/17 8:48]
* @updateRemark : [描述说明本次修改内容]
*/
public class Producer {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
//连接RabbitMQ
RabbitMQUtils.getConnection();
//获取信道
Channel channel = RabbitMQUtils.getChannel();
/*
* TASK_QUEUE_NAME 队列名称
* durable 队列是否持久化,true表示队列为持久化, 持久化的队列会存盘,在服务器重启的时候会保证不丢失相关信息
* exclusive 设置是否排他true表示队列为排他的, 如果一个队列被设置为排他队列,该队列仅对首次声明它的连接可见, 并在连接断开时自动删除,
(这里需要注意三点:1.排他队列是基于连接Connection可见的,
同一个连接的不同信道Channel是可以同时访问同一连接创建的排他队列;
"首次"是指如果一个连接己经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,
这个与普通队列不同;即使该队列是持久化的,一旦连接关闭或者客户端退出,
该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景)
*autoDelete 设置是否自动删除。为true 则设置队列为自动删除。自动删除的前提是, 至少有一个消费者连接到这个队列,
之后所有与这个队列连接的消费者都断开时,才会自动删除。
不能把这个参数错误地理解为: "当连接到此队列的所有客户端断开时,这个队列自动删除",
因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列
*arguments 可以设置队列其它参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
*
* */
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
//发送10条消息
for (int i = 0; i <10 ; i++) {
String message="廊坊师范..."+i;
/*
*交换机命名,不填写使用默认的交换机
* routingKey -路由键道具-消息的其他属性-路由头等正文
* 消息正文
* **/
//推送消息
channel.basicPublish("",TASK_QUEUE_NAME, null,message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
ConsumerOne消费者1
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : ConsumerOne
* @description : [消费者1]
* @createTime : [2023/1/17 9:07]
* @updateUser : [WangWei]
* @updateTime : [2023/1/17 9:07]
* @updateRemark : [描述说明本次修改内容]
*/
public class ConsumerOne {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
/*
* @version V1.0
* Title: doWork
* @author Wangwei
* @description 模拟任务的执行时间
* @createTime 2023/1/18 9:21
* @param [task]
* @return void
*/
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
//如果消息中存在.则1秒之后继续,有几个.停止几秒
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
ConsumerTwo消费者2
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : ConsumerTwo
* @description : [消费者2]
* @createTime : [2023/1/17 9:10]
* @updateUser : [WangWei]
* @updateTime : [2023/1/17 9:10]
* @updateRemark : [描述说明本次修改内容]
*/
public class ConsumerTwo {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
/*
* @version V1.0
* Title: doWork
* @author Wangwei
* @description 模拟任务的执行时间
* @createTime 2023/1/18 9:21
* @param [task]
* @return void
*/
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
//如果消息中存在.则1秒之后继续,有几个.停止几秒
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
运行程序
先启动两个消费者,再启动生产者
可以看到生产者发送了10条消息
消费者1和消费者2分别消费了5条消息
轮询
默认情况下,RabbitMQ将按顺序将每条消息发送给下一个消费者。平均每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询。
从上面我们的示例中就可以看出,两个消费者分别消费了5条消息,并且都是消息的消费都是按照顺序进行消费。
消息确认
执行一个任务可能需要几秒钟的时间,您可能想知道如果使用者启动了一个较长的任务,并且在完成之前终止了该任务会发生什么。在我们当前的代码中,一旦RabbitMQ将消息传递给用户,它就会立即将其标记为删除。在这种情况下,如果终止一个消费者,它刚刚处理的消息就会丢失。发送给这个特定消费者但尚未处理的消息也会丢失。
但是我们不想失去任何消息。如果一个消费者死了,我们希望将任务传递给另一个消费者。
为了确保消息永远不会丢失,RabbitMQ支持消息确认。一个确认信息被消费者发送回来,告诉RabbitMQ已经接收、处理了一个特定的消息,并且RabbitMQ可以自由删除它。
如果一个消费者死了(它的通道关闭了,连接关闭了,或者TCP连接丢失了)而没有发送ack, RabbitMQ会理解消息没有被完全处理,并将其重新排队。如果同时有其他消费者在线,它将迅速将其重新发送给另一个消费者。这样你就可以确保没有信息丢失,即使消费者偶尔死亡。
对消费者交付确认强制执行时间(默认为30分钟)。这有助于检测从不确认交付的错误(卡住)消费者。您可以根据交付确认超时增加此超时时间。
默认情况下,手动消息确认是打开的。一旦我们完成了一个任务,是时候将这个标志设置为false并从worker发送一个适当的确认。
示例代码:
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
消费者完整代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : ConsumerOne
* @description : [消费者1]
* @createTime : [2023/1/17 9:07]
* @updateUser : [WangWei]
* @updateTime : [2023/1/17 9:07]
* @updateRemark : [描述说明本次修改内容]
*/
public class ConsumerOne {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
}
/*
* @version V1.0
* Title: doWork
* @author Wangwei
* @description 模拟任务的执行时间
* @createTime 2023/1/18 9:21
* @param [task]
* @return void
*/
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
//如果消息中存在.则1秒之后继续,有几个.停止几秒
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
使用这段代码,您可以确保即使使用CTRL+C终止一个正在处理消息的消费者,也不会丢失任何东西。在worker终止后不久,所有未确认的消息都将被重新传递。
确认必须在接收传递的同一通道上发送。尝试使用不同的通道进行确认将导致通道级协议异常。
验证消息确认
验证步骤:启动加入消息确认的两个消费者,并启动生产者。再中断消费者1进行观察控制台输出结果。
消费者代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : ConsumerOne
* @description : [消费者1]
* @createTime : [2023/1/17 9:07]
* @updateUser : [WangWei]
* @updateTime : [2023/1/17 9:07]
* @updateRemark : [描述说明本次修改内容]
*/
public class ConsumerOne {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
}
/*
* @version V1.0
* Title: doWork
* @author Wangwei
* @description 模拟任务的执行时间
* @createTime 2023/1/18 9:21
* @param [task]
* @return void
*/
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
//如果消息中存在.则1秒之后继续,有几个.停止几秒
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
生产者:
消费者1
可以看到消费者1在收到第六条消息之后再处理过程中,消费者1被中断了。第六条消息并没有执行完。
消费者2
可以看到第六条消息被重新传递给了消费者2,第六条消息并没有丢失。
再次验证:将两个消费者的消息确认代码除去再进行验证。启动两个消费者,并启动生产者。再中断消费者1进行观察控制台输出结果。
消费者代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : ConsumerTwo
* @description : [消费者2]
* @createTime : [2023/1/17 9:10]
* @updateUser : [WangWei]
* @updateTime : [2023/1/17 9:10]
* @updateRemark : [描述说明本次修改内容]
*/
public class ConsumerTwo {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
/*
* @version V1.0
* Title: doWork
* @author Wangwei
* @description 模拟任务的执行时间
* @createTime 2023/1/18 9:21
* @param [task]
* @return void
*/
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
//如果消息中存在.则1秒之后继续,有几个.停止几秒
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
生产者:
消费者1:
消费者1接收到第七条消息时消费者1被中断。
消费者2:
第七条消息并没有被重新发送给消费者2,第七条消息丢失了。
消息持久化
我们已经学会了如何确保即使消费者死亡,消息也不会丢失。但是如果RabbitMQ服务器停止,我们的消息仍然会丢失。
当RabbitMQ退出或崩溃时,它会忘记队列和消息,除非你告诉它不要这样做。为了确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久的。
首先,我们需要确保队列在RabbitMQ节点重启后仍然存在。为了做到这一点,我们需要声明它是持久的
boolean durable = true;
channel.queueDeclare("task_queue2", durable, false, false, null);
注意:
RabbitMQ不允许你用不同的参数重新定义一个现有的队列,并且会向任何试图这样做的程序返回一个错误。
这个queueDeclare更改需要同时应用于生产者和使用者代码。
此时我们可以确定task_queue队列不会丢失,即使RabbitMQ重新启动。现在我们需要将消息标记为持久消息——通过将MessageProperties(实现BasicProperties)设置为值PERSISTENT_TEXT_PLAIN。
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue2",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
公平调度
您可能已经注意到,调度仍然没有完全按照我们想要的方式工作。例如,在有两个工作人员的情况下,当所有奇数消息都很重而偶数消息都很轻时,一个工作人员将一直很忙,而另一个几乎不做任何工作。RabbitMQ对此一无所知,它仍然会均匀地分发消息。
这是因为RabbitMQ只是在消息进入队列时分派消息。它不会查看消费者的未确认消息的数量。它只是盲目地将每n个消息发送给第n个消费者。
为了克服这个问题,我们可以使用basicQos方法,设置prefetchCount = 1。这告诉RabbitMQ一次不要给一个消费者提供超过一条消息。或者,换句话说,在消费者处理并确认前一条消息之前,不要向它发送新消息。相反,它将把它分派给下一个不忙的工作人员。
int prefetchCount = 1;
channel.basicQos(prefetchCount);
消费者完整代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : ConsumerOne
* @description : [消费者1]
* @createTime : [2023/1/17 9:07]
* @updateUser : [WangWei]
* @updateTime : [2023/1/17 9:07]
* @updateRemark : [描述说明本次修改内容]
*/
public class ConsumerOne {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
int prefetchCount = 1;
channel.basicQos(prefetchCount);
//使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
}
/*
* @version V1.0
* Title: doWork
* @author Wangwei
* @description 模拟任务的执行时间
* @createTime 2023/1/18 9:21
* @param [task]
* @return void
*/
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
//如果消息中存在.则1秒之后继续,有几个.停止几秒
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
验证公平调度
现在将消费者1中的Thread.sleep(1000)改为Thread.sleep(3000);不添加公平调度相关代码进行测试。
生产者完整代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : Producer
* @description : [生产者1]
* @createTime : [2023/1/17 8:48]
* @updateUser : [WangWei]
* @updateTime : [2023/1/17 8:48]
* @updateRemark : [描述说明本次修改内容]
*/
public class Producer {
private static final String TASK_QUEUE_NAME = "task_queue1";
public static void main(String[] args) throws IOException, TimeoutException {
//连接RabbitMQ
RabbitMQUtils.getConnection();
//获取信道
Channel channel = RabbitMQUtils.getChannel();
/*
* TASK_QUEUE_NAME 队列名称
* durable 队列是否持久化,true表示队列为持久化, 持久化的队列会存盘,在服务器重启的时候会保证不丢失相关信息
* exclusive 设置是否排他true表示队列为排他的, 如果一个队列被设置为排他队列,该队列仅对首次声明它的连接可见, 并在连接断开时自动删除,
(这里需要注意三点:1.排他队列是基于连接Connection可见的,
同一个连接的不同信道Channel是可以同时访问同一连接创建的排他队列;
"首次"是指如果一个连接己经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,
这个与普通队列不同;即使该队列是持久化的,一旦连接关闭或者客户端退出,
该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景)
*autoDelete 设置是否自动删除。为true 则设置队列为自动删除。自动删除的前提是, 至少有一个消费者连接到这个队列,
之后所有与这个队列连接的消费者都断开时,才会自动删除。
不能把这个参数错误地理解为: "当连接到此队列的所有客户端断开时,这个队列自动删除",
因为生产者客户端创建这个队列,或者没有消费者客户端与这个队列连接时,都不会自动删除这个队列
*arguments 可以设置队列其它参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等
*
* */
channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
//发送10条消息
for (int i = 0; i <10 ; i++) {
String message="廊坊师范..."+i;
/*
*交换机命名,不填写使用默认的交换机
* routingKey -路由键道具-消息的其他属性-路由头等正文
* 消息正文
* **/
//推送消息
channel.basicPublish("",TASK_QUEUE_NAME, null,message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
消费者1完整代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : ConsumerOne
* @description : [消费者1]
* @createTime : [2023/1/17 9:07]
* @updateUser : [WangWei]
* @updateTime : [2023/1/17 9:07]
* @updateRemark : [描述说明本次修改内容]
*/
public class ConsumerOne {
private static final String TASK_QUEUE_NAME = "task_queue1";
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//channel.basicQos(1);
//使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
}
/*
* @version V1.0
* Title: doWork
* @author Wangwei
* @description 模拟任务的执行时间
* @createTime 2023/1/18 9:21
* @param [task]
* @return void
*/
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
//如果消息中存在.则1秒之后继续,有几个.停止几秒
if (ch == '.') {
try {
Thread.sleep(3000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
消费者2完整代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : ConsumerTwo
* @description : [消费者2]
* @createTime : [2023/1/17 9:10]
* @updateUser : [WangWei]
* @updateTime : [2023/1/17 9:10]
* @updateRemark : [描述说明本次修改内容]
*/
public class ConsumerTwo {
private static final String TASK_QUEUE_NAME = "task_queue1";
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//channel.basicQos(1);
//使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
}
/*
* @version V1.0
* Title: doWork
* @author Wangwei
* @description 模拟任务的执行时间
* @createTime 2023/1/18 9:21
* @param [task]
* @return void
*/
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
//如果消息中存在.则1秒之后继续,有几个.停止几秒
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
执行结果:
消费者1:
消费者2:
结论:
有两个消费者的情况下,一个消费者1一直很忙(完成一条消息需要3秒),消费者2一致很轻松(完成一条消息需要1秒)。
RabbitMQ对此一无所知,它仍然会均匀地分发消息。导致消费者1和消费者2执行了同样的消息数量。
现在将消费者1中的Thread.sleep(1000)改为Thread.sleep(3000);添加公平调度相关代码进行测试。**
消费者1完整代码
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : ConsumerOne
* @description : [消费者1]
* @createTime : [2023/1/17 9:07]
* @updateUser : [WangWei]
* @updateTime : [2023/1/17 9:07]
* @updateRemark : [描述说明本次修改内容]
*/
public class ConsumerOne {
private static final String TASK_QUEUE_NAME = "task_queue1";
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
//使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
}
/*
* @version V1.0
* Title: doWork
* @author Wangwei
* @description 模拟任务的执行时间
* @createTime 2023/1/18 9:21
* @param [task]
* @return void
*/
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
//如果消息中存在.则1秒之后继续,有几个.停止几秒
if (ch == '.') {
try {
Thread.sleep(3000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
消费者2完整代码:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @author : [WangWei]
* @version : [v1.0]
* @className : ConsumerTwo
* @description : [消费者2]
* @createTime : [2023/1/17 9:10]
* @updateUser : [WangWei]
* @updateTime : [2023/1/17 9:10]
* @updateRemark : [描述说明本次修改内容]
*/
public class ConsumerTwo {
private static final String TASK_QUEUE_NAME = "task_queue1";
public static void main(String[] args) throws IOException, TimeoutException {
RabbitMQUtils.getConnection();
Channel channel = RabbitMQUtils.getChannel();
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);
//使用额外的DeliverCallback接口来缓冲服务器推送给我们的消息。
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
}
/*
* @version V1.0
* Title: doWork
* @author Wangwei
* @description 模拟任务的执行时间
* @createTime 2023/1/18 9:21
* @param [task]
* @return void
*/
private static void doWork(String task) {
for (char ch : task.toCharArray()) {
//如果消息中存在.则1秒之后继续,有几个.停止几秒
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}
执行结果:
消费者1:
消费者2:
结论:
置prefetchCount = 1。这告诉RabbitMQ一次不要给一个消费者提供超过一条消息。或者,换句话说,在消费者处理并确认前一条消息之前,不要向它发送新消息。相反,它将把它分派给下一个不忙的消费者。
总结
学习一门知识需要亲自动手去验证去证明这种方式是可行了,这样对于这个知识点才算是理解的更深。按照这样做一定行,而不是应该可以,大概可以吧。