一、接着上文
上文我们讲述了使用redisson的RDelayedQueue实现分布式延迟队列,本文我们将自己JDK的延迟队列DelayQueue实现。
相比前者的实现,作为进程内的延迟队列,它会遇到许多技术难点:
- 如何支持分布式的多个节点部署场景
- 应用重启会恢复延时队列
- 冷数据如何转换为热数据
- 如何删除延迟队列中的任务
随后,我们也将提及:
- 保存任务至延迟队列(生产者)
- 读取延迟队列中的任务(消费者)
二、设计概要
-
冷数据:mysql表中的任务数据
-
热数据:jdk 延迟队列中的任务
-
广播事件:删除延迟队列中的任务,发布的是广播事件,可以使用redis topic实现。
-
本地事件:分布式多节点部署的时候,每个任务只保存在其中一个节点的延迟队列中,可以使用spring事件驱动实现。
-
延迟队列 DelayQueueJob, 它实现了接口Delayed
包括任务的交易流水号和过期时间(即任务的回调时间)
import lombok.Builder;
import lombok.Data;
import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @author xxx
*/
@Builder
@Data
public class DelayQueueJob implements Delayed {
/**
* 交易流水号
*/
private String transNo;
/**
* 到期时间
*/
private Date expireDate;
public DelayQueueJob(String transNo, Date expireDate) {
super();
this.transNo = transNo;
this.expireDate = expireDate;
}
/**
* 用于队列中排序过期时间
*
* @param o
* @return
*/
@Override
public int compareTo(Delayed o) {
return Long.valueOf(this.expireDate.getTime())
.compareTo(Long.valueOf(((DelayQueueJob) o).expireDate.getTime()));
}
/**
* 用于获取过期时间
* 延迟关闭时间 = 过期时间 - 当前时间
*
* @param unit
* @return
*/
@Override
public long getDelay(TimeUnit unit) {
return this.expireDate.getTime() - System.currentTimeMillis();
}
}
三、应用启动流程
解决恢复延迟队列的问题。因为DelayQueue是进程内的,一旦重启,将被销毁。
import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Slf4j
@Service
@RequiredArgsConstructor
public class ApplicationStartupListener implements ApplicationListener<ApplicationReadyEvent> {
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
// 实现代码参考上面的流程图
}
}
四、定时任务流程
解决冷数据如何转换为热数据的问题,防止延时任务过多导致消耗过多的jvm内存,所以只有回调时间将近的任务才放入延迟队列。
五、如何删除延迟队列中的任务
删除延迟队列的任务:发送广播消息通知所有的节点,当不是当前节点的时候,执行删除。
if (!NetUtil.getLocalhostStr().equals(ipAddress)) {
DelayQueueSingleton.getDelayQueue().remove(transNo);
}
DelayQueueSingletons是一个单例类,详见下:
public class DelayQueueSingleton {
private static volatile CustomDelayQueue<DelayQueueJob> delayQueue;
private DelayQueueSingleton() {
}
public static CustomDelayQueue<DelayQueueJob> getDelayQueue() {
if (delayQueue == null) {
synchronized (DelayQueueSingleton.class) {
if (delayQueue == null) {
delayQueue = new CustomDelayQueue<>();
}
}
}
return delayQueue;
}
}
这里为了删除延迟队列的任务,我们对DelayQueue进行了重写。
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
public class CustomDelayQueue<T extends Delayed> {
private final DelayQueue<T> queue = new DelayQueue<>();
private final Map<String, T> map = new ConcurrentHashMap<>();
public boolean put(T task, String taskId) {
// 如果任务已存在,则删除旧任务,防止重复添加
this.remove(taskId);
map.put(taskId, task);
return queue.add(task);
}
public boolean remove(String taskId) {
// 先删除map,再删除queue
T task = map.remove(taskId);
if (task != null) {
return queue.remove(task);
}
return false;
}
public T take() throws InterruptedException {
return queue.take();
}
}
六、保存任务至延迟队列(生产者)
// 如果通知时间在一定时间范围内
if (DateUtil.offsetMinute(new DateTime(), commonConfig.getHotDataTimeLine()).after(event.getNotifyDate())) {
DelayQueueSingleton.getDelayQueue().put(DelayQueueJob.builder()
.transNo(event.getTransNo())
.expireDate(event.getNotifyDate())
.build(), event.getTransNo());
}
七、读取延迟队列中的任务(消费者)
作为延迟队列的消费者,它的实现和上一篇文章实现类似。不同的是take()获取任务不一样。
String transNo = null;
Date notifyDate = null;
DelayQueueJob job = DelayQueueSingleton.getDelayQueue().take();
if (null != job) {
transNo = job.getTransNo();
notifyDate = job.getExpireDate();
}
if (null == transNo) {
return;
}
if (log.isInfoEnabled()) {
log.info("开始执行延迟队列中的任务,transNo={},notifyDate={}", transNo, notifyDate);
}
// 异步执行你的操作
notifyTaskService.handleTask(transNo, notifyDate);
八、总结
作为进程内的延迟队列,在多点部署的分布式集群环境下, 代码明显比上一篇要复杂得多。
它们都需要的步骤是:
- 任务的生产
- 任务的消费
- 移除任务
DelayQueue额外多出来的步骤是:
- 应用启动的时候拉取回调时间将近的未完成任务(更新marked标记为true,防止重复拉取冷数据)
- 定时拉取未标记且回调时间将近的未完成任务(和上面必须是互斥,等待上一步执行完成,否则会导致重复拉取)
- 删除延迟队列DelayQueue的任务,必须发布广播消息给全部节点。(引入广播消息机制)
由此可见,任务表的字段marked仅供DelayQueue使用,防止重复拉取数据库的任务到热数据区。
@Column(name = "marked", nullable = false, columnDefinition = "TINYINT(1) default 0 COMMENT '是否已标记为热数据'")
private Boolean marked;
附:相关系列文章链接
延时任务通知服务的设计及实现(一)-- 设计方案
延时任务通知服务的设计及实现(二)-- redisson的延迟队列RDelayedQueue
延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue
延时任务通知服务的设计及实现(四)-- webhook执行任务
延时任务通知服务的设计及实现(五)-- Netty时间轮HashedWheelTimer