RabbitMQ------延迟队列(七)
延迟队列
延迟队列,内部是有序的,特点:延时属性。
简单讲:延时队列是用来存放需要在指定时间被处理的元素队列。
是基于死信队列的消息过期场景。
适用场景
1.订单在十分钟之内未支付则自动取消。
2.用户注册后,三天内没有登陆,则短信提醒。
特点:需要在某个事件发生之后或者之前的特定事件点完成莫一项任务。
整合SpringBoot
导入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- RabbitMQ依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.34</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger2</artifactId>
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>io.springfox</groupId>
<artifactId>springfox-swagger-ui</artifactId>
<version>2.6.1</version>
</dependency>
<!-- RabbitMQ测试依赖-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<version>2.4.7</version>
<scope>test</scope>
</dependency>
</dependencies>
在配置文件application.properties中写明rabbitmq的IP、端口、用户名以及密码
spring.rabbitmq.host=192.168.200.139
spring.rabbitmq.prot=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123
架构图如图所示
队列1设置的过期时间为10s,队列2设置的过期时间为40s。
代码分三部分:生产者、消费者、以及交换机队列整体作为一部分。
生产者和消费者都不在进行交换机以及队列的声明。
交换机以及队列配置类的书写:
/**
* TTL队列 配置文件类代码
*/
@Configuration
public class TTLQueueConfig {
//普通交换机
public static final String X_EXCHANGE = "X";
//死信交换机
public static final String Y_EXCHANGE = "Y";
//普通队列1 过期时间10s
public static final String QA_QUEUE = "QA";
//普通队列2 过期时间40s
public static final String QB_QUEUE = "QB";
//死信队列
public static final String QD_QUEUE = "QD";
//声明X交换机
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
//声明X交换机
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_EXCHANGE);
}
//声明普通队列1
@Bean("queueA")
public Queue queueA(){
Map<String,Object> arguments = new HashMap<>();
//设置死信队列
arguments.put("x-dead-letter-exchange",QD_QUEUE);
//设置routingkey
arguments.put("x-dead-letter-routing-key","YD");
//设置ttl
arguments.put("x-message-ttl",10000);
return QueueBuilder.durable(QA_QUEUE).withArguments(arguments).build();
}
//声明普通队列2
@Bean("queueB")
public Queue queueB(){
Map<String,Object> arguments = new HashMap<>();
//设置死信队列
arguments.put("x-dead-letter-exchange",QD_QUEUE);
//设置routingkey
arguments.put("x-dead-letter-routing-key","YD");
//设置ttl
arguments.put("x-message-ttl",40000);
return QueueBuilder.durable(QB_QUEUE).withArguments(arguments).build();
}
//声明死信队列
@Bean("queueD")
public Queue queueD(){
return QueueBuilder.durable(QD_QUEUE).build();
}
//绑定
//通过容器名字进行捆绑,绑定普通队列A和交换价X
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
//通过容器名字进行捆绑,绑定普通队列B和交换价X
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with("XA");
}
//通过容器名字进行捆绑,绑定死信队列D和交换价Y
@Bean
public Binding queueDBindingY(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
生产者代码示例:
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
/**
* 发送延迟
* 生产者
*/
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMsgController {
@Autowired
private RabbitTemplate rabbitTemplate;
//开始发消息
@GetMapping("/send/{message}")
public void sendMsg(@PathVariable String message){
log.info("当前时间:{},发送一条消息给两个ttl队列:{}",new Date().toString(),message);
/**
* 交换机
* routingkey
* message
*/
rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10秒的队列:"+message);
rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40秒的队列:"+message);
}
}
消费者代码示例:
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Date;
/**
* 队列TTL 消费者
*/
@Slf4j
@Component
public class DeadLetterConsumer {
//接收消息
@RabbitListener(queues = "QD")
public void receiveD(Message message, Channel channel) throws Exception {
String string = new String(message.getBody(),"UTF-8");
log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),string);
}
}
注意导包,不要导错了。
结果:第一条消息在10s后变成死信消息,然后被消费者消费掉,第二条消息在40s后变成死信消息,然后被消费者消费掉,这样就达成了延迟队列的目的。
局限性:每增加一个延时需求,都需要新增一个普通队列。这样是不合理的。
优化:只有一个延时队列,由生产者指定需要延时多久。
延时队列优化,由生产者指定延时时间
增加一个队列QC,QC不设置过期时间,过期时间由生产者指定。
配置类代码新增QC,不设置存活时间,由生产者发送
//设置普通队列
public static final String QC_QUEUE = "QC";
//设置普通队列
@Bean("queueC")
public Queue queueC(){
HashMap<String,Object> arguments = new HashMap<>();
//设置死信队列
arguments.put("x-dead-letter-exchange",QD_QUEUE);
//设置routingkey
arguments.put("x-dead-letter-routing-key","YD");
return QueueBuilder.durable(QC_QUEUE).withArguments(arguments).build();
}
//绑定
//通过容器名字进行捆绑,绑定普通队列A和交换价X
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
生产者新增代码
//开始发消息
@GetMapping("/sendExpirationMessage/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
log.info("当前时间:{},发送一条消息给QC,ttl队列:{},过期时间为:{}ms",new Date().toString(),message,ttlTime);
/**
* 交换机
* routingkey
* message
* MessagePostProcessor,可以设置存活时间
*/
//ttlTime设置置过期时间
rabbitTemplate.convertAndSend("X","XC","消息来自ttl的队列:"+message,msg->{
//发送消息时,设置存活时间
msg.getMessageProperties().setExpiration(ttlTime);
return msg;
});
}
使用这种方式,消息并不会按时死亡。因为RabbitMQ只会检测第一个消息是否过期,如果过期,会被放入死信队列。
经过测试发现,第一个发送20s过期的消息,第二个发送2s过期的消息,结果依然是20s后,20s消息被消费,之后,2s消息才会被消费。 说明延时队列是按顺序执行。如果第一个消息延时很久,后续消息也会延时,并不会优先执行。
此现象只能通过,基于插件的RabbitMQ进行弥补,自身无法弥补这个缺陷。
RabbitMQ插件实现延时队列
安装插件
在官网上下载
https://www.rabbitmq.com/community-plugins.html
下载rabbitmq_delayed_message_exchange插件。解压放在RabbitMQ的插件目录。
进入RabbitMQ的安装目录下的plgins目录,执行以下命令让该插件生效,然后重启RabbitMQ。
-- 3.8.8代表rabbitmq版本
-- 目录如下
cd /usr/lib/rabbitmq/rabbitmq_server_3.8.8/plugins
-- 安装命令,不用写插件版本号
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
-- 重启rabbitmq
systemctl restart rabbitmq-server(安装时的服务名)
重启好后打开rabbitmq的管理端页面,可以在Exchanges目录下,Add a new exchange,Type 中,会增加一个x-delayed-message的选项。
使用插件,结构更加简单
代表由交换机进行延迟,而不是队列了。
配置类书写
当Bean中不指定名称时,名称默认方法名
自定义交换机时,需要指定交换机类型,而之前未自定交换机,直接创建的DirectExchange交换机
/**
* 延迟交换机
*/
@Configuration
public class DelayedQueueConfig {
//延迟
public static final String DELAYED_EXCHANGE = "delayed.exchange";
//延迟队列
public static final String DELAYED_QUEUE = "delayed.queue";
//routingkey
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
//声明 自定义交换机,基于插件
@Bean
public CustomExchange delayedExchange(){
//String name, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
/**
* 交换机名字
* 类型
* 是否持久化
* 是否自动删除
* 自定义参数
*/
HashMap<String,Object> arguments = new HashMap();
arguments.put("x-delayed-type","direct");
return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",
false,false,arguments);
}
//声明延迟队列
@Bean
public Queue queueDe(){
return new Queue(DELAYED_QUEUE);
}
//绑定 当Bean中不指定名称时,名称默认方法名
@Bean
public Binding queueBindingExchange(
@Qualifier("queueDe") Queue queue,
@Qualifier("delayedExchange") CustomExchange customExchange){
return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
生产者以及消费者代码与之前相同。
结论:可以实现根据过期时间,消费消息。