一个简易的并行任务调度器

news2024/9/19 9:37:39

介绍

当你需要在程序中处理大量同类型的耗时任务,并且需要监听每个任务的工作状态和任务的编号,能及时响应并处理错误,而且需要所有任务都执行完毕自动通知时,那么这个任务调度器就非常适合你!

它封装了 JavaThreadPoolExecutorScheduledExecutorService,提供了一种简便的方式来管理并行任务的执行,并在所有任务完成后自动关闭执行器。

之前写了一个串行任务调度器,这篇也是根据这个改编而来:一个用Kotlin编写简易的串行任务调度器

使用方法

1.初始化:

val taskListener = object : ParallelTaskExecutor.TaskListener {
    override fun beforeExecute(task: ParallelTaskExecutor.NamedRunnable) {
        println("开始任务:${task.name}")
    }

    override fun afterExecute(task: ParallelTaskExecutor.NamedRunnable, exception: Exception?) {
        println("完成任务:${task.name},异常:$exception")
    }
}
val taskExecutor = ParallelTaskExecutor(taskListener, 1)

2.提交任务:

taskExecutor.submit("加载数据") {
    // 加载数据的代码
}
taskExecutor.submit("处理数据") {
    // 处理数据的代码
}
//或者
for (i in 0..15) {
            taskExecutor.submit("task:$i") {
                Thread.sleep(3000)
            }
        }
...

3.优雅关闭:

当所有任务完成后,调度器将在指定的超时后自动关闭,确保不浪费资源。

完整代码

/**
 * ParallelTaskExecutor 是一个用于管理和并行执行任务的类。
 * 它确保所有任务都被执行,并提供一个机制在所有任务完成时通知。
 */
class ParallelTaskExecutor(private val listener: TaskListener? = null, private val timeout: Long = 5) {

    // 用于存储任务的队列
    private val taskQueue = ConcurrentLinkedQueue<NamedRunnable>()

    // 标记当前是否有任务正在处理
    private val isTaskRunning = AtomicBoolean(false)

    // 用于并行运行任务的执行器服务
    private var executorService: ExecutorService? = null

    // 用于安排执行器服务关闭的调度执行器服务
    private val scheduler: ScheduledExecutorService = Executors.newScheduledThreadPool(1)

    // 用于安排执行器服务关闭的 Future
    private var scheduledShutdownFuture: ScheduledFuture<*>? = null

    // 计数器,用于跟踪运行中的任务数量
    private val runningTasksCounter = AtomicInteger(0)

    /**
     * 提交一个任务到执行器。
     * @param name 任务的名称。
     * @param task 要执行的任务。
     */
    @Synchronized
    fun submit(name: String, task: Runnable) {
        ensureExecutorService() // 确保执行器服务已启动
        taskQueue.offer(NamedRunnable(name, task)) // 将任务添加到队列中

        // 取消之前安排的任何关闭操作
        scheduledShutdownFuture?.cancel(false)

        // 如果当前没有运行的任务,开始处理任务
        if (isTaskRunning.compareAndSet(false, true)) {
            processTasks()
        }
    }

    /**
     * 处理队列中的任务。
     */
    private fun processTasks() {
        try {
            while (taskQueue.isNotEmpty()) {
                // 从队列中获取下一个任务
                val nextTask = taskQueue.poll()

                // 递增运行任务计数器
                runningTasksCounter.incrementAndGet()

                // 将任务提交到执行器服务
                executorService?.execute {
                    listener?.beforeExecute(nextTask)
                    var exception: Exception? = null
                    try {
                        nextTask.run()
                    } catch (e: Exception) {
                        e.printStackTrace()
                        exception = e
                    }
                    listener?.afterExecute(nextTask, exception)

                    // 任务执行完毕后递减计数器,并检查是否所有任务都已完成
                    if (runningTasksCounter.decrementAndGet() == 0) {
                        scheduleShutdown()
                    }
                }
            }
        } finally {
            // 重置 isTaskRunning 标志
            isTaskRunning.set(false)
        }
    }

    /**
     * 确保执行器服务已初始化并在运行中。
     */
    private fun ensureExecutorService() {
        if (executorService == null || executorService!!.isShutdown) {
            executorService = Executors.newFixedThreadPool(20)
        }
    }

    /**
     * 如果所有任务都已完成,安排关闭执行器服务。
     */
    private fun scheduleShutdown() {
        // 如果任务队列为空且没有运行中的任务,则安排关闭
        if (taskQueue.isEmpty() && runningTasksCounter.get() == 0) {
            scheduledShutdownFuture = scheduler.schedule({
                executorService?.shutdown()
                executorService = null
                listener?.onShutdown()
            }, timeout, TimeUnit.SECONDS)
        }
    }

    /**
     * 任务监听器接口,提供任务执行事件的回调。
     */
    interface TaskListener {
        fun beforeExecute(task: NamedRunnable)
        fun afterExecute(task: NamedRunnable, exception: Exception?)
        fun onShutdown()
    }

    /**
     * NamedRunnable 是一个对 Runnable 任务的包装,包含一个用于标识的名称。
     */
    class NamedRunnable(val name: String, private val task: Runnable) : Runnable {
        override fun run() {
            task.run()
        }
    }
}

最后

  • 可以根据你的需求自由定义线程池最大线程数
  • 资源自动管理,超时自动释放资源,新加任务自动创建线程池。
  • 自定义超时时间,可以延迟线程池资源释放,等待新任务到来,合理利用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2146019.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【FFT】信号处理——快速傅里叶变换【通俗易懂】

快速傅里叶变换&#xff08;Fast Fourier Transform, FFT&#xff09;是一种用于将信号从时间域转换到频率域的算法。 傅里叶变换的核心思想是&#xff1a;任何周期性信号都可以分解成多个不同频率的正弦波或余弦波的叠加。 简单来说&#xff0c;FFT可以帮助我们理解一个信号…

使用 Internet 共享 (ICS) 方式分配ip

设备A使用dhcp的情况下&#xff0c;通过设备B分配ip并共享网络的方法。 启用网络共享&#xff08;ICS&#xff09;并配置 NAT Windows 自带的 Internet Connection Sharing (ICS) 功能可以简化 NAT 设置&#xff0c;允许共享一个网络连接给其他设备。 打开网络设置&#xff1…

力扣之1075.项目员工I

文章目录 1. 1075.项目员工I1.1 题干1.2 准备数据1.3 解法1.4 结果截图 1. 1075.项目员工I 1.1 题干 项目表 Project&#xff1a; -------------------- | Column Name | Type | -------------------- | project_id | int | | employee_id | int | -------------------- 主键…

『小白可入』VSPD虚拟串口工具——从此告别硬件串口调试

一、下载VSPD工具 工具已上传至百度云&#xff0c;在以下地址下载&#xff1a; VSPD下载链接&#xff1a;通过百度网盘分享的文件&#xff1a;我的资源 链接&#xff1a;https://pan.baidu.com/s/1x2eoQYg6erqs__CQgT5j6Q?pwd4211 提取码&#xff1a;4211 二、安装 1.解压后的…

2024年Apple Search Ads(简称:苹果ASA):开展有效活动的秘诀

当谈到为应用程序启动广告活动时&#xff0c;许多人会立即想到Android。然而&#xff0c;这并不总是最好的选择&#xff0c;因为iOS设备在多个国家和地区占据市场主导地位。在这些地区&#xff0c;定位ios用户可以带来更大的成功。 您可以通过各种渠道在iOS上投放广告&#xff…

最低成本的游戏串流方案分享 如何自己打造云电脑?

今天教大家如何最低成本实现串流 出门在外也可以随时随地游玩端游大作 硬件准备&#xff1a;一台电脑 手机/平板一台 软件&#xff1a;Gameviewer远程 为啥不用moonlight等其他软件呢 因为设置公网穿透等复杂操作对小白来说不太友好 而GameViewer从安装到使用仅需一键 对比同类…

PostgreSQL技术内幕10:PostgreSQL事务原理解析-日志模块介绍

文章目录 0.简介1.PG日志介绍2.事务日志介绍3.WAL分析3.1 WAL概述3.2 WAL设计考虑3.2.1 存储格式3.2.2 实现方式3.2.3 数据完整性校验3.3 check ponit 4.事务提交日志&#xff08;CLOG&#xff09;4.1 clog存储使用介绍4.2 slru缓冲池并发控制 0.简介 本文将延续上一篇文章内容…

好用的网页翻译插件

软件介绍 「火山翻译&#xff0c;开箱即用免配置&#xff0c;完全免费无广告&#xff0c;开发的多语言翻译插件&#xff0c;基本涵盖众多小语种及国际通用语言的翻译&#xff0c;支持网页一键翻译、划词翻译、英语词典、生词本、吐司弹词记忆等丰富能力。 下载方式 请看文章…

【AprilTag】视觉定位实战 | 使用 ROS 驱动的 USB 摄像头进行相机标定与 AprilTag 识别

写在前面&#xff1a; &#x1f31f; 欢迎光临 清流君 的博客小天地&#xff0c;这里是我分享技术与心得的温馨角落。&#x1f4dd; 个人主页&#xff1a;清流君_CSDN博客&#xff0c;期待与您一同探索 移动机器人 领域的无限可能。 &#x1f50d; 本文系 清流君 原创之作&…

Matlab进行频率切片小波变换

Matlab进行频率切片小波变换(FSWT)源代码&#xff0c;将一维信号生成时频图。 输入信号可以是任何一维信号&#xff0c;心电信号、脑电信号、地震波形、电流电压数据等。 相比连续小波变换(CWT)&#xff0c;频率切片小波变换(Frequency Slice Wavelet Transform,FSWT)是一种更具…

C# 使用代码清理 以及禁用某个代码清理

初级代码游戏的专栏介绍与文章目录-CSDN博客 我的github&#xff1a;codetoys&#xff0c;所有代码都将会位于ctfc库中。已经放入库中我会指出在库中的位置。 这些代码大部分以Linux为目标但部分代码是纯C的&#xff0c;可以在任何平台上使用。 源码指引&#xff1a;github源…

鸿蒙Harmony应用开发,数据驾驶舱 项目结构搭建

对于一个项目而言&#xff0c;在拿到我们的开发任务后&#xff0c;我们最重要的就是技术的选型。选型定下来了之后我们便开始脚手架的搭建&#xff0c;然后开始撸代码&#xff0c;开搞. 首先我们需要对一些常见依赖库的引入 我们需要再oh-package.json5的dependencies节点下面…

strlen和sizeof

在 C 语言中&#xff0c;strlen 和 sizeof 是两个非常常用的操作符&#xff0c;但它们的作用和用途有很大的不同。下面详细解释这两个操作符&#xff1a; strlen strlen 是一个函数&#xff0c;定义在 <string.h> 头文件中&#xff0c;用于计算一个以空字符&#xff08…

华为OD机试 - 字符串划分(Java 2024 E卷 100分)

华为OD机试 2024E卷题库疯狂收录中&#xff0c;刷题点这里 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;E卷D卷A卷B卷C卷&#xff09;》。 刷的越多&#xff0c;抽中的概率越大&#xff0c;私信哪吒&#xff0c;备注华为OD&#xff0c;加…

注意,传统的提示工程对新模型o1可能失效:来自OpenAI官方的4条提示词建议!

大家好&#xff0c;我是木易&#xff0c;一个持续关注AI领域的互联网技术产品经理&#xff0c;国内Top2本科&#xff0c;美国Top10 CS研究生&#xff0c;MBA。我坚信AI是普通人变强的“外挂”&#xff0c;专注于分享AI全维度知识&#xff0c;包括但不限于AI科普&#xff0c;AI工…

Flink系列知识之:Checkpoint原理

Flink系列知识之&#xff1a;Checkpoint原理 在介绍checkpoint的执行流程之前&#xff0c;需要先明白Flink中状态的存储机制&#xff0c;因为状态对于检查点的持续备份至关重要。 State Backends分类 下图显示了Flink中三个内置的状态存储种类。MemoryStateBackend和FsState…

linux设置常见开机自启动命令

本文介绍了三种开机自启的方式&#xff0c;重点介绍使用systemctl的方式自启动的 方式一、修改 /etc/rc.d/rc.local 文件 /etc/rc.d/rc.local 文件会在 Linux 系统各项服务都启动完毕之后再被运行。所以你想要自己的脚本在开机后被运行的话&#xff0c;可以将自己脚本路径加到…

Kubernetes从零到精通(12-Ingress、Gateway API)

Ingress和Gateway API都是Kubernetes中用于管理外部访问集群服务的机制&#xff0c;但它们有不同的设计理念和适用场景。它们的基本原理是通过配置规则&#xff0c;将来自外部的网络流量路由到Kubernetes集群内部的服务上。 Ingress/Gateway API和Service Ingress/Gateway API…

边缘计算智能网关的功能应用与优势-天拓四方

在物联网的世界中&#xff0c;数以亿计的设备不断产生、传输和处理数据。然而&#xff0c;传统的云计算架构在面对这些实时性要求高、数据量庞大的物联网数据时&#xff0c;常常面临着网络延迟、带宽限制和安全风险等问题。这时&#xff0c;边缘计算智能网关作为一种新兴的技术…

图书馆座位预约系统小程序的设计

管理员账户功能包括&#xff1a;系统首页&#xff0c;个人中心&#xff0c;用户管理&#xff0c;图书馆管理&#xff0c;座位信息管理&#xff0c;预约选座管理&#xff0c;签到信息管理&#xff0c;系统管理 微信端账号功能包括&#xff1a;系统首页&#xff0c;论坛&#xf…