工作队列
逻辑图
<!-- SpringBoot 消息队列的起步依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
轮询分发 Round-robin
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;
public class WorkQueueProducer {
/**
* 生产者 → 消息队列
* 创建连接工厂,并设置参数
* 创建连接 Connection
* 创建通道 Channel
* 创建队列
* 发送消息
**/
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
if(true){
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
}
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//创建队列
/**
* String queue 队列名称
* boolean durable 是否持久化,
* boolean exclusive 含义一:是否独占,是否只能有一个消费者监听
* 含义二:connection 关闭是否删除队列
* boolean autoDelete 是否自动删除,当没有消费者的时候是否自动删除
* Map<String, Object> arguments 参数
*/
channel.queueDeclare("WorkQueues",true,false,false,null);
//发送消息
/**
* String exchange : 交换机名称,简单模式不使用交换机
* String routingKey : 路由规则,当不使用交换机时,路由键需要和队列名称相同
* BasicProperties props : 配置参数
* byte[] body : 消息体,真实的数据
*/
for (int i = 0; i < 20; i++) {
String str = "WorkQueues is so easy!\t" + i + "\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());;
System.out.println("发送消息:\t" + str);
channel.basicPublish("","WorkQueues",null,str.getBytes());
}
//释放资源
channel.close();
connection.close();
System.out.println("消息发送成功");
}
}
- 与简单队列几乎没有什么不同
消费者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class WorkQueueConsumerA {
/**
* 消息队列 ← 消费者
* 创建连接工厂,并设置参数
* 创建连接 Connection
* 创建通道 Channel
* 订阅队列
* 接收消息
*/
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,并设置参数
ConnectionFactory factory = new ConnectionFactory();
if (true) {
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
}
//创建连接 Connection
Connection connection = factory.newConnection();
//创建通道 Channel
Channel channel = connection.createChannel();
/**
* consumerTag 消费信息标签
* delivery 回执
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
byte[] body = delivery.getBody();
System.out.println("消费消息:\t" + new String(body));
};
/**
* basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
* String queue : 队列名称
* boolean autoAck : 是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列
* DeliverCallback deliverCallback : 回调函数
* CancelCallback cancelCallback : 消费者取消订阅时的回调函数
*/
channel.basicConsume("WorkQueues", true, deliverCallback, consumerTag -> {
});
}
}
- 再创建一个类 WorkQueueConsumerB,代码与 WorkQueueConsumerA 一样,只是类型不同
测试
-
先启动生产者,查看 RabbitMQ 网页控制台
-
先将2个消费者启动
- 第一个消费者启动的时候,会将所有的都消费掉
- 将两个都启动之后,再启动生产者
-
再启动生产者
公平分发 Fair
如果机器 A 性能很好,一下子就处理完了,其他时间一直空闲,而机器 B 性能很差,很久都不能处理完一条,但是队列还是一人一条的轮询分发,这就造成 A 性能浪费,B 处理慢
我们采用公平分发
采用 basicQos(prefetchCount=1) ,来限制 MQ 只发不超过1条的消息给同一个消费者,当消费者处理完消息,给 MQ 反馈了,MQ 才会进行第二次发送
生产者
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeoutException;
public class WorkQueueProducer {
/**
* 生产者 → 消息队列
* 创建连接工厂,并设置参数
* 创建连接 Connection
* 创建通道 Channel
* 创建队列
* 发送消息
**/
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
if(true){
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
}
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//创建队列
/**
* String queue 队列名称
* boolean durable 是否持久化,
* boolean exclusive 含义一:是否独占,是否只能有一个消费者监听
* 含义二:connection 关闭是否删除队列
* boolean autoDelete 是否自动删除,当没有消费者的时候是否自动删除
* Map<String, Object> arguments 参数
*/
channel.queueDeclare("WorkQueues",true,false,false,null);
//发送消息
/**
* String exchange : 交换机名称,简单模式不使用交换机
* String routingKey : 路由规则,当不使用交换机时,路由键需要和队列名称相同
* BasicProperties props : 配置参数
* byte[] body : 消息体,真实的数据
*/
for (int i = 0; i < 20; i++) {
String str = "WorkQueues is so easy!\t" + i + "\t" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());;
System.out.println("发送消息:\t" + str);
channel.basicPublish("","WorkQueues",null,str.getBytes());
}
//释放资源
channel.close();
connection.close();
System.out.println("消息发送成功");
}
}
消费者
消费者A
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class WorkQueueConsumerA {
/**
* 消息队列 ← 消费者
* 创建连接工厂,并设置参数
* 创建连接 Connection
* 创建通道 Channel
* 订阅队列
* 接收消息
*/
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,并设置参数
ConnectionFactory factory = new ConnectionFactory();
if (true) {
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
}
//创建连接 Connection
Connection connection = factory.newConnection();
//创建通道 Channel
Channel channel = connection.createChannel();
/**
* prefetchCount 设为 1
* MQ 发送小于等于 1 的数据给消费者
* 当消费者消费完这几条数据,就会给 MQ 一个反馈,MQ 再次发送
*/
channel.basicQos(1);
/**
* consumerTag 消费信息标签
* delivery 回执
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
byte[] body = delivery.getBody();
System.out.println("消费消息:\t" + new String(body));
try {
/**
* 睡眠 1 秒,模拟等待
*/
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
/**
* 手动回执
* long deliveryTag
* boolean multiple
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
/**
* basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
* String queue : 队列名称
* boolean autoAck : 是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列
* DeliverCallback deliverCallback : 回调函数
* CancelCallback cancelCallback : 消费者取消订阅时的回调函数
*/
channel.basicConsume("WorkQueues", false, deliverCallback, consumerTag -> {
});
}
}
消费者B
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class WorkQueueConsumerB {
/**
* 消息队列 ← 消费者
* 创建连接工厂,并设置参数
* 创建连接 Connection
* 创建通道 Channel
* 订阅队列
* 接收消息
*/
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂,并设置参数
ConnectionFactory factory = new ConnectionFactory();
if (true) {
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
}
//创建连接 Connection
Connection connection = factory.newConnection();
//创建通道 Channel
Channel channel = connection.createChannel();
/**
* prefetchCount 设为 1
* MQ 发送小于等于 1 的数据给消费者
* 当消费者消费完这几条数据,就会给 MQ 一个反馈,MQ 再次发送
*/
channel.basicQos(1);
/**
* consumerTag 消费信息标签
* delivery 回执
*/
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
byte[] body = delivery.getBody();
System.out.println("消费消息:\t" + new String(body));
try {
/**
* 睡眠 1 秒,模拟等待
*/
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
/**
* 手动回执
* long deliveryTag
* boolean multiple
*/
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
/**
* basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback)
* String queue : 队列名称
* boolean autoAck : 是否自动确认,如果true,消费者接收到消息会自动发送一个回执给消息队列
* DeliverCallback deliverCallback : 回调函数
* CancelCallback cancelCallback : 消费者取消订阅时的回调函数
*/
channel.basicConsume("WorkQueues", false, deliverCallback, consumerTag -> {
});
}
}
-
与轮询的区别
-
channel.basicQos(1);
-
try { /** * 睡眠 1 秒,模拟等待 */ TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); }
-
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
-
自动确认的 autoAck 改为 false channel.basicConsume("WorkQueues", false, deliverCallback, consumerTag -> {});
-
测试
SpringBoot整合
小结
2 个消费者监听同一个队列,消息被平均分配到 2 个消费者,提高了处理效率,3个4个消费者效率更高
轮询分发:假设有100条消息,A消费者消费50条,B消费者消费50条,但是 A 机器是8核32G的,B机器是1核1G的,显然 B机器消费慢,A机器一直空闲
公平分发:性能好的机器多消费一点,性能差的少消费一点,负载均衡