一、接着上文
RDelayedQueue作为redisson封装的一个分布式延迟队列,直接拿来使用还是比较简单的。
本文主要包括以下几部分:
- 保存至延迟队列(生产者)
- 读取延迟队列(消费者)
- 从延迟队列移除任务
二、redission配置
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.redisson.config.SingleServerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* Redisson配置类
*
* @author xxx
*/
@Configuration
public class RedissonConfig {
@Value("${spring.application.name}")
private String serverName;
@Bean
public RedissonClient redissonClient(RedisProperties redisProperties) {
Config config = new Config();
SingleServerConfig singleServerConfig = config.useSingleServer();
singleServerConfig.setAddress("redis://" + redisProperties.getHost() + ":" + redisProperties.getPort());
singleServerConfig.setPassword(redisProperties.getPassword());
singleServerConfig.setKeepAlive(true);
singleServerConfig.setDatabase(redisProperties.getDatabase());
singleServerConfig.setConnectionMinimumIdleSize(2);
singleServerConfig.setConnectionPoolSize(4);
singleServerConfig.setClientName(serverName);
return Redisson.create(config);
}
}
spring:
application:
name: delay-task-service
redis:
host: 192.168.8.18
port: 6379
database: 0
timeout: 3000
三、保存至延迟队列(生产者)
作为延迟任务的生产者,你需要根据预期的回调时间,计算出delay延迟时间。
伪代码见下:
public static final String REDISSON_QUEUE_NAME = "DelayTaskQueue";
private final RedissonClient redissonClient;
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(REDISSON_QUEUE_NAME);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
long delay = DateUtil.between(event.getNotifyDate(), new DateTime(), DateUnit.SECOND);
delayedQueue.offer(event.getTransNo(), delay < 0 ? 1 : delay, TimeUnit.SECONDS);
四、读取延迟队列(消费者)
public static final String REDISSON_QUEUE_NAME = "DelayTaskQueue";
private final RedissonClient redissonClient;
@PostConstruct
public void init() {
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>())
.execute(() -> {
while (true) {
try {
RBlockingDeque<String> blockingDeque = redissonClient.getBlockingDeque(REDISSON_QUEUE_NAME);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
String transNo = blockingDeque.take();
if (null == transNo) {
return;
}
if (log.isInfoEnabled()) {
log.info("开始执行延迟队列中的任务,transNo={}", transNo);
}
// 异步执行你的操作
notifyTaskService.handleTask(transNo, null);
} catch (Exception e) {
log.error("延时队列的任务执行出现异常", e);
}
}
});
}
五、从延迟队列移除任务
public static final String REDISSON_QUEUE_NAME = "DelayTaskQueue";
private final RedissonClient redissonClient;
RBlockingQueue<String> blockingQueue = redissonClient.getBlockingQueue(REDISSON_QUEUE_NAME);
RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
delayedQueue.remove(transNo);
六、总结
本文主要是摘要一些源码,仅供参考。
附:相关系列文章链接
延时任务通知服务的设计及实现(一)-- 设计方案
延时任务通知服务的设计及实现(二)-- redisson的延迟队列RDelayedQueue
延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue
延时任务通知服务的设计及实现(四)-- webhook执行任务