延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue

news2025/1/8 0:26:46

一、接着上文

上文我们讲述了使用redisson的RDelayedQueue实现分布式延迟队列,本文我们将自己JDK的延迟队列DelayQueue实现。

相比前者的实现,作为进程内的延迟队列,它会遇到许多技术难点:

  • 如何支持分布式的多个节点部署场景
  • 应用重启会恢复延时队列
  • 冷数据如何转换为热数据
  • 如何删除延迟队列中的任务

随后,我们也将提及:

  • 保存任务至延迟队列(生产者)
  • 读取延迟队列中的任务(消费者)

二、设计概要

在这里插入图片描述

  • 冷数据:mysql表中的任务数据

  • 热数据:jdk 延迟队列中的任务

  • 广播事件:删除延迟队列中的任务,发布的是广播事件,可以使用redis topic实现。

  • 本地事件:分布式多节点部署的时候,每个任务只保存在其中一个节点的延迟队列中,可以使用spring事件驱动实现。

  • 延迟队列 DelayQueueJob, 它实现了接口Delayed

包括任务的交易流水号和过期时间(即任务的回调时间)

import lombok.Builder;
import lombok.Data;

import java.util.Date;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @author xxx
 */
@Builder
@Data
public class DelayQueueJob implements Delayed {
    /**
     * 交易流水号
     */
    private String transNo;
     /**
     * 到期时间
     */
    private Date expireDate;

    public DelayQueueJob(String transNo, Date expireDate) {
        super();
        this.transNo = transNo;
        this.expireDate = expireDate;
    }

    /**
     * 用于队列中排序过期时间
     *
     * @param o
     * @return
     */
    @Override
    public int compareTo(Delayed o) {
        return Long.valueOf(this.expireDate.getTime())
                .compareTo(Long.valueOf(((DelayQueueJob) o).expireDate.getTime()));
    }

    /**
     * 用于获取过期时间
     * 延迟关闭时间 = 过期时间 - 当前时间
     *
     * @param unit
     * @return
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return this.expireDate.getTime() - System.currentTimeMillis();
    }
}

三、应用启动流程

解决恢复延迟队列的问题。因为DelayQueue是进程内的,一旦重启,将被销毁。

在这里插入图片描述

import cn.hutool.core.date.DateTime;
import cn.hutool.core.date.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.util.List;
import java.util.concurrent.TimeUnit;


@Slf4j
@Service
@RequiredArgsConstructor
public class ApplicationStartupListener implements ApplicationListener<ApplicationReadyEvent> {

    @Override
    public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent) {
        // 实现代码参考上面的流程图
    }
}

四、定时任务流程

解决冷数据如何转换为热数据的问题,防止延时任务过多导致消耗过多的jvm内存,所以只有回调时间将近的任务才放入延迟队列。

在这里插入图片描述

五、如何删除延迟队列中的任务

删除延迟队列的任务:发送广播消息通知所有的节点,当不是当前节点的时候,执行删除。

if (!NetUtil.getLocalhostStr().equals(ipAddress)) {
    DelayQueueSingleton.getDelayQueue().remove(transNo);
}

DelayQueueSingletons是一个单例类,详见下:

public class DelayQueueSingleton {
    private static volatile CustomDelayQueue<DelayQueueJob> delayQueue;

    private DelayQueueSingleton() {
    }

    public static CustomDelayQueue<DelayQueueJob> getDelayQueue() {
        if (delayQueue == null) {
            synchronized (DelayQueueSingleton.class) {
                if (delayQueue == null) {
                    delayQueue = new CustomDelayQueue<>();
                }
            }
        }
        return delayQueue;
    }

}

这里为了删除延迟队列的任务,我们对DelayQueue进行了重写。


import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;

public class CustomDelayQueue<T extends Delayed> {
    private final DelayQueue<T> queue = new DelayQueue<>();
    private final Map<String, T> map = new ConcurrentHashMap<>();

    public boolean put(T task, String taskId) {
        // 如果任务已存在,则删除旧任务,防止重复添加
        this.remove(taskId);

        map.put(taskId, task);
        return queue.add(task);
    }

    public boolean remove(String taskId) {
        // 先删除map,再删除queue
        T task = map.remove(taskId);
        if (task != null) {
            return queue.remove(task);
        }
        return false;
    }

    public T take() throws InterruptedException {
        return queue.take();
    }
}

六、保存任务至延迟队列(生产者)


// 如果通知时间在一定时间范围内
if (DateUtil.offsetMinute(new DateTime(), commonConfig.getHotDataTimeLine()).after(event.getNotifyDate())) {

    DelayQueueSingleton.getDelayQueue().put(DelayQueueJob.builder()
            .transNo(event.getTransNo())
            .expireDate(event.getNotifyDate())
            .build(), event.getTransNo());

}

七、读取延迟队列中的任务(消费者)

作为延迟队列的消费者,它的实现和上一篇文章实现类似。不同的是take()获取任务不一样。

String transNo = null;
Date notifyDate = null;
                            
DelayQueueJob job = DelayQueueSingleton.getDelayQueue().take();
if (null != job) {
    transNo = job.getTransNo();
    notifyDate = job.getExpireDate();
}

if (null == transNo) {
    return;
}

if (log.isInfoEnabled()) {
    log.info("开始执行延迟队列中的任务,transNo={},notifyDate={}", transNo, notifyDate);
}

// 异步执行你的操作
notifyTaskService.handleTask(transNo, notifyDate);

八、总结

作为进程内的延迟队列,在多点部署的分布式集群环境下, 代码明显比上一篇要复杂得多。

它们都需要的步骤是:

  • 任务的生产
  • 任务的消费
  • 移除任务

DelayQueue额外多出来的步骤是:

  • 应用启动的时候拉取回调时间将近的未完成任务(更新marked标记为true,防止重复拉取冷数据)
  • 定时拉取未标记且回调时间将近的未完成任务(和上面必须是互斥,等待上一步执行完成,否则会导致重复拉取)
  • 删除延迟队列DelayQueue的任务,必须发布广播消息给全部节点。(引入广播消息机制)

由此可见,任务表的字段marked仅供DelayQueue使用,防止重复拉取数据库的任务到热数据区。

    @Column(name = "marked", nullable = false, columnDefinition = "TINYINT(1) default 0 COMMENT '是否已标记为热数据'")
    private Boolean marked;

附:相关系列文章链接

延时任务通知服务的设计及实现(一)-- 设计方案

延时任务通知服务的设计及实现(二)-- redisson的延迟队列RDelayedQueue

延时任务通知服务的设计及实现(三)-- JDK的延迟队列DelayQueue

延时任务通知服务的设计及实现(四)-- webhook执行任务

延时任务通知服务的设计及实现(五)-- Netty时间轮HashedWheelTimer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1647117.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue cli 自定义项目架子,vue自定义项目架子,超详细

脚手架Vue CLI基本介绍&#xff1a; Vue CLI 是Vue官方提供的一个全局命令工具 可以帮助我们快速创建一个开发Vue项目的标准化基础架子【集成了webpack配置】 脚手架优点&#xff1a; 开箱即用&#xff0c;零配置内置babel等工具标准化的webpack配置 脚手架 VueCLI相关命令…

Mac跑llama.cpp过程中遇到的问题

原repo 在华为手机上安装termux、下载库&#xff1a;顺利在电脑上安装Android NDK&#xff1a;先下载Android Studio&#xff0c;再在里面下载Android SDK 安装Android Studio时&#xff0c;SDK的某些组件总是下载不成功。后来关了梯子、改了hosts&#xff0c;重新安装就成功了…

汇编语言——比较两个字符串STRING1和STRING2所含字符是否完全相同,若相同则显示MATCH, 不相同则显示NO MATCH

CMPS 串比较指令&#xff1a; CMPS SRC, DST CMPSB &#xff08;字节&#xff09; CMPSW &#xff08;字&#xff09; 执行操作&#xff1a; ((SI)) - ((DI)) 根据比较结果置条件标志位&#xff1a;相等 ZF1&#xff1b;不等 ZF0 字节操作&#xff1a;(SI)←(SI)1, (DI)←(DI…

【算法系列】链表

目录 常用技巧 常用操作 leetcode/牛客题目 一、移除链表元素 二、反转链表 三、链表的中间结点 四、返回倒数第k个节点 五、合并两个有序链表 六、链表分割 七、链表的回文结构 八、相交链表 九、环形链表 十、环形链表 II 十一、随机链表的复制 十二、两数相加…

英语学习笔记2——Is this your ...?

Is this your …? 这是你的 … 吗&#xff1f; 词汇 Vocabulary pen n. 笔 不仅指钢笔&#xff0c;是笔的统称 相关&#xff1a;ball pen n. 圆珠笔    pencil n. 铅笔    marker n. 记号笔 book n. 书 横着翻的本子或书 补充&#xff1a;pad n. 本子 竖着翻的本子或…

在Codelab对llama3做Lora Fine tune微调

Unsloth 高效微调大模型的工具&#xff0c;通过Unsloth微调Llama3, Mistral, Gemma 速度提升2-5倍&#xff0c;内存减少70%&#xff01; Codelab 创建一个jupyter notebook 选择 T4 GPU 安装Fine tune 相关的lib %%capture import torch major_version, minor_version torch…

软考中级-软件设计师(九)数据库技术基础 考点最精简

一、基本概念 1.1数据库与数据库系统 数据&#xff1a;是数据库中存储的基本对象&#xff0c;是描述事物的符号记录 数据库&#xff08;DataBase&#xff0c;DB&#xff09;&#xff1a;是长期存储在计算机内、有组织、可共享的大量数据集合 数据库系统&#xff08;DataBas…

Navicat for MySQL Mac:数据库管理与开发的理想工具

Navicat for MySQL Mac是一款功能强大的数据库管理与开发工具&#xff0c;专为Mac用户设计&#xff0c;旨在提供高效、便捷的数据库操作体验。 它支持创建、管理和维护MySQL和MariaDB数据库&#xff0c;通过直观的图形界面&#xff0c;用户可以轻松进行数据库连接、查询、编辑和…

力扣:221. 最大正方形

221. 最大正方形 在一个由 0 和 1 组成的二维矩阵内&#xff0c;找到只包含 1 的最大正方形&#xff0c;并返回其面积。 示例 1&#xff1a; 输入&#xff1a;matrix [["1","0","1","0","0"],["1","0"…

数字藏品平台遭受科技攻击时的防护策略与攻击类型判定

随着区块链技术和数字经济的飞速发展&#xff0c;数字藏品平台逐渐成为炙手可热的投资领域。然而&#xff0c;这也使其成为了黑客攻击的重要目标。本文将深入探讨数字藏品平台可能遭遇的几种主要科技攻击类型&#xff0c;并提出相应的防护措施和判定方法。 一、51%攻击 攻击描…

太速科技-FMC377_双AD9361 射频收发模块

FMC377_双AD9361 射频收发模块 FEATURES&#xff1a; ◆ Coverage from 70M ~ 6GHz RF ◆ Flexible rate 12 bit ADC/DAC ◆ Fully-coherent 4x4 MIMO capability, TDD/FDD ◆ RF ports: 50Ω Matched ◆ support both internal reference and exter…

cmake进阶:变量的作用域说明二(从函数作用域方面)

一. 简介 前一篇文章从函数作用域方面学习了 变量的作用域。文章如下&#xff1a; cmake进阶&#xff1a;变量的作用域-CSDN博客 本文继续从函数作用域方面学习了 变量的作用域。 二. 变量的作用域 1. 函数内定义与外部同名的变量 向顶层 CMakeLists.txt添加如下代码&a…

leetcode-缺失的第一个正整数-96

题目要求 思路 1.这里的题目要求刚好符合map和unordered_map 2.创建一个对应map把元素添加进去&#xff0c;用map.find(res)进行查找&#xff0c;如果存在返回指向该元素的迭代器&#xff0c;否则返回map::end()。 代码实现 class Solution { public:int minNumberDisappeare…

项目管理-干系人管理

项目管理&#xff1a;每天进步一点点~ 活到老&#xff0c;学到老 ヾ(◍∇◍)&#xff89;&#xff9e; 何时学习都不晚&#xff0c;加油 1.干系人管理-主要框架 重点内容&#xff1a; ①ITTO 输入&#xff0c;输出工具和技术。 ②问题和解决方案。 ③论文可以结合范围&am…

[Linux][网络][TCP][三][超时重传][快速重传][SACK][D-SACK][滑动窗口]详细讲解

目录 1.超时重传1.什么是超时重传&#xff1f;2.超时时间是如何确定的&#xff1f; 2.快速重传3.SACK4.D-SACK1.ACK丢失2.网络延迟 5.滑动窗口0.问题抛出1.发送方的滑动窗口2.如何表示发送方的四个部分&#xff1f;3.接收方的滑动窗口4.滑动窗口的完善理解 1.超时重传 1.什么是…

14【PS作图】像素画尺寸大小

【背景介绍】本节介绍像素图多大合适 下图是160*144像素大小,有一个显示文本的显示器,还有一个有十几个键的键盘 像素画布尺寸 电脑16像素,但还有一个显示屏 下图为240*160 在场景素材,和对话素材中,用的是不同尺寸的头像,对话素材中的头像会更清楚,尺寸会更大 远处…

五月节放假作业讲解

目录 作业1&#xff1a; 问题&#xff1a; 结果如下 作业2&#xff1a; 结果: 作业1&#xff1a; 初始化数组 问题&#xff1a; 如果让数组初始化非0数会有问题 有同学就问了&#xff0c;我明明已经初始化定义过了&#xff0c;为啥还有0呀 其实这种初始化只会改变第一个…

服务攻防-数据库安全RedisCouchDBH2database未授权访问CVE漏洞

#知识点&#xff1a; 1、数据库-Redis-未授权RCE&CVE 2、数据库-Couchdb-未授权RCE&CVE 3、数据库-H2database-未授权RCE&CVE#章节点&#xff1a; 1、目标判断-端口扫描&组合判断&信息来源 2、安全问题-配置不当&CVE漏洞&弱口令爆破 3、复现对象-数…

强化学习:时序差分法【Temporal Difference Methods】

强化学习笔记 主要基于b站西湖大学赵世钰老师的【强化学习的数学原理】课程&#xff0c;个人觉得赵老师的课件深入浅出&#xff0c;很适合入门. 第一章 强化学习基本概念 第二章 贝尔曼方程 第三章 贝尔曼最优方程 第四章 值迭代和策略迭代 第五章 强化学习实例分析:GridWorld…

《Video Mamba Suite》论文笔记(4)Mamba在时空建模中的作用

原文翻译 4.4 Mamba for Spatial-Temporal Modeling Tasks and datasets.最后&#xff0c;我们评估了 Mamba 的时空建模能力。与之前的小节类似&#xff0c;我们在 Epic-Kitchens-100 数据集 [13] 上评估模型在zero-shot多实例检索中的性能。 Baseline and competitor.ViViT…