什么是延迟任务
我们把需要延迟执行的任务叫做延迟任务,比如业务中用户发送审配,过期后需要执行一些操作,网上订单未支付,红包过期取消等等。
Java API 实现延迟任务
- ScheduledExecutorService实现延迟任务
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
System.out.println("程序启动时间:" + LocalDateTime.now());
executor.schedule(() -> {
System.out.println("start delay task!" + LocalDateTime.now());
}, 4, TimeUnit.SECONDS);
}
ScheduledExecutorService可以延迟任务,也可以延迟任务后使用固定频率再执行任务。
- DelayQueue延迟任务
DelayQueue泛型参数得实现Delayed接口,Delayed继承了Comparable接口。
getDelay
方法返回这个任务还剩多久时间可以执行,小于0的时候说明可以这个延迟任务到了执行的时间了。
compareTo
这个是对任务排序的,保证最先到延迟时间的任务排到队列的头。
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayItem> delayItemQueue = new DelayQueue<>();
delayItemQueue.put(new DelayItem("task1", 2000l));
delayItemQueue.put(new DelayItem("task2", 5000l));
System.out.println("start task" + LocalDateTime.now());
while (!delayItemQueue.isEmpty()) {
DelayItem take = delayItemQueue.take();
System.out.println(take.getTaskContent() + " " + LocalDateTime.now());
}
}
static class DelayItem implements Delayed {
private final String taskContent;
private Long triggerTime = System.currentTimeMillis();
public DelayItem(String taskContent, Long delayTime) {
this.taskContent = taskContent;
this.triggerTime = this.triggerTime + delayTime;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(triggerTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return this.triggerTime.compareTo(((DelayItem) o).getTriggerTime());
}
public Long getTriggerTime() {
return triggerTime;
}
public String getTaskContent() {
return taskContent;
}
}
spring延迟任务
@Scheduled也可以实现延迟执行
@Component
public class ScheduleJobs {
@Scheduled(fixedDelay = 2 * 1000)
public void fixedDelayJob() throws InterruptedException {
System.out.println("任务执行,时间:" + LocalDateTime.now());
}
}
netty延迟任务
@Slf4j
public class NettyHashedWheelTimerDemo {
public static void main(String[] args) {
HashedWheelTimer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 8);
timer.start();
log.info("提交延迟任务");
timer.newTimeout(timeout -> log.info("执行延迟任务"), 5, TimeUnit.SECONDS);
}
}
相比上面几种延迟队列,netty在算法上做了改变,使用定时轮实现的,定时轮其实就是一种环型的数据结构,可以把它想象成一个时钟,分成了许多格子,每个格子代表一定的时间,在这个格子上用一个链表来保存要执行的超时任务,同时有一个指针一格一格的走,走到那个格子时就执行格子对应的延迟任务,如下图所示:
时间轮定时器最大的优势就是,任务的新增和取消都是 O(1) 时间复杂度,而且只需要一个线程就可以驱动时间轮进行工作。
上述方式的几种场景都有一些缺点
- 占用jvm内存,数据量大的时候可能会导致OOM
- 机器重启,内存中的延迟队列丢失
- 解决分布式部署的问题
我可以的的解决办法
- 来任务后将数据存入mysql之类的数据库中,只把最近要发生的任务拉取出来放入延迟队列。
- 分布式环境,可以增加分布式锁,只让一个服务实例去加载延迟任务
Redis延迟队列
-
键过期通知
默认情况下 Redis 服务器端是不开启键空间通知的,需要我们通过
config set notify-keyspace-events Ex
的命令手动开启,开启键空间通知后,我们就可以拿到每个键值过期的事件,我们利用这个机制实现了给每个人开启一个定时任务的功能,实现代码如下:
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import utils.JedisUtils;
public class TaskExample {
public static final String _TOPIC = "__keyevent@0__:expired"; // 订阅频道名称
public static void main(String[] args) {
Jedis jedis = JedisUtils.getJedis();
// 执行定时任务
doTask(jedis);
}
/**
* 订阅过期消息,执行定时任务
* @param jedis Redis 客户端
*/
public static void doTask(Jedis jedis) {
// 订阅过期消息
jedis.psubscribe(new JedisPubSub() {
@Override
public void onPMessage(String pattern, String channel, String message) {
// 接收到消息,执行定时任务
System.out.println("收到消息:" + message);
}
}, _TOPIC);
}
}
这种方式也有一个弊端,就是键值过期的时候,接受服务正好挂了,会有任务丢失
- redis第二种任务方式是:通过 zset 数据判断
redis的zset数据结构中的每个元素都有一个分数score和一个值value,我们可以将任务的执行时间戳作为score,将任务数据作为value,将任务插入到zset中,然后在开启一个查询,消费查询出来的任务,也可以使用别人封装好的工具Redisson
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.1</version>
</dependency>
RabbitMQ的延迟任务
- 可以将正常队列设置一个过期时间,设置路由规则,过期后路由到指定队列中去消费
- 使用延迟消息插件
RabbitMQ官方推出的插件,原生支持延迟消息的功能。其原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列。
本地RabbitMQ官网下载rabbitmq_delayer_message_exchange插件地址:
https://www.rabbitmq.com/community-plugins
参考
延迟任务 https://mp.weixin.qq.com/s/aghAzOxPOVOdpSLvTBhWHw