延迟任务
- 定时任务:有固定周期的,有明确的触发时间
- 延迟队列:没有固定的开始时间,它常常是由一个事件触发的,而在这个事件触发之后的一段时间内触发另一个事件,任务可以立即执行,也可以延迟
应用场景:
场景一:订单下单之后30分钟后,如果用户没有付钱,则系统自动取消订单;如果期间下单成功,任务取消。
场景二:接口对接出现网络问题,1分钟后重试,如果失败,2分钟重试,直到出现阈值终止。
延迟队列技术
DelayQueue
JDK自带DelayQueue 是一个支持延时获取元素的阻塞队列, 内部采用优先队列 PriorityQueue 存储元素,同时元素必须实现 Delayed 接口;在创建元素时可以指定多久才可以从队列中获取当前元素,只有在延迟期满时才能从队列中提取元素
RabbitMQ实现延迟任务
TTL:Time To Live (消息存活时间)
死信队列:Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以重新发送另一个交换机(死信交换机)
- 模拟业务逻辑。
- 导入依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--以及SpringBoot Web启动器等依赖-->
- 配置Yaml文件。
server:
port: 8006
spring:
rabbitmq:
host: 192.168.208.128
port: 5672
virtual-host: /
username: admin
password: admin
- 创建交互机与队列。
@Configuration
public class OrderRabbitConfig {
// 创建下单交换机
@Bean
public DirectExchange orderExchange(){
return new DirectExchange("order.exchange",true,false);
/*
* 第二参数:是否持久化,交换机将在服务器重启后继续存在。
* 第三参数:如果服务器在不再使用交换机时是否删除。
*/
}
// 创建延迟队列
@Bean
public Queue delayQueue(){
return QueueBuilder
.durable("order.queue")
.ttl(20000) //过期时间,单位毫秒
.deadLetterExchange("dl.exchange")//指定死信交换机
.deadLetterRoutingKey("dl.key")//指定死信路由key
.build();
}
// 绑定死信交换机和死信队列
@Bean
public Binding bindDelayQueueToExchange(
DirectExchange orderExchange,
Queue delayQueue
){
return BindingBuilder.bind(delayQueue).to(orderExchange).with("order.key");
}
// 创建死信交换机
@Bean
public DirectExchange createDeadLettterExchange(){
return new DirectExchange("dl.exchange",true,false);
}
// 创建死信队列
@Bean
public Queue createDeadLettterQueue(){
return QueueBuilder
.durable("dl.queue")
.build();
}
// 绑定死信交换机和死信队列
@Bean
public Binding bindQueueToExchange(){
return BindingBuilder
.bind(createDeadLettterQueue())
.to(createDeadLettterExchange())
.with("dl.key");
}
}
- 创建监听器消费死信队列。
@Component
public class OrderListener {
@Autowired(require = false)
private OrderService orderService;
@RabbitListener(queues = "dl.queue")
public void handleOrderMsg(String msg){
System.out.println("处理失效订单:"+new Date());
//通过msg中的订单ID获取订单对象
Order order = orderService.getById(Long.parseLong(msg));
//先判断该订单是否已经支付
//如果还是未支付,才进行修改(status=5,取消时间更新,把库存还原)
System.out.println(msg+" = status = 5,取消时间更新,把库存还原)");
}
}
- 创建处理请求模拟下单。
@RestController
public class SendMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg")
public String sendMsg(){
System.out.println("下单时间:"+new Date());
rabbitTemplate.convertAndSend("order.exchange","order.key","OrderId");
return "发送成功";
}
}
redis实现延迟任务
zset数据类型的去重有序(分数排序)特点进行延迟。例如:时间戳作为score进行排序
- 引入Redisson依赖。
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.3</version>
</dependency>
<!--以及SpringBoot Web使用的依赖-->
- 配置yaml文件。
server:
port: 8002
spring:
application:
name: redisson-demo
redis:
host: 192.168.88.143
redisson:
config:
clusterServersConfig:
nodeAddresses:
- "redis://192.168.88.143:6379"
- 发送延迟任务消息。
@RestController
@RequestMapping("/test")
public class TestController {
// 注入RedissonClient客户端
@Autowired
private RedissonClient client;
// 发送消息
@GetMapping("/send/{time}")
public ResponseEntity send(@PathVariable("time") Long time) {
// 获取一个阻塞队列 指定队列的名称
RBlockingQueue<String> blockingQueue = client.getBlockingQueue("test_block");
// 创建一个延迟队列 指定使用哪个阻塞队列
RDelayedQueue<String> delayedQueue = client.getDelayedQueue(blockingQueue);
// 发送延迟消息 指定消息内容 延迟时间 时间单位
delayedQueue.offer("hello redis", time, TimeUnit.SECONDS);
return ResponseEntity.ok("发送成功" + new Date());
}
}
- 消费消息,执行延迟任务。
@Service
public class MsgListener {
// 注入RedissonClient客户端
@Autowired
private RedissonClient client;
// 实例化该类时首先执行这个方法
@PostConstruct
public void handleMessage() {
// 使用线程运行
new Thread(new Runnable() {
@Override
public void run() {
// 获取一个阻塞队列 指定队列的名称
RBlockingQueue<String> blockingQueue =
client.getBlockingQueue("test_block");
// 在监听器启动时先发一条自定义消息,主要是用于激活这个队列
// 创建一个延迟队列 指定使用哪个阻塞队列
RDelayedQueue<String> delayedQueue =
client.getDelayedQueue(blockingQueue);
// 发送延迟消息 指定消息内容 延迟时间 时间单位
delayedQueue.offer("init", 1, TimeUnit.SECONDS);
while (true) {
String msg = null;
try {
// 拉取消息 指定超时时间 默认这个方法是阻塞的
msg = blockingQueue.poll(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (!StringUtils.isEmpty(msg)) {
System.out.println("接收到消息: " + msg + ", " + new Date());
}
}
}
}).start();
}
}