从实现到原理,总结11种延迟任务的实现方式(下)

news2024/11/23 22:16:13

7 监听Redis过期key

在Redis中,有个发布订阅的机制

在这里插入图片描述
生产者在消息发送时需要到指定发送到哪个channel上,消费者订阅这个channel就能获取到消息。图中channel理解成MQ中的topic。

并且在Redis中,有很多默认的channel,只不过向这些channel发送消息的生产者不是我们写的代码,而是Redis本身。这里面就有这么一个channel叫做__keyevent@__:expired,db是指Redis数据库的序号。

当某个Redis的key过期之后,Redis内部会发布一个事件到__keyevent@__:expired这个channel上,只要监听这个事件,那么就可以获取到过期的key。

所以基于监听Redis过期key实现延迟任务的原理如下:

  • 将延迟任务作为key,过期时间设置为延迟时间
  • 监听__keyevent@__:expired这个channel,那么一旦延迟任务到了过期时间(延迟时间),那么就可以获取到这个任务

7.1 demo

Spring已经实现了监听__keyevent@__:expired这个channel这个功能,__keyevent@__:expired中的*代表通配符的意思,监听所有的数据库。

所以demo写起来就很简单了,只需4步即可

依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>

配置文件

spring:
  redis:
    host: 192.168.200.144
    port: 6379

配置类

@Configuration
public class RedisConfiguration {

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
        redisMessageListenerContainer.setConnectionFactory(connectionFactory);
        return redisMessageListenerContainer;
    }

    @Bean
    public KeyExpirationEventMessageListener redisKeyExpirationListener(RedisMessageListenerContainer redisMessageListenerContainer) {
        return new KeyExpirationEventMessageListener(redisMessageListenerContainer);
    }

}

KeyExpirationEventMessageListener实现了对__keyevent@*__:expiredchannel的监听

在这里插入图片描述

当KeyExpirationEventMessageListener收到Redis发布的过期Key的消息的时候,会发布RedisKeyExpiredEvent事件

在这里插入图片描述
所以我们只需要监听RedisKeyExpiredEvent事件就可以拿到过期消息的Key,也就是延迟消息。

对RedisKeyExpiredEvent事件的监听实现MyRedisKeyExpiredEventListener

@Component
public class MyRedisKeyExpiredEventListener implements ApplicationListener<RedisKeyExpiredEvent> {

    @Override
    public void onApplicationEvent(RedisKeyExpiredEvent event) {
        byte[] body = event.getSource();
        System.out.println("获取到延迟消息:" + new String(body));
    }

}

代码写好,启动应用

之后我直接通过Redis命令设置消息,就没通过代码发送消息了,消息的key为sanyou,值为task,值不重要,过期时间为5s

set sanyou task 

expire sanyou 5

成功获取到延迟任务

在这里插入图片描述

7.2 存在问题

虽然这种方式可以实现延迟任务,但是这种方式坑比较多

7.2.1 问题一:任务存在延迟

Redis过期事件的发布不是指key到了过期时间就发布,而是key到了过期时间被清除之后才会发布事件。

而Redis过期key的两种清除策略,就是面试八股文常背的两种:

  • 惰性清除。当这个key过期之后,访问时,这个Key才会被清除
  • 定时清除。后台会定期检查一部分key,如果有key过期了,就会被清除

所以即使key到了过期时间,Redis也不一定会发送key过期事件,这就到导致虽然延迟任务到了延迟时间也可能获取不到延迟任务。

7.2.2 问题二:丢消息太频繁

Redis实现的发布订阅模式,消息是没有持久化机制,当消息发布到某个channel之后,如果没有客户端订阅这个channel,那么这个消息就丢了,并不会像MQ一样进行持久化,等有消费者订阅的时候再给消费者消费。

所以说,假设服务重启期间,某个生产者或者是Redis本身发布了一条消息到某个channel,由于服务重启,没有监听这个channel,那么这个消息自然就丢了。

7.2.3 问题三:消息消费只有广播模式

Redis的发布订阅模式消息消费只有广播模式一种。

所谓的广播模式就是多个消费者订阅同一个channel,那么每个消费者都能消费到发布到这个channel的所有消息。

在这里插入图片描述
如图,生产者发布了一条消息,内容为sanyou,那么两个消费者都可以同时收到sanyou这条消息。

所以,如果通过监听channel来获取延迟任务,那么一旦服务实例有多个的话,还得保证消息不能重复处理,额外地增加了代码开发量。

7.2.4 问题四:接收到所有key的某个事件

这个不属于Redis发布订阅模式的问题,而是Redis本身事件通知的问题。

当监听了__keyevent@__:expired的channel,那么所有的Redis的key只要发生了过期事件都会被通知给消费者,不管这个key是不是消费者想接收到的。

所以如果你只想消费某一类消息的key,那么还得自行加一些标记,比如消息的key加个前缀,消费的时候判断一下带前缀的key就是需要消费的任务。

8 Redisson的RDelayedQueue

Redisson他是Redis的儿子(Redis son),基于Redis实现了非常多的功能,其中最常使用的就是Redis分布式锁的实现,但是除了实现Redis分布式锁之外,它还实现了延迟队列的功能。

8.1 demo

引入依赖

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.1</version>
</dependency>

封装了一个RedissonDelayQueue类

@Component
@Slf4j
public class RedissonDelayQueue {

    private RedissonClient redissonClient;

    private RDelayedQueue<String> delayQueue;
    private RBlockingQueue<String> blockingQueue;

    @PostConstruct
    public void init() {
        initDelayQueue();
        startDelayQueueConsumer();
    }

    private void initDelayQueue() {
        Config config = new Config();
        SingleServerConfig serverConfig = config.useSingleServer();
        serverConfig.setAddress("redis://localhost:6379");
        redissonClient = Redisson.create(config);

        blockingQueue = redissonClient.getBlockingQueue("SANYOU");
        delayQueue = redissonClient.getDelayedQueue(blockingQueue);
    }

    private void startDelayQueueConsumer() {
        new Thread(() -> {
            while (true) {
                try {
                    String task = blockingQueue.take();
                    log.info("接收到延迟任务:{}", task);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }, "SANYOU-Consumer").start();
    }

    public void offerTask(String task, long seconds) {
        log.info("添加延迟任务:{} 延迟时间:{}s", task, seconds);
        delayQueue.offer(task, seconds, TimeUnit.SECONDS);
    }

}

这个类在创建的时候会去初始化延迟队列,创建一个RedissonClient对象,之后通过RedissonClient对象获取到RDelayedQueue和RBlockingQueue对象,传入的队列名字叫SANYOU,这个名字无所谓。

当延迟队列创建之后,会开启一个延迟任务的消费线程,这个线程会一直从RBlockingQueue中通过take方法阻塞获取延迟任务。

添加任务的时候是通过RDelayedQueue的offer方法添加的。

controller类,通过接口添加任务,延迟时间为5s

@RestController
public class RedissonDelayQueueController {

    @Resource
    private RedissonDelayQueue redissonDelayQueue;

    @GetMapping("/add")
    public void addTask(@RequestParam("task") String task) {
        redissonDelayQueue.offerTask(task, 5);
    }

}

启动项目,在浏览器输入如下连接,添加任务

http://localhost:8080/add?task=sanyou

静静等待5s,成功获取到任务。

在这里插入图片描述

8.2 实现原理

如下是Redisson延迟队列的实现原理

在这里插入图片描述
SANYOU前面的前缀都是固定的,Redisson创建的时候会拼上前缀。

  • redisson_delay_queue_timeout:SANYOU,sorted set数据类型,存放所有延迟任务,按照延迟任务的到期时间戳(提交任务时的时间戳 + 延迟时间)来排序的,所以列表的最前面的第一个元素就是整个延迟队列中最早要被执行的任务,这个概念很重要
  • redisson_delay_queue:SANYOU,list数据类型,也是存放所有的任务,但是研究下来发现好像没什么用。。
  • SANYOU,list数据类型,被称为目标队列,这个里面存放的任务都是已经到了延迟时间的,可以被消费者获取的任务,所以上面demo中的RBlockingQueue的take方法是从这个目标队列中获取到任务的
  • redisson_delay_queue_channel:SANYOU,是一个channel,用来通知客户端开启一个延迟任务

任务提交的时候,Redisson会将任务放到redisson_delay_queue_timeout:SANYOU中,分数就是提交任务的时间戳+延迟时间,就是延迟任务的到期时间戳

Redisson客户端内部通过监听redisson_delay_queue_channel:SANYOU这个channel来提交一个延迟任务,这个延迟任务能够保证将redisson_delay_queue_timeout:SANYOU中到了延迟时间的任务从redisson_delay_queue_timeout:SANYOU中移除,存到SANYOU这个目标队列中。

于是消费者就可以从SANYOU这个目标队列获取到延迟任务了。

所以从这可以看出,Redisson的延迟任务的实现跟前面说的MQ的实现都是殊途同归,最开始任务放到中间的一个地方,叫做redisson_delay_queue_timeout:SANYOU,然后会开启一个类似于定时任务的一个东西,去判断这个中间地方的消息是否到了延迟时间,到了再放到最终的目标的队列供消费者消费。

Redisson的这种实现方式比监听Redis过期key的实现方式更加可靠,因为消息都存在list和sorted set数据类型中,所以消息很少丢。

9 Netty的HashedWheelTimer

9.1 demo

@Slf4j
public class NettyHashedWheelTimerDemo {

    public static void main(String[] args) {
        HashedWheelTimer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 8);
        timer.start();

        log.info("提交延迟任务");
        timer.newTimeout(timeout -> log.info("执行延迟任务"), 5, TimeUnit.SECONDS);
    }

}

测试结果

在这里插入图片描述

9.2 实现原理

在这里插入图片描述
如图,时间轮会被分成很多格子(上述demo中的8就代表了8个格子),一个格子代表一段时间(上述demo中的100就代表一个格子是100ms),所以上述demo中,每800ms会走一圈。

当任务提交的之后,会根据任务的到期时间进行hash取模,计算出这个任务的执行时间所在具体的格子,然后添加到这个格子中,通过如果这个格子有多个任务,会用链表来保存。所以这个任务的添加有点像HashMap储存元素的原理。

HashedWheelTimer内部会开启一个线程,轮询每个格子,找到到了延迟时间的任务,然后执行。

由于HashedWheelTimer也是单线程来处理任务,所以跟Timer一样,长时间运行的任务会导致其他任务的延时处理。

前面Redisson中提到的客户端延迟任务就是基于Netty的HashedWheelTimer实现的。

10 Hutool的SystemTimer

Hutool工具类也提供了延迟任务的实现SystemTimer

10.1 demo

@Slf4j
public class SystemTimerDemo {

    public static void main(String[] args) {
        SystemTimer systemTimer = new SystemTimer();
        systemTimer.start();

        log.info("提交延迟任务");
        systemTimer.addTask(new TimerTask(() -> log.info("执行延迟任务"), 5000));
    }

}

执行结果

在这里插入图片描述
Hutool底层其实也用到了时间轮。

11 Qurtaz

Qurtaz是一款开源作业调度框架,基于Qurtaz提供的api也可以实现延迟任务的功能。

11.1 demo

依赖

<dependency>
    <groupId>org.quartz-scheduler</groupId>
    <artifactId>quartz</artifactId>
    <version>2.3.2</version>
</dependency>

SanYouJob实现Job接口,当任务到达执行时间的时候会调用execute的实现,从context可以获取到任务的内容

@Slf4j
public class SanYouJob implements Job {
    @Override
    public void execute(JobExecutionContext context) throws JobExecutionException {
        JobDetail jobDetail = context.getJobDetail();
        JobDataMap jobDataMap = jobDetail.getJobDataMap();
        log.info("获取到延迟任务:{}", jobDataMap.get("delayTask"));
    }
}

测试类

public class QuartzDemo {

    public static void main(String[] args) throws SchedulerException, InterruptedException {
        // 1.创建Scheduler的工厂
        SchedulerFactory sf = new StdSchedulerFactory();
        // 2.从工厂中获取调度器实例
        Scheduler scheduler = sf.getScheduler();

        // 6.启动 调度器
        scheduler.start();

        // 3.创建JobDetail,Job类型就是上面说的SanYouJob
        JobDetail jb = JobBuilder.newJob(SanYouJob.class)
                .usingJobData("delayTask", "这是一个延迟任务")
                .build();

        // 4.创建Trigger
        Trigger t = TriggerBuilder.newTrigger()
                //任务的触发时间就是延迟任务到的延迟时间
                .startAt(DateUtil.offsetSecond(new Date(), 5))
                .build();

        // 5.注册任务和定时器
        log.info("提交延迟任务");
        scheduler.scheduleJob(jb, t);
    }
}

执行结果:

在这里插入图片描述

10.2 实现原理

核心组件

  • Job:表示一个任务,execute方法的实现是对任务的执行逻辑
  • JobDetail:任务的详情,可以设置任务需要的参数等信息
  • Trigger:触发器,是用来触发业务的执行,比如说指定5s后触发任务,那么任务就会在5s后触发
  • Scheduler:调度器,内部可以注册多个任务和对应任务的触发器,之后会调度任务的执行

在这里插入图片描述
启动的时候会开启一个QuartzSchedulerThread调度线程,这个线程会去判断任务是否到了执行时间,到的话就将任务交给任务线程池去执行。

12 无限轮询延迟任务

无限轮询的意思就是开启一个线程不停的去轮询任务,当这些任务到达了延迟时间,那么就执行任务。

12.1 demo

@Slf4j
public class PollingTaskDemo {

    private static final List<DelayTask> DELAY_TASK_LIST = new CopyOnWriteArrayList<>();

    public static void main(String[] args) {
        new Thread(() -> {
            while (true) {
                try {
                    for (DelayTask delayTask : DELAY_TASK_LIST) {
                        if (delayTask.triggerTime <= System.currentTimeMillis()) {
                            log.info("处理延迟任务:{}", delayTask.taskContent);
                            DELAY_TASK_LIST.remove(delayTask);
                        }
                    }
                    TimeUnit.MILLISECONDS.sleep(100);
                } catch (Exception e) {
                }
            }
        }).start();

        log.info("提交延迟任务");
        DELAY_TASK_LIST.add(new DelayTask("三友的java日记", 5L));
    }

    @Getter
    @Setter
    public static class DelayTask {

        private final String taskContent;

        private final Long triggerTime;

        public DelayTask(String taskContent, Long delayTime) {
            this.taskContent = taskContent;
            this.triggerTime = System.currentTimeMillis() + delayTime * 1000;
        }
    }

}

任务可以存在数据库又或者是内存,看具体的需求,这里我为了简单就放在内存里了。

执行结果:

在这里插入图片描述
这种操作简单,但是就是效率低下,每次都得遍历所有的任务。

13 最后

最后,本文所有示例代码地址:

https://github.com/363153421/delay-task-demo-master.git

上期文章:

从实现到原理,总结11种延迟任务的实现方式(上)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/652565.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

小鱼深度产品测评之:阿里云云效代码管理 Codeup,一款数十万企业正在使用,全方位保护企业代码资产的实力产品,。

云效代码管理 Codeup 0、引言1、进入页面2、创建代码库3、资源文件页面4、分支→新建保护分支规则5、分支-基本设置5.1 基本信息 模块5.2 存储空间管理 6、分支→仓库备份6.1 点击 如何启用 按钮6.2 点击 前往企业设置查看 7、合并请求8、度量报表9、动态10、流水线11、总结 0、…

基于html+css的图展示129

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…

24届秋招专场 · 数组如何用双指针解题呢?

你好&#xff0c;我是安然无虞。 文章目录 删除有序数组中的重复项删除排序链表中的重复元素移除元素移除零 大家好&#xff0c;近期主要更新数组相关的解题算法咯&#xff0c;感兴趣的老铁可以一起看过来啦。 今天更新使用双指针解决数组部分题型&#xff0c;注意哦&#xff…

支付宝小程序云亮相!向小程序生态开放全面云服务

前言&#xff1a; 小程序是一种轻量级应用程序&#xff0c;不需要安装即可直接在手机上使用。相较于传统的APP来讲&#xff0c;其无需下载安装&#xff0c;轻便快捷&#xff0c;快速启动&#xff0c;易于推广的良好特性为我们所青睐。 为此&#xff0c;支付宝小程序云&#xff…

DataX在有赞大数据平台的实践

文章目录 一、需求二、选型三、前期设计3.1 运行形态3.2 执行器设计3.3 开发策略 四、Datax-Web五、总结 大家好&#xff0c;我是脚丫先生 (o^^o) 在看技术文章的时候&#xff0c;发现有赞平台采用过Datax。想到指北数据中台&#xff0c;数据汇聚采用的是Datax-web二次开发&am…

chatgpt赋能python:Python的字符串索引操作技巧

Python的字符串索引操作技巧 Python是一个强大而灵活的编程语言&#xff0c;被广泛用于各种领域。在Python中&#xff0c;字符串是一个非常重要的数据类型&#xff0c;它可以包含文本、数字、符号和其他任何字符。在处理字符串时&#xff0c;索引操作是常见的操作之一。本文将…

计算机网络之网络层:数据平面

四.网络层&#xff1a;数据平面 4.1 网络层概述 网络层被分解为两个相互作用的部分&#xff0c;即数据平面和控制平面。 数据平面决定到达路由器输入链路之一的数据报如何转发到该路由器的输出链路之一&#xff0c;转发方式有&#xff1a; 传统的IP转发&#xff1a;转发基于…

【算法与数据结构】707、LeetCode设计链表

文章目录 一、题目二、设计链表三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、设计链表 思路分析&#xff1a;这里我将的成员函数放在类外实现了&#xff0c;这样链表类看起来更加简洁&#xff0c;方便大家…

jenkins用户权限管理

环境准备: 登录jenkins: http://192.168.9.190:8091/ admin asdwhl@0 一、用户权限插件安装 1、Dashboard > Manage Jenkins > Manage Plugins > Available(可选插件) 依次安装: Role-based Authorization Strategy Authorize Project

校园二手交易平台实训报告

目 录 一、设计背景 1. 需求分析 2. 课题研究的目的和意义 二、系统需求分析与开发环境 1. 系统功能需求 2. 系统界面需求 3. 开发环境 三、系统设计 四、系统测试 1. 脑模拟器测试 五、总结与展望 六、重要程序 1. LoginActivity 2. RegisterActiv…

51Proteus仿真数控0~20mA恒流源串口DAC0832数码管显示-0036

51Proteus仿真数控0~20mA恒流源串口DAC0832数码管显示-0036 Proteus仿真小实验&#xff1a; 51Proteus仿真数控0~20mA恒流源串口DAC0832数码管显示-0036 功能&#xff1a; 硬件组成&#xff1a;AT89C51单片机 6位数码管DAC0832电压输出多个按键&#xff08;设置、移动、加、…

INTERSPEECH 2023论文|基于多频带时频注意力的复调音乐旋律提取

论文题目&#xff1a; MTANet: Multi-band Time-frequency Attention Network for Singing Melody Extraction from Polyphonic Music 作者列表&#xff1a; 高虞安&#xff0c;胡英&#xff0c;王柳淞&#xff0c;黄浩&#xff0c;何亮 研究背景 复调音乐是一种具有多个声…

[PyTorch][chapter 41][卷积网络实战-LeNet5]

前言 这里结合前面学过的LeNet5 模型&#xff0c;总结一下卷积网络搭建&#xff0c;训练的整个流程 目录&#xff1a; 1&#xff1a; LeNet-5 2: 卷积网络总体流程 3&#xff1a; 代码 一 LeNet-5 LeNet-5是一个经典的深度卷积神经网络&#xff0c;由Yann LeCun在1998年提…

zabbix-agent安装

1.CentOS release 5 1-1.centos5 32位 [rootLV zabbix]# cat /etc/redhat-release CentOS release 5 (Final) [rootLV zabbix]# uname -a Linux LV 2.6.18-53.el5xen #1 SMP Mon Nov 12 03:26:12 EST 2007 i686 i686 i386 GNU/Linux确定了系统centos5 32位rpm方式安装&#…

Ubuntu18.04离线安装redis

因需要安装redis的服务器无法连接互联网&#xff0c;所以需要离线安装。首先需要下载redis的安装包&#xff0c;之后进行安装&#xff0c;在安装之前需要保证gcc&#xff0c;g&#xff0c;make等依赖包已经安装。 1. 安装gcc等依赖包 依赖包安装请参考&#xff1a; Ubuntu18…

CI570 3BSE001440R1需要电流显示和就地/远传控制

​ CI570 3BSE001440R1需要电流显示和就地/远传控制 CI570 3BSE001440R1需要电流显示和就地/远传控制 如果变频器与通讯方式与DCS系统连接&#xff0c;则只需要计算1个通讯点&#xff0c;不需要计算其他点数。 &#xff08;6&#xff09;如DCS系统外接电磁阀、指示灯、接触器等…

物联网云平台数据存储方案,这次我终于找对了

《高并发系统实战派》-- 你值得拥有 文章目录 物联网云平台存储概述为什么要做存储&#xff1f;存储的意义在哪里&#xff1f;数据存储方案设计存储数据库选型需要考虑的因素数据库选型结构化数据半结构化数据非结构化数据 案例分析第一颗栗子第二颗栗子第三颗栗子第四颗栗子 …

Web安全:vulhub 靶场搭建.(各种漏洞环境集合,一键搭建漏洞测试靶场)

Web安全&#xff1a;vulhub 靶场搭建. Vulhub是一个面向大众的开源漏洞靶场&#xff0c;无需docker知识&#xff0c;简单执行两条命令即可编译、运行一个完整的漏洞靶场镜像。让漏洞复现变得更加简单&#xff0c;让安全研究者更加专注于漏洞原理本身. 目录&#xff1a; Web安…

DLL修复工具下载,解决DLL文件问题的方法

在计算机应用程序中&#xff0c;我们经常会遇到一些错误提示&#xff0c;如“找不到.dll文件”或“无法加载.dll文件”。这些问题通常是由于缺少或损坏的DLL文件造成的。为了解决这些问题&#xff0c;我们可以借助DLL修复工具来修复和恢复DLL文件。本文将介绍什么是DLL文件&…

C# 自动更新(基于FTP)

效果 启动软件后&#xff0c;会自动读取所有的 FTP 服务器文件&#xff0c;然后读取本地需要更新的目录&#xff0c;进行匹配&#xff0c;将 FTP 服务器的文件同步到本地 Winform 界面 一、前言 在去年&#xff0c;我写了一个 C# 版本的自动更新&#xff0c;这个是根据配置文…