RabbitMQ高级特性_消费端限流 , [解耦, 限流,降低压力,发送消息]
通过消费端限流的 方式限制消息的拉取速度,达到保护消费端的目的。
下面我们新建springboot项目进行测试:
新建项目myproducer
依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
对application.yml进行配置:
# 配置RabbitMQ spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / # 开启确认模式 publisher-confirm-type: correlated # 开启退回模式 publisher-returns: true |
先手动创建一个交换机(代码创建在下面)
我们先做个不做任何限流的操作进行查看
package com.pb.demo; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;、 @SpringBootTest class MyproducerApplicationTests { @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSearchBatch() { for(int i = 1; i <=10; i++) { rabbitTemplate.convertAndSend("springboot_exchange", "my_routing", "send message....." + i); } } } |
然后创建交换机 springboot_exchange
创建队列my_queue 并绑定路由my_routing
绑定
然后启动测试看看数据能不能发送到队列中去
然后创建消费端项目myconsumer
对application.yml配置文件进行配置
# 配置RabbitMQ spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / listener: simple: # 限流机制必须开启手动签收 acknowledge-mode: manual |
创建包myconsumer并新建类OosConsumer我们来消费消息
package com.pb.demo.myconsumer;
import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
import java.io.IOException;
@Component public class OosConsumer { @RabbitListener(queues = "my_queue") public void linsenerConsumer(Message message, Channel channel) throws IOException, InterruptedException { System.out.println("收到了消息:" + new String(message.getBody())); //睡眠下 Thread.sleep(3000L); //签收 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } } |
启动主启动类进行测试:
我们可以看rabbitmq的控制台
启动Consumer的启动类 在启动producer的测试类 可以看到 上面的情况
我么发现消息已经被消费掉了
我们会发现他把所有的消息都堆到unacked中,那么就说明所有的消息都会堆到消费者中,因为我们这里并没有开启限流操作,如果我们有10万条消息那么就会造成消费者的内存溢出或者内存泄漏的问题
下面我们来开启限流注意如果我们开启限流必须是手动签收
修改application.yml在消费者的项目中:
# 配置RabbitMQ spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / listener: simple: # 限流机制必须开启手动签收 acknowledge-mode: manual # 消费端最多拉取5条消息进行消费,签收后消费端不满5条才会继续拉取消息 prefetch: 5 |
RabbitMQ高级特性_利用限流实现不公平分发
在RabbitMQ中,多个消费者监听同一条队列,则队列默认采用的轮询分发。但是在某种场景下这种策略并不是很好,例如消费者1处 理任务的速度非常快,而其他消费者处理速度却很慢。此时如果采 用公平分发,则消费者1有很大一部分时间处于空闲状态。此时可以 采用不公平分发,即谁处理的快,谁处理的消息多。
首先我们模拟公平分发的操作,或者说看看平均分发的弊端
项目myproducer中的生产者的代码不需要动
我们只需要改变项目myconsumer的代码,在myconsumer包中创建类UnfairConsumer,在内部创建两个消费者
记得把OosConsumer类中的消费者注释掉
package com.pb.demo.myconsumer; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class UnfairConsumer { // 消费者1 @RabbitListener(queues = "my_queue") public void listenMessage1(Message message, Channel channel) throws IOException, InterruptedException { // 1.获取消息 System.out.println("消费者1:"+new String(message.getBody())); // 2.模拟业务处理 Thread.sleep(500); // 消费者1处理快 // 3.签收消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); }
// 消费者2 @RabbitListener(queues = "my_queue") public void listenMessage2(Message message, Channel channel) throws IOException, InterruptedException { // 1.获取消息 System.out.println("消费者2:"+new String(message.getBody())); // 2.模拟业务处理 Thread.sleep(3000); // 消费者2处理快 // 3.签收消息 channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); } } |
我们要消费端的限流prefetch: 5 先注释掉
# 配置RabbitMQ spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / listener: simple: # 限流机制必须开启手动签收 acknowledge-mode: manual # 消费端最多拉取5条消息进行消费,签收后消费端不满5条才会继续拉取消息 #prefetch: 5 |
好启动所有项目进行对消费者控制台的查看
我们来实现不公平分发,谁处理的快就让他多处理
在消费端项目中修改application.yml配置文件:
# 配置RabbitMQ spring: rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / listener: simple: # 限流机制必须开启手动签收 acknowledge-mode: manual # 消费端最多拉取5条消息进行消费,签收后消费端不满5条才会继续拉取消息 #prefetch: 5 # 消费端最多拉取1条消息进行消费,这样谁处理的快谁拉取下一条消息,实现了不公平分发 prefetch: 1 |
从新启动项目进行测试:
来查看消费者的控制台:
RabbitMQ高级特性_消息存活时间
RabbitMQ可以设置消息的存活时间(Time To Live,简称TTL), 当消息到达存活时间后还没有被消费,会被移出队列。RabbitMQ 可以对队列的所有消息设置存活时间【需要在创建队列的时候操作】,也可以对某条消息设置存活时间【需要在发送的的时候进行设置】。
设置队列所有消息存活时间
在这里我们需要创建新的交换机和队列:
在项目myproducer中创建配置类在包com.pb.demo下新建配置类RabbitConfig
package com.pb.demo;
import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitConfig { private final String EXCHANGE_NAME="my_topic_exchange2"; private final String QUEUE_NAME="my_queue2";
// 1.创建交换机 @Bean("bootExchange2") public Exchange getExchange(){ return ExchangeBuilder .topicExchange(EXCHANGE_NAME) // 交换机类型 .durable(true) // 是否持久化 .build(); } @Bean("bootExchange2") public Exchange getExchange() { return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); }
// 2.创建队列 @Bean("bootQueue2") public Queue getMessageQueue(){ return QueueBuilder .durable(QUEUE_NAME) // 队列持久化 .ttl(10000) //对了的存活时间 .build(); }
// 3.将队列绑定到交换机 @Bean public Binding bindMessageQueue(@Qualifier("bootExchange2") Exchange exchange, @Qualifier("bootQueue2") Queue queue){ return BindingBuilder.bind(queue).to(exchange).with("my_routing").noargs(); } } |
然后在myproducer项目的测试类发送10条信息
@Test public void testSearchBatch2() { for(int i = 1; i <=10; i++) { rabbitTemplate.convertAndSend("my_topic_exchange2", "my_routing", "send message....." + i); } } |
没有消费 的情况下:
设置单条消息存活时间【主要是在发送消息的时候设置单条消息的存活时间即可】
在myproducer项目的测试类进行操作:
@Test public void testSendMessage() { // 1.创建消息属性 MessageProperties messageProperties = new MessageProperties(); // 2.设置存活时间 messageProperties.setExpiration("10000"); // 3.创建消息对象 Message message = new Message("send message...".getBytes(), messageProperties); // 4.发送消息 rabbitTemplate.convertAndSend("springboot_exchange", "my_routing", message); } |
然后测试该方法看看能不能给发送的单个消息设置时间
10秒再次查看
我们发现消息已经被清除掉了
注意:
1 如果设置了单条消息的存活时间,也设置了队列的存活时 间,以时间短的为准。 2 消息过期后,并不会马上移除消息,只有消息消费到队列顶端时,才会移除该消息。 |
下面我们来测试下
@Test public void testSendMessage2() { for(int i = 0; i < 10; i++) { //当i==5的时候我们来设置单条信息的时间 if(i == 5) { // 1.创建消息属性 MessageProperties messageProperties = new MessageProperties(); // 2.设置存活时间 messageProperties.setExpiration("10000"); // 3.创建消息对象 Message message = new Message("send message...".getBytes(), messageProperties); // 4.发送消息 rabbitTemplate.convertAndSend("springboot_exchange", "my_routing", message); } else { //否则发送普通消息 rabbitTemplate.convertAndSend("springboot_exchange" , "my_routing", "send message" + i); } } } |
发现并没有被立即删除,但是我们一定要注意第5条消息现在已经不能被消费了,因为mq默认他已经被删除了如果在10秒以后,因为他已经过期了
我们测试下,启动任何一个消费端进行查看: