前言
本节内容是关于使用redis的过期key,通过开启其监听失效策略,模拟订单延迟任务的执行流程。其核心原理是通过使用redis订阅与发布的方式,将过期失效的key通过广播的方式,发布给客户端,客户端可以监听此消息进而消费消息。需要注意的是官方并不推荐此方式,因为其容易造成数据丢失,例如没有客户端消费消息,消息也会丢失。对于一些安全性要求比较低的场景,可以使用此方式实现延迟队列。
正文
- 引入redis的pom依赖
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>
- application.yml中配置redis连接
spring:
data:
redis:
host: 127.0.0.1
port: 6379
database: 0
connect-timeout: 30000
timeout: 30000
lettuce:
pool:
enabled: true
max-active: 200
max-idle: 50
max-wait: -1
min-idle: 10
shutdown-timeout: 100
- 配置redis的缓冲池,并注入redis的消息监听容器
package com.yundi.tps.config;
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.cache.CacheManager;
import org.springframework.cache.caffeine.CaffeineCacheManager;
import org.springframework.cache.support.CompositeCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import java.util.concurrent.TimeUnit;
@Configuration
public class RedisConfig {
@Bean
public CacheManager cacheManager(RedisConnectionFactory connectionFactory) {
// redis缓存管理器
RedisCacheConfiguration defaultCacheConfig = RedisCacheConfiguration.defaultCacheConfig()
.serializeValuesWith(RedisSerializationContext
.SerializationPair
.fromSerializer(new GenericJackson2JsonRedisSerializer()));
RedisCacheManager redisCacheManager = RedisCacheManager.builder(connectionFactory)
.cacheDefaults(defaultCacheConfig)
.transactionAware()
.build();
return new CompositeCacheManager(redisCacheManager);
}
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
serializer.serialize(objectMapper);
template.setValueSerializer(serializer);
template.setKeySerializer(new StringRedisSerializer());
template.afterPropertiesSet();
return template;
}
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
- 实现一个KeyExpirationEventMessageListener过期的监听器RedisKeyExpirationListener
package com.yundi.tps.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* Redis失效事件 key
*
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
//notify-keyspace-events Ex
// 匹配规则
String patternRule = new String(pattern);
log.info("patternRule:{}", patternRule);
// 监听的通道
byte[] channel = message.getChannel();
log.info("channel:{}", new String(channel));
// 过期的key
String expireKey = message.toString();
log.info("expireKey:{}", expireKey);
//TODO 处理订单的后续业务逻辑
}
}
- 实现一个创建订单的延时任务接口,模拟订单超时
package com.yundi.tps.controller;
import com.baomidou.mybatisplus.core.toolkit.IdWorker;
import com.yundi.xyxc.tps.common.ApiResponse;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.concurrent.TimeUnit;
@Tag(name = "订单管理")
@RestController
@RequestMapping("/api/tps/order")
public class OrderController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Operation(summary = "创建订单")
@PostMapping("save")
public ApiResponse save() {
String orderId = String.valueOf(IdWorker.getId());
stringRedisTemplate.opsForValue().setIfAbsent(orderId, orderId, 60, TimeUnit.SECONDS);
return ApiResponse.ok();
}
}
- 开启redis key的失效监听,在redis配置中添加以下配置
notify-keyspace-events Ex
- 启动redis服务和客户端项目,发送延时订单任务,看客户端是否能够消费到此延迟任务
结语
需要注意的是,该方式实现的延迟任务安全性较低,对于安全性高的场景,并不推荐此种方式。关于使用redis的key监听,实现延迟任务实战内容到这里就结束了,下期见。。。。。。