延时队列是一种特殊的消息队列,它允许消息在指定的时间后被消费。在微服务架构、电商系统和任务调度场景中,延时队列扮演着关键角色。例如,订单超时自动取消、定时提醒、延时支付等都依赖延时队列实现。
Redis作为高性能的内存数据库,具备原子操作、数据结构丰富和简单易用的特性,本文将介绍基于Redis实现分布式延时队列的四种方式。
1. 基于Sorted Set的延时队列
原理
利用Redis的Sorted Set(有序集合),将消息ID作为member,执行时间戳作为score进行存储。通过ZRANGEBYSCORE
命令可以获取到达执行时间的任务。
代码实现
public class RedisZSetDelayQueue {
private final StringRedisTemplate redisTemplate;
private final String queueKey = "delay_queue:tasks";
public RedisZSetDelayQueue(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
/**
* 添加延时任务
* @param taskId 任务ID
* @param taskInfo 任务信息(JSON字符串)
* @param delayTime 延迟时间(秒)
*/
public void addTask(String taskId, String taskInfo, long delayTime) {
// 计算执行时间
long executeTime = System.currentTimeMillis() + delayTime * 1000;
// 存储任务详情
redisTemplate.opsForHash().put("delay_queue:details", taskId, taskInfo);
// 添加到延时队列
redisTemplate.opsForZSet().add(queueKey, taskId, executeTime);
System.out.println("Task added: " + taskId + ", will execute at: " + executeTime);
}
/**
* 轮询获取到期任务
*/
public List<String> pollTasks() {
long now = System.currentTimeMillis();
// 获取当前时间之前的任务
Set<String> taskIds = redisTemplate.opsForZSet()
.rangeByScore(queueKey, 0, now);
if (taskIds == null || taskIds.isEmpty()) {
return Collections.emptyList();
}
// 获取任务详情
List<String> tasks = new ArrayList<>();
for (String taskId : taskIds) {
String taskInfo = (String) redisTemplate.opsForHash()
.get("delay_queue:details", taskId);
if (taskInfo != null) {
tasks.add(taskInfo);
// 从集合和详情中移除任务
redisTemplate.opsForZSet().remove(queueKey, taskId);
redisTemplate.opsForHash().delete("delay_queue:details", taskId);
}
}
return tasks;
}
// 定时任务示例
public void startTaskProcessor() {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
try {
List<String> tasks = pollTasks();
for (String task : tasks) {
processTask(task);
}
} catch (Exception e) {
e.printStackTrace();
}
}, 0, 1, TimeUnit.SECONDS);
}
private void processTask(String taskInfo) {
System.out.println("Processing task: " + taskInfo);
// 实际任务处理逻辑
}
}
优缺点
优点
- 实现简单,易于理解
- 任务按执行时间自动排序
- 支持精确的时间控制
缺点
- 需要轮询获取到期任务,消耗CPU资源
- 大量任务情况下,
ZRANGEBYSCORE
操作可能影响性能 - 没有消费确认机制,需要额外实现
2. 基于List + 定时轮询的延时队列
原理
这种方式使用多个List作为存储容器,按延迟时间的不同将任务分配到不同的队列中。通过定时轮询各个队列,将到期任务移动到一个立即执行队列。
代码实现
public class RedisListDelayQueue {
private final StringRedisTemplate redisTemplate;
private final String readyQueueKey = "delay_queue:ready"; // 待处理队列
private final Map<Integer, String> delayQueueKeys; // 延迟队列,按延时时间分级
public RedisListDelayQueue(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
// 初始化不同延迟级别的队列
delayQueueKeys = new HashMap<>();
delayQueueKeys.put(5, "delay_queue:delay_5s"); // 5秒
delayQueueKeys.put(60, "delay_queue:delay_1m"); // 1分钟
delayQueueKeys.put(300, "delay_queue:delay_5m"); // 5分钟
delayQueueKeys.put(1800, "delay_queue:delay_30m"); // 30分钟
}
/**
* 添加延时任务
*/
public void addTask(String taskInfo, int delaySeconds) {
// 选择合适的延迟队列
String queueKey = selectDelayQueue(delaySeconds);
// 任务元数据,包含任务信息和执行时间
long executeTime = System.currentTimeMillis() + delaySeconds * 1000;
String taskData = executeTime + ":" + taskInfo;
// 添加到延迟队列
redisTemplate.opsForList().rightPush(queueKey, taskData);
System.out.println("Task added to " + queueKey + ": " + taskData);
}
/**
* 选择合适的延迟队列
*/
private String selectDelayQueue(int delaySeconds) {
// 找到最接近的延迟级别
int closestDelay = delayQueueKeys.keySet().stream()
.filter(delay -> delay >= delaySeconds)
.min(Integer::compareTo)
.orElse(Collections.max(delayQueueKeys.keySet()));
return delayQueueKeys.get(closestDelay);
}
/**
* 移动到期任务到待处理队列
*/
public void moveTasksToReadyQueue() {
long now = System.currentTimeMillis();
// 遍历所有延迟队列
for (String queueKey : delayQueueKeys.values()) {
boolean hasMoreTasks = true;
while (hasMoreTasks) {
// 查看队列头部任务
String taskData = redisTemplate.opsForList().index(queueKey, 0);
if (taskData == null) {
hasMoreTasks = false;
continue;
}
// 解析任务执行时间
long executeTime = Long.parseLong(taskData.split(":", 2)[0]);
// 检查是否到期
if (executeTime <= now) {
// 通过LPOP原子性地移除队列头部任务
String task = redisTemplate.opsForList().leftPop(queueKey);
// 任务可能被其他进程处理,再次检查
if (task != null) {
// 提取任务信息并添加到待处理队列
String taskInfo = task.split(":", 2)[1];
redisTemplate.opsForList().rightPush(readyQueueKey, taskInfo);
System.out.println("Task moved to ready queue: " + taskInfo);
}
} else {
// 队列头部任务未到期,无需检查后面的任务
hasMoreTasks = false;
}
}
}
}
/**
* 获取待处理任务
*/
public String getReadyTask() {
return redisTemplate.opsForList().leftPop(readyQueueKey);
}
/**
* 启动任务处理器
*/
public void startTaskProcessors() {
// 定时移动到期任务
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
// 移动任务线程
scheduler.scheduleAtFixedRate(this::moveTasksToReadyQueue, 0, 1, TimeUnit.SECONDS);
// 处理任务线程
scheduler.scheduleAtFixedRate(() -> {
String task = getReadyTask();
if (task != null) {
processTask(task);
}
}, 0, 100, TimeUnit.MILLISECONDS);
}
private void processTask(String taskInfo) {
System.out.println("Processing task: " + taskInfo);
// 实际任务处理逻辑
}
}
优缺点
优点
- 分级队列设计,降低单队列压力
- 相比Sorted Set占用内存少
- 支持队列监控和任务优先级
缺点
- 延迟时间精度受轮询频率影响
- 实现复杂度高
- 需要维护多个队列
- 时间判断和队列操作非原子性,需特别处理并发问题
3. 基于发布/订阅(Pub/Sub)的延时队列
原理
结合Redis发布/订阅功能与本地时间轮算法,实现延迟任务的分发和处理。任务信息存储在Redis中,而时间轮负责任务的调度和发布。
代码实现
public class RedisPubSubDelayQueue {
private final StringRedisTemplate redisTemplate;
private final String TASK_TOPIC = "delay_queue:task_channel";
private final String TASK_HASH = "delay_queue:tasks";
private final HashedWheelTimer timer;
public RedisPubSubDelayQueue(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
// 初始化时间轮,刻度100ms,轮子大小512
this.timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 512);
// 启动消息订阅
subscribeTaskChannel();
}
/**
* 添加延时任务
*/
public void addTask(String taskId, String taskInfo, long delaySeconds) {
// 存储任务信息到Redis
redisTemplate.opsForHash().put(TASK_HASH, taskId, taskInfo);
// 添加到时间轮
timer.newTimeout(timeout -> {
// 发布任务就绪消息
redisTemplate.convertAndSend(TASK_TOPIC, taskId);
}, delaySeconds, TimeUnit.SECONDS);
System.out.println("Task scheduled: " + taskId + ", delay: " + delaySeconds + "s");
}
/**
* 订阅任务通道
*/
private void subscribeTaskChannel() {
redisTemplate.getConnectionFactory().getConnection().subscribe(
(message, pattern) -> {
String taskId = new String(message.getBody());
// 获取任务信息
String taskInfo = (String) redisTemplate.opsForHash().get(TASK_HASH, taskId);
if (taskInfo != null) {
// 处理任务
processTask(taskId, taskInfo);
// 删除任务
redisTemplate.opsForHash().delete(TASK_HASH, taskId);
}
},
TASK_TOPIC.getBytes()
);
}
private void processTask(String taskId, String taskInfo) {
System.out.println("Processing task: " + taskId + " - " + taskInfo);
// 实际任务处理逻辑
}
// 模拟HashedWheelTimer类
public static class HashedWheelTimer {
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private final long tickDuration;
private final TimeUnit unit;
public HashedWheelTimer(long tickDuration, TimeUnit unit, int wheelSize) {
this.tickDuration = tickDuration;
this.unit = unit;
}
public void newTimeout(TimerTask task, long delay, TimeUnit timeUnit) {
long delayMillis = timeUnit.toMillis(delay);
scheduler.schedule(
() -> task.run(null),
delayMillis,
TimeUnit.MILLISECONDS
);
}
public interface TimerTask {
void run(Timeout timeout);
}
public interface Timeout {
}
}
}
优缺点
优点:
- 即时触发,无需轮询
- 高效的时间轮算法
- 可以跨应用订阅任务
- 分离任务调度和执行,降低耦合
缺点:
- 依赖本地时间轮,非纯Redis实现
- Pub/Sub模式无消息持久化,可能丢失消息
- 服务重启时需要重建时间轮
- 订阅者需要保持连接
4. 基于Redis Stream的延时队列
原理
Redis 5.0引入的Stream是一个强大的数据结构,专为消息队列设计。结合Stream的消费组和确认机制,可以构建可靠的延时队列。
代码实现
public class RedisStreamDelayQueue {
private final StringRedisTemplate redisTemplate;
private final String delayQueueKey = "delay_queue:stream";
private final String consumerGroup = "delay_queue_consumers";
private final String consumerId = UUID.randomUUID().toString();
public RedisStreamDelayQueue(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
// 创建消费者组
try {
redisTemplate.execute((RedisCallback<String>) connection -> {
connection.streamCommands().xGroupCreate(
delayQueueKey.getBytes(),
consumerGroup,
ReadOffset.from("0"),
true
);
return "OK";
});
} catch (Exception e) {
// 消费者组可能已存在
System.out.println("Consumer group may already exist: " + e.getMessage());
}
}
/**
* 添加延时任务
*/
public void addTask(String taskInfo, long delaySeconds) {
long executeTime = System.currentTimeMillis() + delaySeconds * 1000;
Map<String, Object> task = new HashMap<>();
task.put("executeTime", String.valueOf(executeTime));
task.put("taskInfo", taskInfo);
redisTemplate.opsForStream().add(delayQueueKey, task);
System.out.println("Task added: " + taskInfo + ", execute at: " + executeTime);
}
/**
* 获取待执行的任务
*/
public List<String> pollTasks() {
long now = System.currentTimeMillis();
List<String> readyTasks = new ArrayList<>();
// 读取尚未处理的消息
List<MapRecord<String, Object, Object>> records = redisTemplate.execute(
(RedisCallback<List<MapRecord<String, Object, Object>>>) connection -> {
return connection.streamCommands().xReadGroup(
consumerGroup.getBytes(),
consumerId.getBytes(),
StreamReadOptions.empty().count(10),
StreamOffset.create(delayQueueKey.getBytes(), ReadOffset.from(">"))
);
}
);
if (records != null) {
for (MapRecord<String, Object, Object> record : records) {
String messageId = record.getId().getValue();
Map<Object, Object> value = record.getValue();
long executeTime = Long.parseLong((String) value.get("executeTime"));
String taskInfo = (String) value.get("taskInfo");
// 检查任务是否到期
if (executeTime <= now) {
readyTasks.add(taskInfo);
// 确认消息已处理
redisTemplate.execute((RedisCallback<String>) connection -> {
connection.streamCommands().xAck(
delayQueueKey.getBytes(),
consumerGroup.getBytes(),
messageId.getBytes()
);
return "OK";
});
// 可选:从流中删除消息
redisTemplate.opsForStream().delete(delayQueueKey, messageId);
} else {
// 任务未到期,放回队列
redisTemplate.execute((RedisCallback<String>) connection -> {
connection.streamCommands().xAck(
delayQueueKey.getBytes(),
consumerGroup.getBytes(),
messageId.getBytes()
);
return "OK";
});
// 重新添加任务(可选:使用延迟重新入队策略)
Map<String, Object> newTask = new HashMap<>();
newTask.put("executeTime", String.valueOf(executeTime));
newTask.put("taskInfo", taskInfo);
redisTemplate.opsForStream().add(delayQueueKey, newTask);
}
}
}
return readyTasks;
}
/**
* 启动任务处理器
*/
public void startTaskProcessor() {
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
scheduler.scheduleAtFixedRate(() -> {
try {
List<String> tasks = pollTasks();
for (String task : tasks) {
processTask(task);
}
} catch (Exception e) {
e.printStackTrace();
}
}, 0, 1, TimeUnit.SECONDS);
}
private void processTask(String taskInfo) {
System.out.println("Processing task: " + taskInfo);
// 实际任务处理逻辑
}
}
优缺点
优点:
- 支持消费者组和消息确认,提供可靠的消息处理
- 内置消息持久化机制
- 支持多消费者并行处理
- 消息ID包含时间戳,便于排序
缺点:
- 要求Redis 5.0+版本
- 实现相对复杂
- 仍需轮询获取到期任务
- 对未到期任务的处理相对繁琐
性能对比与选型建议
实现方式 | 性能 | 可靠性 | 实现复杂度 | 内存占用 | 适用场景 |
---|---|---|---|---|---|
Sorted Set | ★★★★☆ | ★★★☆☆ | 低 | 中 | 任务量适中,需要精确调度 |
List + 轮询 | ★★★★★ | ★★★☆☆ | 中 | 低 | 高并发,延时精度要求不高 |
Pub/Sub + 时间轮 | ★★★★★ | ★★☆☆☆ | 高 | 低 | 实时性要求高,可容忍服务重启丢失 |
Stream | ★★★☆☆ | ★★★★★ | 高 | 中 | 可靠性要求高,需要消息确认 |
总结
在实际应用中,可根据系统规模、性能需求、可靠性要求和实现复杂度等因素进行选择,也可以组合多种方式打造更符合业务需求的延时队列解决方案。无论选择哪种实现,都应关注可靠性、性能和监控等方面,确保延时队列在生产环境中稳定运行。