文章目录
- RabbitMQ第二个实操小案例——WorkQueue
RabbitMQ第二个实操小案例——WorkQueue
讲第二个案例之前,我们先看下前面第一个案例的模型:
可以看到,我们只有一个发布者和一个消费者,通过Queue队列,实现最简单的消息的发送和接收。但是,如果现在我们的发布者他每秒发布100条数据,而我们的消费者每秒只能处理90条数据,那么每秒我们的队列将有10条数据被卡在队列中,而队列的容量是有限的,随着累计的时间长了,我们的消息也会因为来不及处理,导致后面发送的消息没来得及被接收就被销毁掉了。
这个时候,就要引出我们的第二个模型了——工作队列了,往下看:
我们先看一下工作队列的模型,如下:
WorkQueue的模型跟前面第一个案例Hello,World!的模型,最明显的区别其实就是,第一个案例他只有一个消费者。我们知道RabbitMQ他的消息是阅完即焚,即消费者一旦接收,这个消息直接就从Queue中被弹出了。
而现在这个案例,他有两个消费者(画两个只是方便,他当然也可以有3个、4个),他的消息应该是通过某种算法做负载均衡送到不同的消费者,让消费者进行处理,让消息不至于处理不过来,从而导致滞留在Queue中的消息被弹出。
废话少说,代码走起!
思路如下:
1、我们先让Publish服务每秒发布50条消息到 simple.queue,来演示消息的频繁发送。
2、在Consumer服务中定义两个消费者,来监听我们的 simple.queue队列。
3、消费者1每秒处理40条消息,消费者2每秒处理30条消息。
首先,我们在Publish服务下编写测试方法:
@Test
public void testWorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "Hello, I'm ";
for (int i = 1; i <= 50; i++) {
rabbitTemplate.convertAndSend(queueName, message+i);
Thread.sleep(20);
}
}
然后,我们编写我们的两个消费者。(为了不影响我们的实验,记得把上一个实验写的方法注释掉)
// @RabbitListener(queues = "simple.queue")
// public void listenQueueMessage(String msg) throws InterruptedException{
// System.out.println("监听到的消息为:【"+ msg +"】");
// }
@RabbitListener(queues = "simple.queue")
public void listenWorkQueueMessage1(String msg) throws InterruptedException{
//打印白色的字体
System.out.println("消费者1监听到的消息为:【"+ msg +"】");
Thread.sleep(25);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueueMessage2(String msg) throws InterruptedException{
//打印红色的字体
System.err.println("消费者2监听到的消息为:【"+ msg +"】");
Thread.sleep(33);
}
然后我们启动我们的消费者的服务。
然后我们跑一下我们的Publish的测试方法,开始见证奇迹:
看到这里,有些人可能会好奇,怎么消费者1执行的都是奇数的消息,消费者2执行的都是偶数的消息,其实这个不是偶然哈,这个是必然的,现在我们把消费者1的睡眠时间改成5,消费者2的睡眠时间改为50,再看一眼结果。(这就很明显了,红色的是2、4、6、8、10),白色的是1、3、5、7、9。
造成这种情况的原因是 RabbitMQ 内部的 消息预取 机制导致的,那么什么叫消息预取呢?其实也很简单,就是说 RabbitMQ 他的管道,他不管你处理快不快,我拿到了我就先预定好,这个给消费者1,这个给消费者2,这个给消费者1,这个给消费者2,以此类推,一直这么分下去。最后两个消费者就会拿到一样的消息,但是!这明显不是我们要的,有些服务器他快,他处理起来快,我们应该多给他几条,有些服务器他慢,我们应该让他少处理几条。怎么玩呢?其实也很简单,配置一下RabbitMQ预取的数量(每次获取的消息数量,处理完后这一批后再获取下一批)就可以了:
在消费者服务的配置文件中,配置我们的RabbitMQ的预取数如下:spring.rabbitmq.listener.simple.prefetch
spring:
rabbitmq:
host: 192.168.83.130
port: 5672
virtual-host: /
username: admin
password: root
listener:
simple:
#预取数:每次接收的消息数,处理完才会接收下一批消息,这里我们设置为一条
prefetch: 1
配置好后,我们重启一下我们的消费者服务,然后看一下结果: