前言
今天跟大家分享的是一个基于redisson实现的延时队列,有个初版的封装工具,使用者只用关心延时时间到了取到的数据处理(或者之前处理,到时间只做剩下的业务),废话不多说,直接上货。
一、业务场景
这里是对物联网设备做数据模拟上报。看下原型转化后的需求界面吧。
二、实现思路
1、实现其实有很多方案:
- 用timer实现
- 用java提供的队列实现
- redis实现
- redission实现
最简单的直接用timer都可以做,我是想到这个延时队列以后还有其他场景使用,让其他开发小伙伴只用关心业务,所以基于redisson实现,封装延时队列工具类。
2、业务流程图
我自己画的简单流程图:
三、核心代码
1.redisson引入与配置
这个我之前有写,这里就不重复了
2.延时队列工具
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
* redisson实现的延时队列
*
*
* @author zwmac
*/
@Slf4j
@Component
public class RedissonDelayQueue {
@Autowired
private RedissonClient redissonClient;
/**
* 添加任务到延时队列里面
*
* @param queueName 队列名称
* @param data 数据
* @param delayTime 延时时间,单位秒
*/
public void addTaskToDelayQueue(String queueName,JSONObject data,Long delayTime) {
if(StringUtils.isNotBlank(queueName)){
RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
delayedQueue.offer(data, delayTime, TimeUnit.SECONDS);
}
}
/**
* 删除延时队列
* @param queueName 队列名称
*/
public void delDelayQueue(String queueName) {
if(StringUtils.isNotBlank(queueName)){
RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
blockingDeque.clear();
blockingDeque.delete();
delayedQueue.clear();
delayedQueue.destroy();
}
}
/**
* 判断队列是否存在
* @param queueName 队列名称
* @return true 存在,false 不存在
*/
public boolean hasQueue(String queueName) {
RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
if (blockingDeque.isExists() && delayedQueue.isExists() && !delayedQueue.isEmpty()){
return true;
}
return false;
}
/**
* 队列消费者
* @param consumer 消费者
* @param queueName 队列名称
*/
public void queueConsumer( Consumer consumer, String queueName){
new Thread(() -> {
while (true){
try {
JSONObject data = this.takeFromDelayQueue(queueName);
if (data != null){
//消费接口
consumer.accept(data);
RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
if (blockingDeque.isEmpty() && delayedQueue.isEmpty()){
//所有数据已经轮训完毕,删除队列
this.delDelayQueue(queueName);
//结束线程
log.info("队列名称:{},延时元素消费完成,退出释放线程",queueName);
break;
}
}
} catch (Exception e) {
//e.printStackTrace();
//退出,释放线程
log.info("队列名称:{},退出线程释放,原因:{}",queueName,e.getMessage());
break;
}
}
},queueName + "-Customer").start();
}
/**
* 从延时队列里面取出数据
* @param queueName 队列名称
* @return 队列元素json对象
* @throws Exception 异常
*/
public JSONObject takeFromDelayQueue(String queueName) throws Exception {
RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
JSONObject jsonObject = null;
try {
//log.info("--队列名称:{},blockingDeque数量:{},delayedQueue数量:{}",queueName,blockingDeque.size(),delayedQueue.size());
if (blockingDeque.isExists()){
log.info("--出队列前--队列名称:{},当前队列大小:{}",queueName,blockingDeque.size());
jsonObject = blockingDeque.take();
log.info("--出队列后--队列名称:{},当前队列大小:{}",queueName,blockingDeque.size());
}
/** 这里处理早了,还没有消费就销毁了,会导致消费数据差一条
if (blockingDeque.isEmpty() && delayedQueue.isEmpty()){
//所有数据已经轮训完毕,删除队列
this.delDelayQueue(queueName);
//结束线程
//Thread.currentThread().interrupt();
throw new RuntimeException("所有数据已经轮训完毕,删除队列");
}**/
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return jsonObject;
}
}
里面有关于线程销毁注释了一段,有兴趣的可以看看,为什么销毁不在那里处理,当然原因我也写在注释里了的。
2.使用
@Resource
private RedissonDelayQueue redissonDelayQueue;
private MessageInfo historyReceive(JSONObject jsonObject, String identify) {
//从ES查询设备的历史数据
List<JSONObject> historyData = searchHisFromEs(nbDeviceId, startTime, endTime,logMarkId);
//查询该设备是否有重放队列在执行
String hisRetryQueueKey = "hisRetryQueueKey-" + nbDeviceId;
if(redissonDelayQueue.hasQueue(hisRetryQueueKey)){
//有重放队列在执行,删除原队列
redissonDelayQueue.delDelayQueue(hisRetryQueueKey);
}
//放到延时队列
if (CollectionUtil.isNotEmpty(historyData)) {
queueConsumer(redissonDelayQueue,nbDeviceId,logMarkId,identify,hisRetryQueueKey);
for (int i = 0; i < historyData.size(); i++) {
JSONObject data = historyData.get(i);
Long interval = 2L;
if (i > 0){
interval = Long.valueOf(intervalTime * i) + interval;
}
redissonDelayQueue.addTaskToDelayQueue(hisRetryQueueKey,data,interval);
}
}
return new MessageInfo(0, "success");
}
/**延时数据业务处理
**/
private void queueConsumer(RedissonDelayQueue redissonDelayQueue, String nbDeviceId, String logMarkId, String identify, String hisRetryQueueKey) {
//消费延时队列数据
redissonDelayQueue.queueConsumer(data -> {
//重放数据做数据重新组织后,直接放到解析完成的队列
log.info("时间:{}---重放数据:{}", DateUtil.now(),data);
//业务处理
},hisRetryQueueKey);
}
我这里是在从延时队列取到元素后做的一些业务操作,如果没有一些下游级联操作,其实可以在放入队列的for循环里做,真正到时间了,再做一些简单的业务也可以。
可以看出,现在使用就只需要处理for循环放入延时队列,queueConsumer消费处理延时到期的业务。
3.效果
总结
- 解耦,让开发只用关注业务
- 基于redisson不用太关注redis底层实现,这里可以理解就是2个队列,一个未到期队列、一个到期队列,随着时间的推移redisson帮我们实现从未到期移动数据到到期,我们只用管从到期取到数据的操作
- 封装还很粗糙,还有进步空间
就分享到这,希望能帮到大家,uping!