这篇文章,主要介绍消息队列RabbitMQ七种模式之WorkQueues工作队列模式。
目录
一、工作队列模式
1.1、什么是Work Queues模式
1.2、工作队列模式的使用
(1)引入依赖
(2)编写生产者
(3)编写两个消费者
一、工作队列模式
1.1、什么是Work Queues模式
在某些情况下,当RabbitMQ中的消息积压的非常多的时候,一个消费者没办法消费,那么这个时候就可以增加消费者数量,以此来加快消息的消费。我们把所有消费者都看作是在同一个队列里面的,那么这个队列就可以理解称为工作队列,因为都是进行消费消息的。
Work Queues模式下,每一个消费者能够分配到的消息数量都是大致相同的,RabbitMQ采用轮询的方式,依次给每一个消费者分发消息。
工作队列模式提高了消息的并发消费,并且每一个消息只会被分发到一个消费者上面。
1.2、工作队列模式的使用
(1)引入依赖
<!-- 引入 RabbitMQ 依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
(2)编写生产者
package com.rabbitmq.demo.workqueues;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @version 1.0.0
* @Date: 2023/2/25 16:23
* @Copyright (C) ZhuYouBin
* @Description: 消息生产者
*/
public class Producer {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接的 RabbitMQ 服务地址
factory.setHost("127.0.0.1"); // 默认就是本机
factory.setPort(5672); // 默认就是 5672 端口
// 3、获取连接
Connection connection = null; // 连接
Channel channel = null; // 通道
try {
connection = factory.newConnection();
// 4、获取通道
channel = connection.createChannel();
// 5、指定需要操作的消息队列,如果队列不存在,则会创建
channel.queueDeclare("queue_demo_2023", false, false, false, null);
// 6、发送消息
for (int i = 0; i < 50; i++) {
String message = "这是Work Queues模式,发送的第【" + (i+1) + "】条消息数据";
channel.basicPublish("", "queue_demo_2023", null, message.getBytes());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != channel) {
try {
channel.close();
} catch (Exception e) {}
}
if (null != connection) {
try {
connection.close();
} catch (Exception e) {}
}
}
}
}
(3)编写两个消费者
为了实现Work Queues工作队列模式,需要编写多个消费者,为了简单,这里就编写两个消费者(代码一样)。
package com.rabbitmq.demo.workqueues;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @version 1.0.0
* @Date: 2023/2/25 16:30
* @Copyright (C) ZhuYouBin
* @Description: 消息消费者
*/
public class Consumer {
public static void main(String[] args) {
// 1、创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2、设置连接的 RabbitMQ 服务地址
factory.setHost("127.0.0.1"); // 默认就是本机
factory.setPort(5672); // 默认就是 5672 端口
// 3、获取连接
Connection connection = null; // 连接
Channel channel = null; // 通道
try {
connection = factory.newConnection();
// 4、获取通道
channel = connection.createChannel();
// 5、指定需要操作的消息队列,如果队列不存在,则会创建
channel.queueDeclare("queue_demo_2023", false, false, false, null);
// 6、消费消息
DeliverCallback callback = new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
// 接收消息
try {
// 模拟消费者消费消息时候很慢的情况
Thread.sleep(2000);
} catch (Exception e) {}
System.out.println("这是接收的消息:" + new String(delivery.getBody()));
}
};
channel.basicConsume("queue_demo_2023", true, callback, i->{});
} catch (Exception e) {
e.printStackTrace();
}
}
}
启动消费者,查看控制台输入日志,此时可以发现,生产者总共发送了50条消息,两个消费者都是消费了25条消息,因为RabbitMQ是采用轮询的方式分发消息的(注意:先启动消费者,在启动生产者,不然看不到效果)。
到此,RabbitMQ中的工作队列模式就介绍完啦。
综上,这篇文章结束了,主要介绍消息队列RabbitMQ七种模式之WorkQueues工作队列模式。