最近看开源看到一个好用的延时队列组件,已经上生产。代码量很少,主要就是利用Redis监听过期键实现的。然后搞点策略模式柔和柔和。利用Spring Start 封装了一下,全是俺掌握的知识,稍微研究了下就搞懂了。觉得挺有用的,这里分享一下。
Redis 过期键监听
之前写责任链手撸二级缓存的时候,也是借助过期键监听器来更新二级缓存的,详情挪步
CaffeineCache+Redis 接入系统做二层缓存,SPI 思路实现(借鉴 mybatis 二级缓存、自动装配源码)
效果
效果前提: Redis 开启了过期键通知:config set notify-keyspace-events Ex
根据 code 值发布延时任务(10s)。
对应的code 的处理器,10s后收到通知进行处理任务
基于这套组件可实现的功能:订单超时自动取消、会议前 30 分钟自动提醒、订单到点自动收货等,比MQ灵活性更高,RocketMq 老版只支持最高30 分钟的延时任务,这套组件可以指定任意时间。且可无限扩展 topic,满足不同类型的业务。缺点就是严重依赖Redis,需要保证Redis的高可用
RedisExpiredListener配置
利用ApplicationContextAware注册所有的messageHandleRouter处理器,当有消息过来时解析消息格式中的CODE,根据CODE把任务分发给具体的某个messageHandleRouter实现类进行处理。进行业务隔离。
package com.zzh.mybatisplus5.mq;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
public class RedisExpiredListener implements MessageListener, ApplicationContextAware {
/**
* 客户端监听订阅的topic,当有消息的时候,会触发该方法;
* 并不能得到value, 只能得到key。
* 姑且理解为: redis服务在key失效时(或失效后)通知到java服务某个key失效了, 那么在java中不可能得到这个redis-key对应的redis-value。
*/
protected HashMap<Integer, DelayedMessageHandler> handlerRouter;
private static final Logger logger = LoggerFactory.getLogger(RedisExpiredListener.class);
@Override
public void onMessage(Message message, byte[] bytes) {
String expiredKey = message.toString();
// TASK:CODE:VALUE结构
String[] split = expiredKey.split(":");
if (split.length < 2 || !expiredKey.startsWith("TASK:")) {
return;
}
logger.info("[Redis键失效通知] key=" + expiredKey);
StringBuilder value = new StringBuilder();
for (int i = 2; i < split.length; i++) {
value.append(split[i]);
if (i != split.length - 1) {
value.append(":");
}
}
int code = Integer.parseInt(split[1]);
DelayedMessageHandler handler = handlerRouter.get(code);
if (handler != null) {
handler.handle(value.toString());
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.handlerRouter = (HashMap<Integer, DelayedMessageHandler>) applicationContext.getBean("messageHandleRouter");
}
}
DelayedMessageQueue实现类
基础配置类,里面配置了监听哪个 Redis 库的过期事件
package com.zzh.mybatisplus5.mq;
import com.zzh.mybatisplus5.component.CacheComponent;
import org.springframework.beans.factory.annotation.Autowired;
import java.util.concurrent.Callable;
public class RedisNotifyDelayedMessageQueueImpl implements DelayedMessageQueue {
@Autowired
private CacheComponent cacheComponent;
@Override
public Boolean publishTask(Integer code, String value, Integer delay) {
if (delay < 0) {
delay = 1;
}
cacheComponent.putRaw(assembleKey(code, value), "", delay);
return true;
}
@Override
public Boolean deleteTask(Integer code, String value) {
cacheComponent.del(assembleKey(code, value));
return true;
}
@Override
public Long getTaskTime(Integer code, String value) {
return cacheComponent.getKeyExpire(assembleKey(code, value));
}
@Override
public Boolean publishTask(Callable task, Integer delay) {
throw new RuntimeException();
}
public String assembleKey(Integer code, String value) {
if (value == null) {
value = "";
}
StringBuilder sb = new StringBuilder("TASK:");
sb.append(code + ":");
sb.append(value);
return sb.toString();
}
}
Redis 配置
package com.zzh.mybatisplus5.mq;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Configuration
public class RedisAutoConfig {
private static final Logger logger = LoggerFactory.getLogger(RedisAutoConfig.class);
@Value("${spring.redis.database}")
private Integer cacheDB;
@Bean
public Map<Integer, DelayedMessageHandler> messageHandleRouter(List<DelayedMessageHandler> delayedMessageHandlerList) {
return delayedMessageHandlerList.stream().collect(Collectors.toMap(DelayedMessageHandler::getCode, v -> v));
}
@Bean
public RedisExpiredListener redisExpiredListener() {
return new RedisExpiredListener();
}
/**
* 指定 redis 库运行 config set notify-keyspace-events Ex 即可,不然监听无法生效
* redis服务端需要配置 notify-keyspace-events 参数 ,至少包含k或者e
* K 键空间通知,所有通知以 __keyspace@<db>__ 为前缀
* E 键事件通知,所有通知以 __keyevent@<db>__ 为前缀
* g DEL 、 EXPIRE 、 RENAME 等类型无关的通用命令的通知
* $ 字符串命令的通知
* l 列表命令的通知
* s 集合命令的通知
* h 哈希命令的通知
* z 有序集合命令的通知
* x 过期事件:每当有过期键被删除时发送
* e 驱逐(evict)事件:每当有键因为 maxmemory 政策而被删除时发送
* A 参数 g$lshzxe 的别名
*
* @后边可以指定db库,*代表所有库,0代表0库 __keyevent@0__:expired 0库过期的数据
* __keyspace@0__:mykey 0库mykey这个键的所有操作
* __keyevent@0__:del 0库所有del这个命令
*/
@Bean
public RedisMessageListenerContainer container(LettuceConnectionFactory defaultLettuceConnectionFactory, RedisExpiredListener expiredListener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(defaultLettuceConnectionFactory);
//监听指定库的过期key
container.addMessageListener(expiredListener, new PatternTopic("__keyevent@" + cacheDB + "__:expired"));
return container;
}
@Bean
public DelayedMessageQueue delayedMessageQueue() {
return new RedisNotifyDelayedMessageQueueImpl();
}
@Bean
public LettuceConnectionFactory defaultLettuceConnectionFactory(
RedisConfiguration defaultRedisConfig,GenericObjectPoolConfig defaultPoolConfig) {
LettuceClientConfiguration clientConfig =
LettucePoolingClientConfiguration.builder().commandTimeout(Duration.ofMillis(5000))
.poolConfig(defaultPoolConfig).build();
return new LettuceConnectionFactory(defaultRedisConfig, clientConfig);
}
@Bean
public RedisTemplate<String, String> redisTemplate(
LettuceConnectionFactory defaultLettuceConnectionFactory) {
RedisTemplate<String, String> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(defaultLettuceConnectionFactory);
return redisTemplate;
}
@Bean
public StringRedisTemplate stringRedisTemplate(LettuceConnectionFactory defaultLettuceConnectionFactory) {
StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
stringRedisTemplate.setConnectionFactory(defaultLettuceConnectionFactory);
return stringRedisTemplate;
}
@Configuration
public static class DefaultRedisConfig {
@Value("${spring.redis.master-name}")
private String masterName;
@Value("${spring.redis.mode}")
private String mode;
@Value("${spring.redis.host:127.0.0.1:6379}")
private String host;
@Value("${spring.redis.password:}")
private String password;
@Value("${spring.redis.database:0}")
private Integer database;
@Value("${spring.redis.lettuce.pool.max-active:8}")
private Integer maxActive;
@Value("${spring.redis.lettuce.pool.max-idle:8}")
private Integer maxIdle;
@Value("${spring.redis.lettuce.pool.max-wait:-1}")
private Long maxWait;
@Value("${spring.redis.lettuce.pool.min-idle:0}")
private Integer minIdle;
@Bean
public GenericObjectPoolConfig defaultPoolConfig() {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setMaxTotal(maxActive);
config.setMaxIdle(maxIdle);
config.setMinIdle(minIdle);
config.setMaxWaitMillis(maxWait);
return config;
}
@Bean
public RedisConfiguration defaultRedisConfig() {
return getRedisConfiguration(masterName, mode, host, password, database);
}
}
private static RedisConfiguration getRedisConfiguration(String masterName, String mode, String host, String password, Integer database) {
if (mode.equals("single")) {
RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
String[] hostArray = host.split(":");
config.setHostName(hostArray[0]);
config.setPassword(RedisPassword.of(password));
config.setPort(Integer.parseInt(hostArray[1]));
config.setDatabase(database);
return config;
} else if (mode.equals("sentinel")) {
RedisSentinelConfiguration configuration = new RedisSentinelConfiguration();
configuration.setMaster(masterName);
String[] hostList = host.split(",");
List<RedisNode> serverList = new LinkedList<>();
for (String hostItem : hostList) {
String[] hostArray = hostItem.split(":");
RedisServer redisServer = new RedisServer(hostArray[0], Integer.parseInt(hostArray[1]));
serverList.add(redisServer);
}
configuration.setSentinels(serverList);
logger.info("[Redis] 哨兵节点: masterName={}, host={}", masterName, host);
return configuration;
} else {
return null;
}
}
}
顶级策略接口
没啥好说的,老三样
package com.zzh.mybatisplus5.mq;
import java.util.concurrent.Callable;
public interface DelayedMessageQueue {
/**
* 添加延迟秒数,RingDelayedMessageQueueImpl专用,单机实现
* * @param delay seconds
* @return
*/
public Boolean publishTask(Callable task, Integer delay);
/**
* RedisNotifyDelayedMessageQueueImpl专用,集群实现
* 这两个都会被拼接为 TASK:(随机码):CODE:VALUE 当成key存入redis中,因为回调时只会返回key,而不会返回key对应的值
* @param code 回调时用来选择的Handler的CODE
* @param value 回调时使用的值
* @param delay 多少秒后调用
* @return
*/
public Boolean publishTask(Integer code, String value, Integer delay);
/**
* 删除已有任务
* @param code
* @param value
* @return
*/
public Boolean deleteTask(Integer code, String value);
/**
* 获取指定任务还有多少时间执行,如果不存在,返回-2
* @param code
* @param value
* @return
*/
public Long getTaskTime(Integer code, String value);
}
package com.zzh.mybatisplus5.mq;
/**
* 延迟消息处理器
*/
public interface DelayedMessageHandler {
/**
*
* @param value
* @return 处理成功的返回大于0结果,失败返回0
*/
public int handle(String value);
public int getCode();
}
延时队列设计思路
和我之前使用策略模式封装的多个OSS使用,写法简直是一毛一样,详情挪步。策略模式调优(多Oss存储导致代码冗余的问题)
延时队列消息丢失怎么解决
开个定时任务,每隔一分钟定时进行扫表,加了索引、延时消息丢失不多的情况下,查数据会很快。扫到有超时的订单,接着丢到 Redis 延时队列里面,双重保险。同时定时任务加个分布式锁,一台机器运行即可。代码爆红是因为,我拉的开源项目,没跑起来直接看的源码。