场景
6S后执行任务
7天后发送订单
从现有时间算延后多少时间开始执行,当然也可以转换为在以后某个时间执行。
Timer类
Java中的Timer类是一个定时器,它可以用来实现延时消息的功能。
import java.util.Timer;
import java.util.TimerTask;
public class TimerDemo {
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("Hello, world!");
}
}, 5000);
}
}
Timer类的使用非常简单,但是它存在一些问题。首先,Timer类是单线程的,如果有多个任务需要执行,它们会被放到同一个队列中,按照先后顺序依次执行。如果某个任务的执行时间过长,会影响后续任务的执行。其次,Timer类不够灵活,无法满足一些复杂的需求。
ScheduledExecutorService 周期线程池
Java中的ScheduledExecutorService接口是一个可调度的线程池,它可以用来实现延时消息的功能。
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ScheduledExecutorServiceDemo {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.schedule(new Runnable() {
@Override
public void run() {
System.out.println("Hello, world!");
}
}, 5, TimeUnit.SECONDS);
}
}
与Timer类相比,ScheduledExecutorService接口更加灵活。它可以支持多个任务同时执行,可以设置任务的执行周期,可以设置任务的执行优先级等等。但是,它也存在一些问题。比如,如果任务的执行时间过长,会影响后续任务的执行,因为它也是单线程的。
Quartz 定时任务
POM:
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>***</version>
</dependency>
示例:
JobDetail job = newJob(SimpleJob.class).withIdentity("job1", "group1").build();
SimpleTrigger trigger = newTrigger().withIdentity("trigger1", "group1").startAt(runDate)
.withSchedule(simpleSchedule().withIntervalInHours(1).repeatForever()).modifiedByCalendar("holidays").build();
sched.scheduleJob(job, trigger);
DelayQueue 延时队列
DelayQueue的定义
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E>
DelayQueue是一个无界的BlockingQueue,是线程安全的(无界指的是队列的元素数量不存在上限,队列的容量会随着元素数量的增加而扩容,阻塞队列指的是当队列内元素数量为0的时候,试图从队列内获取元素的线程将被阻塞或者抛出异常)
以上是阻塞队列的特点,而延迟队列还拥有自己如下的特点:
DelayQueue中存入的必须是实现了Delayed接口的对象(Delayed定义了一个getDelay的方法,用来判断排序后的元素是否可以从Queue中取出,并且Delayed接口还继承了Comparable用于排序),插入Queue中的数据根据compareTo方法进行排序(DelayQueue的底层存储是一个PriorityQueue,PriorityQueue是一个可排序的Queue,其中的元素必须实现Comparable接口的compareTo方法),并通过getDelay方法返回的时间确定元素是否可以出队,只有小于等于0的元素(即延迟到期的元素)才能够被取出
延迟队列不接收null元素
代码
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@Data
public class OrderDelayTask implements Delayed {
private Long orderId;
private long delayTime;
public OrderDelayTask(Long orderId, long delayTime) {
this.orderId = orderId;
// 延迟时间加当前时间
this.delayTime = System.currentTimeMillis() + delayTime;
}
// 获取任务剩余时间
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(delayTime, ((OrderDelayTask) o).delayTime);
}
public static void main(String[] args) {
DelayQueue<OrderDelayTask> orderDelayQueue = new DelayQueue<>();
//发起订单下单的时候将订单演示对象放入orderDelayQueue
orderDelayQueue.add(
new OrderDelayTask(
123456l, // 订单id
3 * 1000 // 延迟时间:3s
)
);
new Thread(() -> {
try {
while (true) {
OrderDelayTask task = orderDelayQueue.take();
// 当队列为null的时候,poll()方法会直接返回null, 不会抛出异常,但是take()方法会一直等待,因此会抛出一个InterruptedException类型的异常。(当阻塞方法收到中断请求的时候就会抛出InterruptedException异常)
Long orderId = task.getOrderId();
// 执行业务
System.out.println(orderId);
}
} catch (InterruptedException e) {
// 因为是重写Runnable接口的run方法,子类抛出的异常要小于等于父类的异常。而在Runnable中run方法是没有抛异常的。所以此时是不能抛出InterruptedException异常。如果此时你只是记录日志的话,那么就是一个不负责任的做法,因为在捕获InterruptedException异常的时候自动的将是否请求中断标志置为了false。在捕获了InterruptedException异常之后,如果你什么也不想做,那么就将标志重新置为true,以便栈中更高层的代码能知道中断,并且对中断作出响应。
Thread.currentThread().interrupt();
}
}).start();
}
}
时间轮
原理
时间轮算法可以类比于时钟,如上图箭头(指针)按某一个方向按固定频率轮动,每一次跳动称为一个 tick。
这样可以看出定时轮由个3个重要的属性参数,ticksPerWheel(一轮的tick数),tickDuration(一个tick的持续时间)以及 timeUnit(时间单位)
例如当ticksPerWheel=60,tickDuration=1,timeUnit=秒,这就和现实中的始终的秒针走动完全类似了。
如果当前指针指在1上面,我有一个任务需要4秒以后执行,那么这个执行的线程回调或者消息将会被放在5上。
那如果需要在20秒之后执行怎么办,由于这个环形结构槽数只到8,如果要20秒,指针需要多转2圈。位置是在2圈之后的5上面(20 % 8 + 1)
实现:Netty的HashedWheelTimer
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.Timer;
import io.netty.util.TimerTask;
import java.util.concurrent.TimeUnit;
public class test1 {
static class MyTimerTask implements TimerTask {
boolean flag;
public MyTimerTask(boolean flag) {
this.flag = flag;
}
public void run(Timeout timeout) throws Exception {
System.out.println("要去数据库删除订单了。。。。");
this.flag = false;
}
}
public static void main(String[] argv) {
MyTimerTask timerTask = new MyTimerTask(true);
Timer timer = new HashedWheelTimer();
//此处设置在时间轮第几个执行(本代码设置为第3格)
timer.newTimeout(timerTask, 3, TimeUnit.SECONDS);
int i = 1;
while (timerTask.flag) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(i +"秒过去了");
i++;
}
}
}
自定义实现
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class DelayExecUtil {
private static final int CORE_POOL_SIZE = 2;
private static final int MAX_POOL_SIZE = 20;
private static final int QUEUE_CAPACITY = 1000;
private static final long KEEP_ALIVE_TIME = 60;
private static final BlockingQueue<Runnable> DELAY_TASK_QUEUE = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
private static final int LENGTH = 3600;
private static final Set<DelayTaskContext>[] TIMERS = new Set[LENGTH];
private static Set<DelayTaskContext> currentDelayTaskSet;
private static List<DelayTaskContext> executeDelayTaskList = new ArrayList<>();
private static final long DELAY = 3 * 1000l;
private static final long PERIOD = 1 * 1000l;
private static int currentTimeIndex = 0;
private static ThreadPoolExecutor executor;
// 这种场景应该写少读多的场景
private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private static void indexIncrease() {
currentTimeIndex = (currentTimeIndex + 1) % LENGTH;
}
private static class DelayTaskContext {
public int cycle;
public DelayTask delayTask;
}
public interface DelayTask {
void execute();
}
private static final Timer TIMER = new Timer("delay-time-main");
static {
// 创建执行线程
executor = new ThreadPoolExecutor(CORE_POOL_SIZE
, MAX_POOL_SIZE
, KEEP_ALIVE_TIME
, TimeUnit.SECONDS
, DELAY_TASK_QUEUE
, r -> new Thread(r, "delay-time-handle")
// 当线程池已达到maxPoolSize之后,不在新线程中执行任务,而是由调用者所在线程来执行,即异步变同步
, new ThreadPoolExecutor.CallerRunsPolicy());
// 启动定时器
TIMER.schedule(new TimerTask() {
@Override
public void run() {
try {
lock.readLock().lock();
indexIncrease();
currentDelayTaskSet = TIMERS[currentTimeIndex];
log.info("index:" + currentTimeIndex);
if (currentDelayTaskSet != null && currentDelayTaskSet.size() > 0) {
executeDelayTaskList = new ArrayList<>();
for (DelayTaskContext task : currentDelayTaskSet) {
if (task.cycle == 0) {
executeDelayTaskList.add(task);
}
if (task.cycle > 0) {
task.cycle--;
}
}
// 清理可执行任务
if (executeDelayTaskList.size() > 0) {
currentDelayTaskSet.removeAll(executeDelayTaskList);
for (DelayTaskContext taskContext : executeDelayTaskList) {
executor.execute(() -> {
taskContext.delayTask.execute();
});
}
executeDelayTaskList.clear();
}
}
} finally {
lock.readLock().unlock();
}
}
}, DELAY, PERIOD);
}
/**
* 添加延迟任务
*
* @param delayTask 任务对象
* @param delay 延迟多少s 会给3s的延迟
*/
public static void addDelayTask(DelayTask delayTask, int delay) {
if (delay < 0) {
throw new IllegalArgumentException("Negative delay.");
}
if (delayTask == null) {
throw new IllegalArgumentException("Empty task.");
}
try {
lock.writeLock().lock();
int cycle = delay / LENGTH;
int remainder = delay % LENGTH;
remainder = remainder + currentTimeIndex + 1;
if (TIMERS[remainder] == null) {
TIMERS[remainder] = new HashSet<>();
}
DelayTaskContext task = new DelayTaskContext();
task.cycle = cycle;
task.delayTask = delayTask;
log.info("add task cycle:{},remainder:{},delay:{}", cycle, remainder, delay);
TIMERS[remainder].add(task);
} finally {
lock.writeLock().unlock();
}
}
public static void main(String[] args) throws InterruptedException {
Thread.sleep(1000 * 5);
DelayExecUtil.addDelayTask(() -> {
log.info("task execute");
}, 3);
}
}
RocketMQ延迟消息
参考:https://help.aliyun.com/zh/apsaramq-for-rocketmq/cloud-message-queue-rocketmq-4-x-series/developer-reference/send-and-receive-scheduled-messages-and-delayed-messages
import java.util.Date;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class RocketMQProducer {
/**
* 替换为您阿里云账号的AccessKey ID和AccessKey Secret。
* 请确保环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET已设置。
*/
private static RPCHook getAclRPCHook() {
return new AclClientRPCHook(new SessionCredentials(System.getenv("ALIBABA_CLOUD_ACCESS_KEY_ID"),
System.getenv("ALIBABA_CLOUD_ACCESS_KEY_SECRET")));
}
public static void main(String[] args) throws MQClientException {
/**
*创建Producer,并开启消息轨迹。设置为您在阿里云消息队列RocketMQ版控制台创建的Group ID。
*如果不想开启消息轨迹,可以按照如下方式创建:
*DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook());
*/
DefaultMQProducer producer = new DefaultMQProducer("YOUR GROUP ID", getAclRPCHook(), true, null);
/**
*设置使用接入方式为阿里云,在使用云上消息轨迹的时候,需要设置此项,如果不开启消息轨迹功能,则运行不设置此项。
*/
producer.setAccessChannel(AccessChannel.CLOUD);
/**
*设置为您从阿里云消息队列RocketMQ版控制台获取的接入点信息,类似“http://MQ_INST_XXXX.aliyuncs.com:80”。
*/
producer.setNamesrvAddr("YOUR ACCESS POINT");
producer.start();
for (int i = 0; i < 128; i++) {
try {
/*设置为您在消息队列RocketMQ版控制台创建的Topic。*/
Message msg = new Message("YOUR TOPIC",
/*设置消息的Tag。*/
"YOUR MESSAGE TAG",
/*消息内容。*/
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
/*发送延时消息,需要设置延时时间,单位毫秒(ms),消息将在指定延时时间后投递,例如消息将在3秒后投递。*/
longdelayTime = System.currentTimeMillis() + 3000;
msg.putUserProperty("__STARTDELIVERTIME", String.valueOf(delayTime));
/**
*若需要发送定时消息,则需要设置定时时间,消息将在指定时间进行投递,例如消息将在2021-08-10 18:45:00投递。
*定时时间格式为:yyyy-MM-dd HH:mm:ss,若设置的时间戳在当前时间之前,则消息将被立即投递给Consumer。
* longtimeStamp=newSimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2021-08-10 18:45:00").getTime();
* msg.putUserProperty("__STARTDELIVERTIME",String.valueOf(timeStamp));
*/
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
//消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理。
System.out.println(new Date() + " Send mq message failed.");
e.printStackTrace();
}
}
//在应用退出前,销毁Producer对象。
//注意:如果不销毁也没有问题。
producer.shutdown();
}
}
Redis实现
zset
利用redis的zset,zset是一个有序集合,每一个元素(member)都关联了一个score,通过score排序来取集合中的值
Keyspace Notifications键空间机制
使用redis的Keyspace Notifications(键空间机制),就是利用该机制可以在key失效之后,提供一个回调,实际上是redis会给客户端发送一个消息。
参考博客
Java延时消息的实现
https://blog.csdn.net/jam_yin/article/details/131001180
1分钟实现“延迟消息”功能
https://www.w3cschool.cn/architectroad/architectroad-delay-message.html