基于redisson实现延时队列解耦业务

news2024/11/20 23:34:48

前言

      今天跟大家分享的是一个基于redisson实现的延时队列,有个初版的封装工具,使用者只用关心延时时间到了取到的数据处理(或者之前处理,到时间只做剩下的业务),废话不多说,直接上货。


一、业务场景

      这里是对物联网设备做数据模拟上报。看下原型转化后的需求界面吧。
在这里插入图片描述

二、实现思路

1、实现其实有很多方案:

  1. 用timer实现
  2. 用java提供的队列实现
  3. redis实现
  4. redission实现

      最简单的直接用timer都可以做,我是想到这个延时队列以后还有其他场景使用,让其他开发小伙伴只用关心业务,所以基于redisson实现,封装延时队列工具类。

2、业务流程图

我自己画的简单流程图:
在这里插入图片描述

三、核心代码

1.redisson引入与配置

      这个我之前有写,这里就不重复了

2.延时队列工具

import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.redisson.api.RBlockingDeque;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
 * redisson实现的延时队列
 *
 *
 * @author zwmac
 */
@Slf4j
@Component
public class RedissonDelayQueue {
    @Autowired
    private RedissonClient redissonClient;

    /**
     * 添加任务到延时队列里面
     *
     * @param queueName 队列名称
     * @param data      数据
     * @param delayTime 延时时间,单位秒
     */
    public void addTaskToDelayQueue(String queueName,JSONObject data,Long delayTime) {
        if(StringUtils.isNotBlank(queueName)){
            RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
            RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
            delayedQueue.offer(data, delayTime, TimeUnit.SECONDS);

        }
    }

    /**
     * 删除延时队列
     * @param queueName 队列名称
     */
    public void delDelayQueue(String queueName) {
        if(StringUtils.isNotBlank(queueName)){
            RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
            RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);

            blockingDeque.clear();
            blockingDeque.delete();
            delayedQueue.clear();
            delayedQueue.destroy();

        }
    }

    /**
     * 判断队列是否存在
     * @param queueName 队列名称
     * @return true 存在,false 不存在
     */
    public boolean hasQueue(String queueName) {
        RBlockingDeque<JSONObject> blockingDeque =  redissonClient.getBlockingDeque(queueName);
        RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        if (blockingDeque.isExists() && delayedQueue.isExists() && !delayedQueue.isEmpty()){
            return true;
        }
        return false;
    }

    /**
     * 队列消费者
     * @param consumer 消费者
     * @param queueName 队列名称
     */
    public void queueConsumer( Consumer consumer, String queueName){
        new Thread(() -> {
            while (true){
                try {
                    JSONObject data = this.takeFromDelayQueue(queueName);
                    if (data != null){
                        //消费接口
                        consumer.accept(data);
                        RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
                        RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
                        if (blockingDeque.isEmpty() && delayedQueue.isEmpty()){
                            //所有数据已经轮训完毕,删除队列
                            this.delDelayQueue(queueName);
                            //结束线程
                            log.info("队列名称:{},延时元素消费完成,退出释放线程",queueName);
                            break;
                        }
                    }
                } catch (Exception e) {
                    //e.printStackTrace();
                    //退出,释放线程
                    log.info("队列名称:{},退出线程释放,原因:{}",queueName,e.getMessage());
                    break;
                }

            }

        },queueName + "-Customer").start();
    }

    /**
     * 从延时队列里面取出数据
     * @param queueName 队列名称
     * @return 队列元素json对象
     * @throws Exception 异常
     */
    public JSONObject takeFromDelayQueue(String queueName) throws Exception {
        RBlockingDeque<JSONObject> blockingDeque = redissonClient.getBlockingDeque(queueName);
        RDelayedQueue<JSONObject> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
        JSONObject jsonObject = null;
        try {
            //log.info("--队列名称:{},blockingDeque数量:{},delayedQueue数量:{}",queueName,blockingDeque.size(),delayedQueue.size());
            if (blockingDeque.isExists()){
                log.info("--出队列前--队列名称:{},当前队列大小:{}",queueName,blockingDeque.size());
                jsonObject = blockingDeque.take();
                log.info("--出队列后--队列名称:{},当前队列大小:{}",queueName,blockingDeque.size());
            }
            /** 这里处理早了,还没有消费就销毁了,会导致消费数据差一条
             if (blockingDeque.isEmpty() && delayedQueue.isEmpty()){
                //所有数据已经轮训完毕,删除队列
                this.delDelayQueue(queueName);
                //结束线程
                //Thread.currentThread().interrupt();
                throw new RuntimeException("所有数据已经轮训完毕,删除队列");
            }**/
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return jsonObject;
    }
}

      里面有关于线程销毁注释了一段,有兴趣的可以看看,为什么销毁不在那里处理,当然原因我也写在注释里了的。

2.使用

@Resource
  private RedissonDelayQueue redissonDelayQueue;

private MessageInfo historyReceive(JSONObject jsonObject, String identify) {
    //从ES查询设备的历史数据
    List<JSONObject> historyData = searchHisFromEs(nbDeviceId, startTime, endTime,logMarkId);

    //查询该设备是否有重放队列在执行
    String hisRetryQueueKey = "hisRetryQueueKey-" + nbDeviceId;
    if(redissonDelayQueue.hasQueue(hisRetryQueueKey)){
      //有重放队列在执行,删除原队列
      redissonDelayQueue.delDelayQueue(hisRetryQueueKey);
    }

    //放到延时队列
    if (CollectionUtil.isNotEmpty(historyData)) {
      queueConsumer(redissonDelayQueue,nbDeviceId,logMarkId,identify,hisRetryQueueKey);
      for (int i = 0; i < historyData.size(); i++) {
        JSONObject data = historyData.get(i);
        Long interval = 2L;
        if (i > 0){
          interval = Long.valueOf(intervalTime * i) + interval;
        }
        redissonDelayQueue.addTaskToDelayQueue(hisRetryQueueKey,data,interval);
      }
    }

    return new MessageInfo(0, "success");
  }
/**延时数据业务处理
**/
private void queueConsumer(RedissonDelayQueue redissonDelayQueue, String nbDeviceId, String logMarkId, String identify, String hisRetryQueueKey) {
    //消费延时队列数据
    redissonDelayQueue.queueConsumer(data -> {
      //重放数据做数据重新组织后,直接放到解析完成的队列
      log.info("时间:{}---重放数据:{}", DateUtil.now(),data);
      //业务处理
      

    },hisRetryQueueKey);
  }

      我这里是在从延时队列取到元素后做的一些业务操作,如果没有一些下游级联操作,其实可以在放入队列的for循环里做,真正到时间了,再做一些简单的业务也可以。
      可以看出,现在使用就只需要处理for循环放入延时队列,queueConsumer消费处理延时到期的业务。

3.效果

在这里插入图片描述


总结

  1. 解耦,让开发只用关注业务
  2. 基于redisson不用太关注redis底层实现,这里可以理解就是2个队列,一个未到期队列、一个到期队列,随着时间的推移redisson帮我们实现从未到期移动数据到到期,我们只用管从到期取到数据的操作
  3. 封装还很粗糙,还有进步空间
    就分享到这,希望能帮到大家,uping!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/927348.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

目前电视盒子哪个最好?2023公认最佳电视盒子排名TOP5

电视盒子作为我们看片必备&#xff0c;功能更加丰富&#xff0c;看视频、玩游戏、K歌、上网课等都能实现&#xff0c;新手们在下单前会参考排行榜&#xff0c;近期业内发布了公认最好的电视盒子排名前五&#xff0c;不懂目前电视盒子哪个最好可以从入围的品牌中选择&#xff1a…

IET独立出版 | EI检索 | 2023年第三届机械、航空航天与汽车工程国际会议

会议简介 Brief Introduction 2023年第三届机械、航空航天与汽车工程国际会议&#xff08;CMAAE 2023&#xff09; 会议时间&#xff1a;2023年12月8 -10日 召开地点&#xff1a;中国南京 大会官网&#xff1a;www.cmaae.org 航天是当今世界最具挑战性和广泛带动性的高技术领域…

RTSP流媒体服务器EasyNVR视频平台设备通道时间与服务器录像时间不一致的问题解决步骤

EasyNVR平台优秀的视频能力在于通过RTSP/ONVIF协议&#xff0c;将前端接入设备的音视频资源进行采集&#xff0c;并转码成适合全平台、全终端分发的视频流格式&#xff0c;包括RTMP、RTSP、FLV、HLS、WebRTC等格式。平台已经在智慧水利、智慧工厂、智慧校园、智慧仓储等场景中应…

【解决】idea启动spring MVC报错:一个或多个listeners启动失败Listener ClassNotFoundException

idea配置教程。tomcat调试报错Artifact :war exploded: Error during artifact deployment。 修改代码后&#xff0c;启动不生效&#xff0c;仍是旧代码。 根本原因是&#xff1a; Modules output path和Artifacts output directory不匹配 Modules output path一定要等于Ar…

软件测试框架实战:Python+Slenium搭建Web自动化测试框架全教程

PythonSelenium是一种流行的Web自动化测试框架&#xff0c;可以模拟真实的用户操作&#xff0c;对网页进行功能和样式的验证。要通过selenium测试网页&#xff0c;需要以下几个步骤&#xff1a; 安装selenium库和浏览器驱动 。 使用selenium提供的方法来控制浏览器窗口大小、后…

如何建立自己的微信公众号

微信公众号是生活中常见的一种媒体形式&#xff0c;可以通过注册的方式来建立自己的微信公众号。 怎么注册 微信公众号注册的具体步骤如下&#xff1a; 1、在浏览器中搜索微信公众号&#xff0c;接着单击进入微信公众平台。 2、进入微信公众平台官网界面&#xff0c;接着单击…

【Acwing291】蒙德里安的梦想(状态压缩dp)详细讲解

题目描述 题目分析 显而易见的重要事实 首先&#xff0c;需要明白一个很重要的事实&#xff1a; 所有的摆放方案数所有横着摆放且合理的方案数 这是因为&#xff0c;横着的确定之后&#xff0c;竖着的一定会被唯一确定&#xff0c;举一个例子&#xff1a; ------唯一确定-…

自动化测试定位不到元素?可能是 frame 在搞鬼

很多人在用Splinter或Selenium定位页面元素的时候会遇到定位不到的问题&#xff0c;明明元素就在那儿&#xff0c;就是定位不到&#xff0c;这种情况很有可能是frame在搞鬼。 说白了就是网站上的网页A&#xff0c;又嵌入了其他网页B。你访问了网页A&#xff0c;在里面可以看到…

4G智慧电力物联网:建设高效智能,引领电力行业革新!

随着4G与物联网技术的快速发展为电力行业提供了更高效、可靠、智能化的解决方案。本文中智联物联将为大家分享智慧电力系统中的一些关键的物联网技术和通讯设备&#xff0c;如工业4G路由器、分布式发电站、数据采集传输、远程监控管理以及变电站监测。 光伏发电站是电力行业中重…

Golang struct 结构体注意事项和使用细节

结构体所有字段在内存当中是连续的 type Point struct {x, y int }type Rect struct {leftUp, rightDown Point }func main() {//r1会在内存当中有四个整数r1 : Rect{leftUp: Point{x: 1,y: 2,},rightDown: Point{x: 3,y: 4,},}//r1有四个int&#xff0c;在内存当中是连续分布的…

Vue2-快速搭建pc端后台管理系统

一.推荐二次开发框架 vue-element-admin Star(84k)vue-antd-admin Star(3.5k) 二.vue-element-admin 官网链接:https://panjiachen.github.io/vue-element-admin-site/zh/ 我这里搭建的是基础模版vue-admin-template(推荐) # 克隆项目 git clone https://github.com/PanJi…

使用多种工具进行JVM调优、线上故障排查的例子

1 FullGC调优 面试官&#xff1a;如何进行 JVM 调优&#xff08;附真实案例&#xff09; 2 使用arthas诊断案例 2.1 使用arthas确定某一个耗时的请求来自哪一个controller&#xff0c;并且分析以及代码优化 2.1.1 为什么要做第一步的“确定请求来源的controller”&#xff…

切换Debian的crontab的nano编辑器

Debian的crontab默认的编辑器是nano&#xff0c;用起来很不习惯,怎么才能转回vim呢? 用以下命令便可&#xff1a; #update-alternatives --config editor 出现以下所示的界面&#xff1a; 而后选择8使用/usr/bin/vim就能够了。 PS&#xff1a;若是你发现你的定时没有生效&…

全新土地销售活动 Turkishverse——在数字十字路口占据一席之地

准备好与来自该地区的众多世界知名合作伙伴一起探索土耳其文化和历史吧&#xff01; 简单介绍 ● 在这个弘扬土耳其文化和历史的新社区中&#xff0c;共有 433 块 LAND 可供出售&#xff0c;其中包括 □ 380 块标准 LAND □ 48 块优质 LAND □ 5 个 Estate ● LAND 销售抽…

通达信唐奇安通道指标公式,海龟交易法则的先驱

唐奇安通道&#xff08;Donchian Channel&#xff09;是由Richard Donchian发明的技术分析指标&#xff0c;用于确定价格的趋势和波动。著名的海龟交易法则就是基于唐奇安通道设计的&#xff0c;将通道作为交易系统的一部分&#xff0c;用于捕捉趋势信号。唐奇安通道由三条线组…

【腾讯云Cloud Studio实战训练营】React 快速构建点餐页面+Python 拼图小游戏

文章目录 一、腾讯云 Cloud Studio 概述1.1 腾讯云 Cloud Studio 简介1.2 腾讯云 Cloud Studio 功能特点1.3 腾讯云 Cloud Studio 产品优势 二、Cloud Studio界面功能介绍2.1 注册登录2.1.1 新注册用户有免费的3000分钟体验 2.2 界面功能介绍2.2.1 空间模板2.2.2 开发空间关闭空…

二甲医院信息管理系统源码 his系统源码 java+Angular+JavaScript

云HIS系统采用SaaS软件应用服务模式&#xff0c;提供软件应用服务多租户机制&#xff0c;实现一中心部署多机构使用。主要包含收费计费、药品管理、门诊医生工作站、住院医生工作站、护士工作站、数据统计、电子病历、医保接口等功能&#xff0c;能够满足医院及诊所日常业务开展…

VR智慧课堂 | 临床兽医学VR实验教学有哪些好处?

随着科技的不断发展&#xff0c;虚拟现实(VR)技术已经逐渐渗透到各个领域&#xff0c;为人们带来了前所未有的体验。在动物医学实验教学中&#xff0c;VR技术的应用也日益受到关注。本文将探讨临床兽医学VR实验教学的好处。 首先&#xff0c;VR技术能够提高动物医学实验的安全性…

常用数据库备份方法,sql数据库备份方法

在信息时代&#xff0c;数据成为了公司的主要资产。然而&#xff0c;数据的安全性和完整性也成为企业管理的重要组成部分。因此&#xff0c;数据库备份至关重要。本文将详细介绍几种常见的数据库备份方法。 全备份 全备份是指数据库中所有数据的备份&#xff0c;包括数据文件、…

为什么要使用依赖注入?直接new对象不香吗?为什么要把简单的问题复杂化?

作者&#xff1a;newki 为什么要使用依赖注入&#xff1f;直接new对象不香吗&#xff1f;为什么要把简单的问题复杂化&#xff1f; 你是不是在炫技,是不是像装13&#xff1f; 这还真不是&#xff0c;如果说我使用的Dagger2&#xff0c;还真是炫技&#xff0c;NB啊。Dagger的坑…