手写ThreadPoolExecutor线程池

news2024/12/27 9:49:43

很多人不推荐造轮子,我偏不。我造轮子又不是为了上生产环境,而是为了加深理解,有何不可?私以为造轮子几乎是最好的学习方式,甚至没有之一。因为造轮子需要至少做足以下两点:

  • 了解设计思想(设计层面)
  • 大略看过源码(代码层面)

不了解设计,就无法把握整体。没看过代码,就无法完善细节。

另外,从创作者的角度来说,直接分析源码有时太困难了,代码太多,抽象层次太深。如果可以通过造轮子,把抽象层次减少一些,采用平铺直叙的方式呈现,那么读者理解起来也就更容易些。

既然造轮子这么好,那就,拿来吧你。今天带大家造一个线程池。

线程池设计思想

池化技术

大家多少听过所谓的“池化技术”,比如数据库连接池、常量池、线程池、对象池等等。池化技术是计算机世界里比较常用的、行之有效的优化手段。那么我想问你,线程池中的“池”,到底指代什么?

抛开无足轻重的小虾米,线程池中最主要的就两个:我们向Executor提交的任务、Executor自己维护的Thread。其中,线程“池”显然指代的是Thread的集合。

但和数据库连接池等一般的池化技术不同的是,ThreadPool的作用不单单是“池化”,它更重要的职责其实是“做功”,也即是执行任务。举个例子,平时我们使用数据库连接池,其实都是从池中取出一个Connection,执行完SQL后会调用重写过的close()归还Connection。但你可曾见过有人向ThreadPool讨要Thread的?它会给你吗?ThreadPool的做法是:

想要从池中拿Thread?没门儿!你不知道自己多线程知识多菜啊?小心玩火自焚。要执行任务的话,你自己把Task丢进来,哥罩着你。

也就是说,ThreadPool从一开始就没想过让你们把Thread拿走!但你们又要返回结果咋整?我返回一个FutureTask,需要结果的话,自己FutureTask#get()。但主动权还是在ThreadPool这,能不能拿到结果、是否要阻塞都是它说了算!

大部分人觉得线程池难,并非搞不清楚线程“池”,而是不了解它是如何“做功”的,也就是说:线程池是如何执行任务的呢?这就涉及到线程池和一般池化技术最大的不同:内化执行操作,而且是通过生产消费的模式执行任务。

生产消费模型

如果往线程池不断提交任务,大致会经历4个阶段:

  • 核心线程处理任务
  • 任务进入任务队列等待
  • 非核心线程处理任务
  • 拒绝策略

特别是第二个阶段,来不及处理的任务会被暂存入workQueue(任务队列),于是典型的生产消费模型就出现了。

调用者投递Task ====> ThreadPool.workQueue ====> workerThread阻塞获取Task执行

几个重要概念

线程池如何复用线程?

有时候,要解决一个问题,从反方向入手可能更简单写。我们暂且先不管如何复用线程,我就问大家:如何回收/销毁线程?(知道线程什么情况会被销毁,那么只要避免销毁,也就可以复用)

“线程”这个词,其实有两个层次的指代:Thread对象、JVM线程资源(本质还是操作系统线程)。Thread对象与线程资源之间是绑定关系,一个线程资源被分配后,会找到Thread#run()作为代码的执行入口。

线程什么时候销毁呢?正常来说,new Thread(tartget).start()后,操作系统就会分配线程资源。等到线程执行完Thread#run()中的代码,就会自然消亡。至于Thread对象,如果没有引用,也会被GC回收。

看到这,我想大家应该明白了:只要任务永远不结束,线程就永远死不了。任务如何才能永远不结束呀?要么循环做任务、要么阻塞。

线程池本质也是Thread,只是单体和集合的区别。既然Thread“跑完任务就销毁”的特性是天生的、注定的,线程池也无法改变这一点。所以,线程池要想让内部线程一直存活着,就要keeps threads busy working,也就是让它们一直干活。实在没活干怎么办?那就阻塞着呗(可以用阻塞队列)!总之,不能让你“执行完毕”,否则就销毁了。

如何保证只销毁“非核心线程”

大家都听过一些八股文的口诀,比如“在空闲时间,如果非核心线程空闲超过keepAliveTime就会被回收”,这是怎么实现的呢?

首先,有一个常见的误区是,很多人以为线程池创建线程时会给每一个Thread做标记,比如给核心线程标记为coreThread,非核心线程标记为nonCoreThread,然后空闲时间回收nonCoreThread。

然而JDK的Doug Lea可不这么想,人家采用的方案更加简单粗暴:

  • 当前线程数 <= corePoolSize,那么所有线程都是核心线程,不回收
  • 当前线程数 > corePoolSize,那么适当回收部分线程

看吧,“部分”线程,甚至压根不管你是谁。

OK,罗里吧嗦讲了一大堆,开始coding吧。

山寨线程池Demo

建议大家拷贝代码到本地debug调试,代码已经很精简了,比较容易追踪。

public class ThreadPool {

    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * 工作线程
     */
    private final List<Worker> workers = new ArrayList<>();
    /**
     * 任务队列
     */
    private BlockingQueue<Runnable> workQueue;
    /**
     * 核心线程数
     */
    private final int corePoolSize;
    /**
     * 最大线程数
     */
    private final int maximumPoolSize;
    /**
     * 非核心线程最大空闲时间(否则销毁线程)
     */
    private final long keepAliveTime;

    public ThreadPool(int corePoolSize,
                      int maximumPoolSize,
                      long keepAliveTime,
                      TimeUnit timeUnit,
                      BlockingQueue<Runnable> workQueue) {
        this.workQueue = workQueue;
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.keepAliveTime = timeUnit.toNanos(keepAliveTime);
    }

    public void execute(Runnable task) {
        Assert.notNull(task, "task is null");

        // 创建核心线程处理任务
        if (workers.size() < corePoolSize) {
            this.addWorker(task, true);
            return;
        }

        // 尝试加入任务队列
        boolean enqueued = workQueue.offer(task);
        if (enqueued) {
            return;
        }

        // 创建非核心线程处理任务
        if (!this.addWorker(task, false)) {
            // 非核心线程数达到上限,触发拒绝策略
            throw new RuntimeException("拒绝策略");
        }
    }

    private boolean addWorker(Runnable task, boolean core) {
        int wc = workers.size();
        if (wc >= (core ? corePoolSize : maximumPoolSize)) {
            return false;
        }

        boolean workerStarted = false;
        try {
            Worker worker = new Worker(task);
            final Thread thread = worker.getThread();
            if (thread != null) {
                mainLock.lock();
                workers.add(worker);
                thread.start();
                workerStarted = true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            mainLock.unlock();
        }

        return workerStarted;
    }

    private void runWorker(Worker worker) {
        Runnable task = worker.getTask();

        try {
            // 循环处理任务
            while (task != null || (task = getTask()) != null) {
                task.run();
                task = null;
            }
        } finally {
            // 从循环退出来,意味着当前线程是非核心线程,而且需要被销毁
            // Java的线程,既可以指代Thread对象,也可以指代JVM线程,一个Thread对象绑定一个JVM线程
            // 因此,线程的销毁分为两个维度:1.把Thread对象从workers移除 2.JVM线程执行完当前任务,会自然销毁
            workers.remove(worker); // TODO 最好加锁
        }
    }


    private Runnable getTask() {
        boolean timedOut = false;

        // 循环获取任务
        for (; ; ) {

            // 是否需要检测超时:当前线程数超过核心线程
            boolean timed = workers.size() > corePoolSize;

            // 需要检测超时 && 已经超时了
            if (timed && timedOut) {
                return null;
            }

            try {
                // 是否需要检测超时
                // 1.需要:poll阻塞获取,等待keepAliveTime,等待结束就返回,不管有没有获取到任务
                // 2.不需要:take持续阻塞,直到获取结果
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

    @Getter
    @Setter
    private class Worker implements Runnable {
        private Thread thread;
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
            thread = new Thread(this);
        }

        @Override
        public void run() {
            runWorker(this);
        }
    }

}

代码示意图(虚线框内由Thread异步执行):

这个图有瑕疵,实际上线程池并不区分coreThread和nonCoreThread,仅看当前线程数是否大于corePoolSize

测试案例

@Slf4j
public class ThreadPoolTest {

    public static void main(String[] args) {

        // 创建线程池,核心线程1,最大线程2
        // 提交4个任务:第1个任务交给核心线程、第2个任务入队、第3个任务交给非核心线程、第4个任务被拒绝
        ThreadPool threadPoolExecutor = new ThreadPool(
                1,
                2,
                1,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1)
        );

        threadPoolExecutor.execute(() -> {
            log.info("{}:执行第1个任务...", Thread.currentThread().getName());
            sleep(10);
        });

        sleep(1);

        threadPoolExecutor.execute(() -> {
            log.info("{}:执行第2个任务...", Thread.currentThread().getName());
            sleep(10);

        });

        sleep(1);

        threadPoolExecutor.execute(() -> {
            log.info("{}:执行第3个任务...", Thread.currentThread().getName());
            sleep(10);
        });

        sleep(1);

        threadPoolExecutor.execute(() -> {
            log.info("{}:执行第4个任务...", Thread.currentThread().getName());
            sleep(10);
        });

        sleep(1);

        log.info("main结束");
    }

    private static void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

大家可以把测试案例中的线程池换成JDK的ThreadPoolExecutor,执行效果很类似:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1207070.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【vue2】vue2 实现手风琴效果,复制粘贴直接使用

【vue2】vue2 实现手风琴效果&#xff0c;复制粘贴直接使用 效果 代码 <template><divclass"about-index":style"{ backgroundImage: url( lisData.img ) }"><div class"container"><div class"fine-grained"…

振南技术干货集:深入浅出的Bootloader(2)

注解目录 1、烧录方式的更新迭代 1.1 古老的烧录方式 (怀旧一下&#xff0c;单片机高压烧录器。) 1.2 ISP 与ICP 烧录方式 (还记得当年我们玩过的 AT89S51?) 1.3 更方便的 ISP 烧录方式 1.3.1串口 ISP &#xff08;是 STC 单片机成就了我们&#xff0c;还是我们成就了…

【SpringBoot】序列化和反序列化介绍

一、认识序列化和反序列化 Serialization&#xff08;序列化&#xff09;是一种将对象以一连串的字节描述的过程&#xff1b;deserialization&#xff08;反序列化&#xff09;是一种将这些字节重建成一个对象的过程。将程序中的对象&#xff0c;放入文件中保存就是序列化&…

驱动程序编进内核或则编成模块

驱动程序可以编进内核或则编成模块 驱动程序编成模块 打开/home/book/100ask_imx6ull-sdk/Linux-4.9.88/drivers/char/Kconfig文件&#xff0c;添加以下信息。 在/home/book/100ask_imx6ull-sdk/Linux-4.9.88在目录下使用make memuconfig命令查看配置菜单。 可以按/&#…

escape, encodeURI, encodeURIComponent 有什么区别以及作用?

目录 前言 全部内容 1. 注释 2. 写法 3. 代码 4. 事件 5. 总结 6. 理论 7. 用法 8. 结论 9. API 10. 优缺点 escape: encodeURI: encodeURIComponent: 11. 方法 总结 &#x1f642;博主&#xff1a;冰海恋雨. &#x1f642;文章核心&#xff1a;escape, encod…

突破职场竞争,引领未来发展:考取《研发效能(DevOps)工程师职业技术认证》

就业形势堪忧&#xff0c;什么最有保障&#xff1f;考个“国家级”证书傍身吧&#xff01; 工信部教考中心作为中国领先的行业技能认证机构&#xff0c;其颁发的认证证书不仅代表了个人在信息技术领域的专业能力&#xff0c;更可以录入工业和信息化技术技能人才数据库&#xf…

上海市合成生物产业协会第一届第一次会员大会暨成立仪式今日召开

IFTNews科技讯&#xff1a;11月12日下午&#xff0c;上海市合成生物产业协会第一届第一次会员大会暨成立仪式在上海浦东成功举办。上海市经济和信息化委员会副主任刘平、上海市科学技术委员会一级巡视员兼副主任朱启高、上海市推进科技创新中心建设办公室专职副主任陈尧水出席大…

【技术干货】开源库 Com.Gitusme.Net.Extensiones.Core 的使用(二)

Com.Gitusme.Net.Extensiones.Core 扩展库 1.0.6 版本已发布。 1、版本变更说明 新增Sokcet套接字扩展。简化Socket操作&#xff0c;支持自定义命令封装&#xff0c;使用方便快捷&#xff0c;让您聚焦业务实现&#xff0c;而不必关心底层逻辑&#xff0c;提高开发效率。日志功…

二叉树理论碎碎记

二叉树的种类 在刷题的过程中&#xff0c;我们主要关注两种主要形式&#xff1a;满二叉树和完全二叉树。 满二叉树 如果一棵二叉树只有度为0的结点和度为2的结点&#xff0c;并且度为0的结点在同一层上&#xff0c;则这棵二叉树为满二叉树。如下图所示&#xff1a;这棵二叉…

RT1170的ITM SWO配置,实现printf输出及PC指针的采样分析

最近公司准备启动一个新的项目&#xff0c;使用NXP的MIMXRT1170芯片作为主控&#xff0c;在熟悉芯片的过程中发现RT1176具备ITM和SWO功能模块&#xff0c;于是针对之前项目中因工程庞大导致调试困难的问题&#xff0c;决定使用SWO输出调试信息&#xff0c;这样既可以节省硬件的…

Centos7.9用rancher来快速部署K8S

什么是 Rancher&#xff1f; Rancher 是一个 Kubernetes 管理工具&#xff0c;让你能在任何地方和任何提供商上部署和运行集群。 Rancher 可以创建来自 Kubernetes 托管服务提供商的集群&#xff0c;创建节点并安装 Kubernetes&#xff0c;或者导入在任何地方运行的现有 Kube…

微服务系列-使用 RestTemplate 的 Spring Boot 微服务通信示例

公众号「架构成长指南」&#xff0c;专注于生产实践、云原生、分布式系统、大数据技术分享。 概述 下面我们将学习如何创建多个 Spring boot 微服务以及如何使用 RestTemplate 类在多个微服务之间进行同步通信。 微服务通信有两种风格&#xff1a; 同步通讯异步通信 同步通…

CTFhub-RCE-远程包含

给咱一个phpinfo那么必然有他的道理 PHP的配置选项allow_url_include为ON的话&#xff0c;则include/require函数可以加载远程文件&#xff0c;这种漏洞被称为"远程文件包含漏洞(Remote File Inclusion RFI)"。 allow_url_fopen On 是否允许打开远程文件 allow_u…

基于FPGA的图像RGB转HLS实现,包含testbench和MATLAB辅助验证程序

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1计算最大值和最小值 4.2计算亮度L 4.3计算饱和度S 4.4计算色调H 5.算法完整程序工程 1.算法运行效果图预览 将FPGA结果导入到MATLAB显示效果&#xff1a; 2.算法运行软件版本 Vivado…

深入理解 Spring Boot 内置工具类:ReflectionUtils

文章目录 1. 什么是反射&#xff1f;2. 使用 ReflectionUtils2.1 获取类的所有字段2.2 调用方法2.3 访问字段 3. 源码分析3.1 doWithFields3.2 findMethod3.3 invokeMethod 4. 拓展与分析4.1 拓展4.2 性能考虑4.3 Java 9 模块化 5. 总结 &#x1f389;欢迎来到架构设计专栏~深入…

STM32 I2C详解

STM32 I2C详解 I2C简介 I2C&#xff08;Inter IC Bus&#xff09;是由Philips公司开发的一种通用数据总线 两根通信线&#xff1a; SCL&#xff08;Serial Clock&#xff09;串行时钟线&#xff0c;使用同步的时序&#xff0c;降低对硬件的依赖&#xff0c;同时同步的时序稳定…

振南技术干货集:深入浅出的Bootloader(3)

注解目录 1、烧录方式的更新迭代 1.1 古老的烧录方式 (怀旧一下&#xff0c;单片机高压烧录器。) 1.2 ISP 与ICP 烧录方式 (还记得当年我们玩过的 AT89S51?) 1.3 更方便的 ISP 烧录方式 1.3.1串口 ISP &#xff08;是 STC 单片机成就了我们&#xff0c;还是我们成就了…

迅软DSE答疑专业解析:内网遭受攻击的威胁到底有多大

当今数字化时代&#xff0c;企业数据安全已演变为企业生存和发展的至关重要因素。随着信息技术的迅猛发展&#xff0c;企业内网不仅是承载核心数据和信息的关键平台&#xff0c;也成为黑客和恶意软件攻击的主要目标。因此&#xff0c;确保企业数据安全和内网安全已成为企业管理…

2024上海国际智能驾驶技术展览会(自动驾驶展)

2024上海国际智能驾驶技术展览会 2024 Shanghai International Autonomous driving Expo 时间&#xff1a;2024年3月26-28日 地点&#xff1a;上海跨国采购会展中心 随着科技的飞速发展&#xff0c;智能驾驶已经成为了汽车行业的重要趋势。在这个时代背景下&#xff0c;汽车不…

element ui的日期选择器动态设定年份,并默认显示在该年份范围的日期时间

选中某个年份,让日期选择器只能选择该年份内的时间&#xff0c;并且默认显示该年份的时间&#xff08;由于日期选择器默认显示为当前时间&#xff0c;所以需要跳转到选择的年份&#xff09; 例&#xff1a;年份选择了2022年&#xff0c;那么日期选择也相应显示到2022年&#xf…