延迟消息是指的消息发送出去后并不想立即就被消费,而是需要等(指定的)一段时间后才触发消费。
例如下面的业务场景:在支付宝上面买电影票,锁定了一个座位后系统默认会帮你保留15分钟时间,如果15分钟后还没付款那么不好意思系统会自动把座位释放掉。怎么实现类似的功能呢?
- 可以用定时任务每分钟扫一次,发现有占座超过15分钟还没付款的就释放掉。但是这样做很低效,很多时候做的都是些无用功;
- 可以用分布式锁、分布式缓存的被动过期时间,15分钟过期后锁也释放了,缓存key也不存在了;
- 还可以用延迟队列,锁座成功后会发送1条延迟消息,这条消息15分钟后才会被消费,消费的过程就是检查这个座位是否已经是“已付款”状态;
你在公司的协同办公系统上面预约了一个会议,邀请汪产品和陈序员今晚22点准时参加会有。系统还比较智能,除了默认发会议邀请的邮件告知参会者以外,到了今晚21:45分的时候(提前15分钟)就会通知提醒参会人员做好参会准备,会议马上开始...
同样的,这也可以通过轮询“会议预定表”来实现,比如我每分钟跑一次定时任务看看当前有哪些会议即将开始了。当然也可以通过延迟消息来实现,预定会议以后系统投递一条延迟消息,而这条消息比较特殊不会立马被消费,而是延迟到指定时间后再触发消费动作(发通知提醒参会人准备)。不过遗憾的是,在AMQP协议和RabbitMQ中都没有相关的规定和实现。不过,我们似乎可以借助上一小节介绍的“死信队列”来变相的实现。
可以使用rabbitmq_delayed_message_exchange插件实现。
这里和TTL方式有个很大的不同就是TTL存放消息在死信队列(delayqueue)里,二基于插件存放消息在延时交换机里(x-delayed-message exchange)。
- 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上
- 延时交换机(exchange)存储消息等待消息到期根据路由键(routekey)找到绑定自己的队列(queue)并把消息给它
- 队列(queue)再把消息发送给监听它的消费者(customer)
下载插件
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
安装插件:
将插件拷贝到rabbitmq-server的安装路径:/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.4/plugins
启用插件
rabbitmq-plugins list
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启rabbitmq-server
systemctl restart rabbitmq-server
SpringBoot代码案例
(1)xml配置文件与properties连接配置
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
spring.application.name=delayed_exchange
spring.rabbitmq.host=192.168.80.121
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=root
spring.rabbitmq.password=123456
spring.rabbitmq.port=5672
# 设置手动确认消息
#spring.rabbitmq.listener.simple.acknowledge-mode=manual
(2)SpringBootApplication主入口类
package com.lagou.rabbit.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Demo02RabbitmqDelayedApplication {
public static void main(String[] args) {
SpringApplication.run(Demo02RabbitmqDelayedApplication.class, args);
}
}
(3)RabbitMQ的对象配置
package com.lagou.rabbit.demo.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableRabbit
@ComponentScan("com.lagou.rabbit.demo")
public class RabbitConfig {
@Bean
public Queue queue() {
return new Queue("queue.delayed", true, false, false, null);
}
@Bean
public Exchange exchange() {
Map<String, Object> arguments = new HashMap<>();
// 使用x-delayed-type指定交换器的类型
arguments.put("x-delayed-type", ExchangeTypes.DIRECT);
// 使用x-delayed-message表示使用delayed exchange插件处理消息
return new CustomExchange("ex.delayed", "x-delayed-message", true, false, arguments);
}
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with("key.delayed").noargs();
}
@Bean
@Autowired
public RabbitAdmin rabbitAdmin(ConnectionFactory factory) {
return new RabbitAdmin(factory);
}
@Bean
@Autowired
public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
return new RabbitTemplate(factory);
}
@Bean
@Autowired
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
return factory;
}
}
(4)使用推消息模式接收延迟队列的广播
package com.lagou.rabbit.demo.listener;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
import java.io.IOException;
@Component
public class MyMeetingListener {
@RabbitListener(queues = "queue.delayed")
public void onMessage(Message message, Channel channel) throws IOException {
System.out.println(new String(message.getBody(), message.getMessageProperties().getContentEncoding()));
// 消息确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
(5)开发RestController,用于向延迟队列发送消息,并指定延迟的时长
package com.lagou.rabbit.demo.controller;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
@RestController
public class DelayedController {
@Autowired
private AmqpTemplate amqpTemplate;
@RequestMapping("/meeting/{second}")
public String bookMeeting(@PathVariable Integer second) {
// RabbitMQ只会检查队列头部的消息是否过期,如果过期就放到死信队列
// 假如第一个过期时间很长,10s,第二个消息3s,则系统先看第一个消息,等到第一个消息过期,放到DLX
// 此时才会检查第二个消息,但实际上此时第二个消息早已经过期了,但是并没有先于第一个消息放到DLX。
// 插件rabbitmq_delayed_message_exchange帮我们搞定这个。
MessageProperties messageProperties = MessagePropertiesBuilder.newInstance()
// 设置消息的过期时间
.setHeader("x-delay", (second - 10) * 1000)
.setContentEncoding("utf-8")
.build();
Message message = MessageBuilder.withBody("还有10s开始开会了".getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();
// 如果不设置message的properties,也可以使用下述方法设置x-delay属性的值
// rabbitTemplate.convertAndSend("ex.delayed", "key.delayed", message, msg -> {
// 使用定制的属性x-delay设置过期时间,也就是提前5s提醒
// 当消息转换完,设置消息头字段
// msg.getMessageProperties().setHeader("x-delay", (seconds - 5) * 1000);
// return msg;
// });
amqpTemplate.send("ex.delayed","key.delayed",message);
return "会议订好了";
}
}
(6)访问,然后查看输出