设计山寨线程池

news2024/10/7 10:22:32
作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

联系qq:184480602,加我进群,大家一起学习,一起进步,一起对抗互联网寒冬

私以为造轮子几乎是最好的学习方式,甚至没有之一。因为造轮子需要至少做足以下两点:

  • 了解设计思想(设计层面)
  • 大略看过源码(代码层面)

不了解设计,就无法把握整体。没看过代码,就无法完善细节。另外,从创作者的角度来说,直接分析源码有时太困难了,代码太多,抽象层次太深。如果可以通过造轮子,把抽象层次减少一些,采用平铺直叙的方式呈现,那么读者理解起来也就更容易些。

既然造轮子这么好,那就,拿来吧你。今天带大家造一个线程池。

设计思路

不能免俗,直接“图片来自百度”:

池化技术

大家多少听过所谓的“池化技术”,比如数据库连接池、常量池、线程池、对象池等等。池化技术是计算机世界里比较常用的、行之有效的优化手段。那么我想问你,线程池中的“池”,到底指代什么?

抛开无足轻重的小虾米,线程池中最主要的就两个:我们向Executor提交的任务、Executor自己维护的Thread。其中,线程“池”显然指代的是Thread的集合。

但和数据库连接池等一般的池化技术不同的是,ThreadPool的作用不单单是“池化”,它更重要的职责其实是“做功”,也即是执行任务。举个例子,平时我们使用数据库连接池,其实都是从池中取出一个Connection,执行完SQL后会调用重写过的close()归还Connection。但你可曾见过有人向ThreadPool讨要Thread的?它会给你吗?ThreadPool的做法是:

想要从池中拿Thread?没门儿!你不知道自己多线程知识多菜啊?小心玩火自焚。要执行任务的话,你自己把Task丢进来,哥罩着你。

也就是说,ThreadPool从一开始就没想过让你们把Thread拿走!但你们又要返回结果咋整?我返回一个FutureTask,需要结果的话,自己FutureTask#get()。但主动权还是在ThreadPool这,能不能拿到结果、是否要阻塞都是它说了算!

到这里,ExecutorService为什么要提供返回值、AbstractExecutorService为什么要引入FutureTask都说得通了!然而,大部分人觉得线程池难,并非搞不清楚线程“池”,而是不了解它是如何“做功”的,也就是说:线程池是如何执行任务的呢?

这就涉及到线程池和一般池化技术最大的不同:内化执行操作,而且是通过生产消费的模式执行任务,大家可以在后面的Demo中看到。

生产消费模型

如果往线程池不断提交任务,大致会经历4个阶段:

  • 核心线程处理任务
  • 任务进入任务队列等待
  • 非核心线程处理任务
  • 拒绝策略

特别是第二个阶段,来不及处理的任务会被暂存入workQueue(任务队列),于是典型的生产消费模型就出现了。

调用者投递Task ====> ThreadPool.workQueue ====> workerThread阻塞获取Task执行

几个重要概念

JVM的Thread与操作系统的线程资源

平时我们都是通过new Thread().start()开启一个线程,而线程本质是操作系统资源,Java作为一门编程语言,是没法自己分配线程的。那么,JVM的Thread对象和操作系统的线程资源之间是什么关系呢?大家不妨想象一个场景:

在一个风雨交加的郊外,龙虎山张天师独自一人面对各路妖孽!只见张天师缓缓从怀中掏出一张“天雷符”,指尖轻轻一捻,符咒顿时蹿起了蓝色火苗。说时迟那时快,天空忽然乌云密布发出轰鸣,然后一条紫电如龙蛇一般从天而降,劈向那群魑魅魍魉。

“天雷符”不是真正的天雷,但通过天雷符可以召唤天雷,是一种契约绑定关系,Thread和操作系统的线程同理。

线程池如何复用线程?

有时候,要解决一个问题,从反方向入手可能更简单些。我们暂且先不管如何复用线程,我就问大家:如何回收/销毁线程?(知道线程什么情况会被销毁,那么只要避免销毁,也就可以复用)

“线程”这个词,其实有两个层次的指代:Thread对象、JVM线程资源(本质还是操作系统线程)。Thread对象与线程资源之间是绑定关系,一个线程资源被分配后,会找到Thread#run()作为代码的执行入口。

线程什么时候销毁呢?正常来说,new Thread(tartget).start()后,操作系统就会分配线程资源,而等到线程执行完Thread#run()中的代码,就会自然消亡。至于Thread对象,如果没有引用,也会被GC回收。

看到这,我想大家应该明白了:只要任务永远不结束,线程就永远死不了。任务如何才能永远不结束呀?要么循环做任务、要么阻塞。

线程池本质也是Thread,只是单体和集合的区别。既然Thread“跑完任务就销毁”的特性是天生的、注定的,线程池也无法改变这一点。所以,线程池要想让内部线程一直存活着,就要keeps threads busy working,也就是让它们一直干活。实在没活干怎么办?那就阻塞着呗(可以用阻塞队列)!总之,不能让你“执行完毕”,否则就销毁了。

如何保证只销毁“非核心线程”

大家都听过一些八股文的口诀,比如“在空闲时间,如果非核心线程空闲超过keepAliveTime就会被回收”,这是怎么实现的呢?

首先,有一个常见的误区是,很多人以为线程池创建线程时会给每一个Thread做标记,比如给核心线程标记为coreThread,非核心线程标记为nonCoreThread,然后空闲时间回收nonCoreThread。然而JDK的Doug Lea可不这么想,人家采用的方案更加简单粗暴:

  • 当前线程数 <= corePoolSize,那么所有线程都是核心线程,不回收
  • 当前线程数 > corePoolSize,回收多余线程

看吧,超出corePoolSize时,超出的部分线程就是“多余”的,实际回收时不会做具体选择,有一个回收一个,直到线程数不大于corePoolSize。(后面有一张示意图,可以帮助理解线程是如何被回收的)

超过corePoolSize后为什么优先入队?

很多面试官喜欢问:为什么核心线程数满了以后,线程池选择优先把任务丢入队列,而不是立即分配非核心线程处理呢?

实际上,这是把因果倒置了。线程池原本就是打算用队列缓冲执行任务,但同时为了保证“弹性”,所以才允许队列满了以后扩展非核心线程加快效率。所以不要问“为什么corePoolSize满了以后要优先入队,而不是直接分配非核心线程”,而应该思考“队列满了以后,再分配非核心线程的好处是什么”。

另外,线程资源比较宝贵(特别是频繁创建销毁),所以能用队列缓冲的话,就别额外创建线程咯。

简单版TheadPool

为了降低难度,我们先写一个极简版的线程池。

public class SimpleThreadPool {

    /**
     * 任务队列
     */
    BlockingQueue<Runnable> workQueue;

    /**
     * 工作线程
     */
    List<Worker> workers = new ArrayList<>();

    /**
     * 构造器
     *
     * @param poolSize  线程数
     * @param workQueue 任务队列
     */
    SimpleThreadPool(int poolSize, BlockingQueue<Runnable> workQueue) {
        this.workQueue = workQueue;
        // 创建线程,并加入线程池
        for (int i = 0; i < poolSize; i++) {
            Worker work = new Worker();
            work.start();
            workers.add(work);
        }
    }

    /**
     * 提交任务
     *
     * @param command
     */
    void execute(Runnable command) {
        try {
            // 任务队列满了则阻塞
            workQueue.put(command);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    /**
     * 工作线程,负责执行任务
     */
    class Worker extends Thread {
        public void run() {
            // 循环获取任务,如果任务为空则阻塞等待
            while (true) {
                try {
                    Runnable task = workQueue.take();
                    task.run();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

示意图

测试案例

public class SimpleThreadPoolTest {
    
    public static void main(String[] args) {
        
        SimpleThreadPool simpleThreadPool = new SimpleThreadPool(2, new ArrayBlockingQueue<Runnable>(2));
        
        simpleThreadPool.execute(() -> {
            System.out.println("第1个任务开始");
            sleep(3);
            System.out.println("第1个任务结束");
        });
        simpleThreadPool.execute(() -> {
            System.out.println("第2个任务开始");
            sleep(4);
            System.out.println("第2个任务结束");
        });
        simpleThreadPool.execute(() -> {
            System.out.println("第3个任务开始");
            sleep(5);
            System.out.println("第3个任务结束");
        });
    }


    private static void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

上面SimpleThreadPool的核心只有一个:生产消费模型,不涉及提交任务时的各种逻辑判断(直接加入阻塞队列),也没有非核心线程的销毁。另外,往阻塞队列提交任务时用的是put,队列满了以后会阻塞,这种“拒绝策略”显然不太合理。复杂版的ThreadPool则会在此基础上扩展,把一些细节补充完整。

复杂版ThreadPool

由于拒绝策略并不是核心逻辑,这里就直接用抛异常代替了。除此以外,代码逻辑与结构、甚至变量名都基本与ThreadPoolExecutor一致,可谓麻雀虽小五脏俱全,建议debug走读一遍:

public class ThreadPool {

    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * 工作线程
     */
    private final List<Worker> workers = new ArrayList<>();
    /**
     * 任务队列
     */
    private BlockingQueue<Runnable> workQueue;
    /**
     * 核心线程数
     */
    private final int corePoolSize;
    /**
     * 最大线程数
     */
    private final int maximumPoolSize;
    /**
     * 非核心线程最大空闲时间(否则销毁线程)
     */
    private final long keepAliveTime;

    public ThreadPool(int corePoolSize,
                      int maximumPoolSize,
                      long keepAliveTime,
                      TimeUnit timeUnit,
                      BlockingQueue<Runnable> workQueue) {
        this.workQueue = workQueue;
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.keepAliveTime = timeUnit.toNanos(keepAliveTime);
    }

    public void execute(Runnable task) {
        Assert.notNull(task, "task is null");

        // 创建核心线程处理任务
        if (workers.size() < corePoolSize) {
            this.addWorker(task, true);
            return;
        }

        // 尝试加入任务队列
        boolean enqueued = workQueue.offer(task);
        if (enqueued) {
            return;
        }

        // 创建非核心线程处理任务
        if (!this.addWorker(task, false)) {
            // 非核心线程数达到上限,触发拒绝策略
            throw new RuntimeException("拒绝策略");
        }
    }

    private boolean addWorker(Runnable task, boolean core) {
        int wc = workers.size();
        if (wc >= (core ? corePoolSize : maximumPoolSize)) {
            return false;
        }

        boolean workerStarted = false;
        try {
            Worker worker = new Worker(task);
            final Thread thread = worker.getThread();
            if (thread != null) {
                mainLock.lock();
                workers.add(worker);
                thread.start();
                workerStarted = true;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            mainLock.unlock();
        }

        return workerStarted;
    }

    private void runWorker(Worker worker) {
        Runnable task = worker.getTask();

        try {
            // 循环处理任务
            while (task != null || (task = getTask()) != null) {
                task.run();
                task = null;
            }
        } finally {
            // 从循环退出来,意味着当前线程是非核心线程,而且需要被销毁
            // Java的线程,既可以指代Thread对象,也可以指代JVM线程,一个Thread对象绑定一个JVM线程
            // 因此,线程的销毁分为两个维度:1.把Thread对象从workers移除 2.JVM线程执行完当前任务,会自然销毁
            workers.remove(worker); // 这里前后应该加锁,否则线程不安全。由于是demo,很多处理比较随意
        }
    }


    private Runnable getTask() {
        boolean timedOut = false;

        // 循环获取任务
        for (; ; ) {

            // 是否需要检测超时:当前线程数超过核心线程
            boolean timed = workers.size() > corePoolSize;

            // 需要检测超时 && 已经超时了
            if (timed && timedOut) {
                return null;
            }

            try {
                // 是否需要检测超时
                // 1.需要:poll阻塞获取,等待keepAliveTime,等待结束就返回,不管有没有获取到任务
                // 2.不需要:take持续阻塞,直到获取结果
                Runnable r = timed ?
                        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                        workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

    @Getter
    @Setter
    private class Worker implements Runnable {
        private Thread thread;
        private Runnable task;

        public Worker(Runnable task) {
            this.task = task;
            thread = new Thread(this);
        }

        @Override
        public void run() {
            runWorker(this);
        }
    }

}

代码示意图(虚线框内由Thread异步执行):

这个图有瑕疵,实际上线程池并不区分coreThread和nonCoreThread,仅看当前线程数是否大于corePoolSize

测试案例

@Slf4j
public class ThreadPoolTest {

    public static void main(String[] args) {

        // 创建线程池,核心线程1,最大线程2
        // 提交4个任务:第1个任务交给核心线程、第2个任务入队、第3个任务交给非核心线程、第4个任务被拒绝
        ThreadPool threadPoolExecutor = new ThreadPool(
                1,
                2,
                1,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1)
        );

        threadPoolExecutor.execute(() -> {
            log.info("{}:执行第1个任务...", Thread.currentThread().getName());
            sleep(10);
        });

        sleep(1);

        threadPoolExecutor.execute(() -> {
            log.info("{}:执行第2个任务...", Thread.currentThread().getName());
            sleep(10);

        });

        sleep(1);

        threadPoolExecutor.execute(() -> {
            log.info("{}:执行第3个任务...", Thread.currentThread().getName());
            sleep(10);
        });

        sleep(1);

        threadPoolExecutor.execute(() -> {
            log.info("{}:执行第4个任务...", Thread.currentThread().getName());
            sleep(10);
        });

        sleep(1);

        log.info("main结束");
    }

    private static void sleep(int seconds) {
        try {
            TimeUnit.SECONDS.sleep(seconds);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

大家可以把测试案例中的线程池换成JDK的ThreadPoolExecutor,执行效果很类似:

作者简介:大家好,我是smart哥,前中兴通讯、美团架构师,现某互联网公司CTO

进群,大家一起学习,一起进步,一起对抗互联网寒冬

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1296929.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MPEG4Extractor

1、readMetaData 必须要找到 Moov box&#xff0c;找到 Mdat box或者 Moof box&#xff0c;并且创建了 ItemTable 大端 box 分为 box header 和 box content&#xff1a; box header由8个字节组成&#xff0c;前面四个字节表示这个box 的大小&#xff08;包含这个头的8字节&a…

『Linux升级路』进度条小程序

&#x1f525;博客主页&#xff1a;小王又困了 &#x1f4da;系列专栏&#xff1a;Linux &#x1f31f;人之为学&#xff0c;不日近则日退 ❤️感谢大家点赞&#x1f44d;收藏⭐评论✍️ 目录 一、预备知识 &#x1f4d2;1.1缓冲区 &#x1f4d2;1.2回车和换行 二、倒计…

Python与ArcGIS系列(十六)重复节点检测

目录 0 简述1 实例需求2 arcpy开发脚本0 简述 在处理gis线图层和面图层数据时,有时候会遇到这种情况:数据存在重复节点或伪重复节点(两个节点距离极小),往往我们需要对这种数据进行检测标注或进行修改。本篇将介绍如何利用arcpy及arcgis的工具箱实现这个功能。 1 实例需求…

【Hadoop_02】Hadoop运行模式

1、Hadoop的scp与rsync命令&#xff08;1&#xff09;本地运行模式&#xff08;2&#xff09;完全分布式搭建【1】利用102将102的文件推到103【2】利用103将102的文件拉到103【3】利用103将102的文件拉到104 &#xff08;3&#xff09;rsync命令&#xff08;4&#xff09;xsync…

PR剪辑视频做自媒体添加字幕快速方式(简单好用的pr视频字幕模板)

如何选择合适的字幕添加进短视频呢&#xff1f;首先要先确定增加的视频风格&#xff0c;简约、商务、科技感、炫酷&#xff1b;再确定用途&#xff0c;注释、标记、语音翻译、引用、介绍&#xff1b;最后在相应的模板中挑选几个尝试&#xff0c;悬着一个最切合主题的使用&#…

【C++学习手札】基于红黑树封装模拟实现map和set

​ &#x1f3ac;慕斯主页&#xff1a;修仙—别有洞天 &#x1f49c;本文前置知识&#xff1a; 红黑树 ♈️今日夜电波&#xff1a;漂流—菅原纱由理 2:55━━━━━━️&#x1f49f;──────── 4:29 …

node js 递归生成vue文件目录

目录 什么是 fs 文件系统模块 fs.existsSync方法 方法说明&#xff1a; 语法&#xff1a; 向指定的文件中写入内容 writeFile fs.writeFile() 的语法格式 fs.writeFile() 的示例代码 判断文件是否写入成功 fs.mkdir 创建目录 目录已存在&#xff0c;重复创建 创建的目…

Linux各目录结构说明

文章目录 目录说明源码放哪里&#xff1f;拓展&#xff1a;Linux里面安装软件是装在home目录还是opt目录还是/usr/local好&#xff1f; bin boot dev etc home lib lib64 lostfound media mnt opt proc root run sbin srv sys tmp usr var 目录说明 bin 存放二进制可执行文件&…

玩转大数据11:数据可视化与交互式分析

1. 引言 数据可视化和交互式分析是大数据领域中的重要方面。随着大数据时代的到来&#xff0c;数据量越来越大&#xff0c;数据类型越来越复杂&#xff0c;传统的数据处理和分析方法已经无法满足我们的需求。数据可视化可以将复杂的数据以简单、直观的方式呈现出来&#xff0c…

素材创作平台,解决企业素材供给问题

企业对于高质量素材的需求日益增长。无论是为了提升品牌形象&#xff0c;还是为了推动产品销售&#xff0c;都需要大量的专业设计素材。然而&#xff0c;素材的获取、设计和定制往往是一项耗时耗力的工作。这时&#xff0c;美摄科技素材创作平台应运而生&#xff0c;为企业提供…

JVM 虚拟机 类的加载器分类与测试详解

Java全能学习面试指南&#xff1a;https://javaxiaobear.cn 1、类加载器的分类说明 JVM支持两种类型的类加载器&#xff0c;分别为引导类加载器&#xff08;Bootstrap ClassLoader&#xff09;和自定义类加载器&#xff08;User-Defined ClassLoader&#xff09;。 从概念上来…

webpack该如何打包

1.我们先创建一个空的大文件夹 2.打开该文件夹的终端 输入npm init -y 2.1.打开该文件夹的终端 2.2在该终端运行 npm init -y 3.安装webpack 3.1打开webpack网址 点击“中文文档” 3.2点击“指南”在点击“起步” 3.3复制基本安装图片画线的代码 4.在一开始的文件夹下在创建一…

营收增速持续放缓,博通CEO期待AI崛起救场 | 百能云芯

博通作为苹果等大型科技公司的芯片供应商&#xff0c;于周四发布了财报。尽管截至10月29日的第四季度营收增长了4%至93亿美元&#xff0c;符合市场预期&#xff0c;但增速已经降至2020年以来的最低水平。 由于企业客户和电信供应商在控制支出方面的谨慎态度&#xff0c;博通的销…

C 语言 变量

变量初始值 全局变量&#xff1a;初始值是 0 局部变量&#xff1a;初始值是 随机的 类型限定符 通常不需要显式使用 register 关键字来优化变量的存储和访问。 关键字 _Complex和_Imaginary分别用于表示复数和虚数&#xff08;二者皆是数学概念&#xff09; 变量的声明和定义 c…

【小白专用】MySQL创建数据库和创建数据表

1.在Windows开始搜索输入Mysql,并选择第一个打开。 2.输入安装时的密码 3.说明安装成功。 二、创建数据库 1. 连接 MySQL 输入 mysql -u root -p 命令&#xff0c;回车&#xff0c;然后输入 MySQL 的密码(不要忘记了密码)&#xff0c;再回车&#xff0c;就连接上 MySQL 了。 …

Vue3选项式-基础部分篇

Vue3选项式风格-基础部分篇 简介模板语法文本插值原始HTMLAttribute 绑定使用 JavaScript 表达式调用函数全局组件调用内置指令动态参数注意事项 data()data()深度响应 methods有状态的methods(防抖) DOM更新时机计算属性class和style绑定条件渲染列表渲染数组变换侦听事件处理…

全网最新最全【Kali Linux】 渗透测试技术大全

利矛出击&#xff1a;Kali Linux 渗透测试大法 在网络攻防实践中&#xff0c;系统的防御措施可以称为“盾”&#xff0c;而渗透测试则是“矛”。二者同等重要&#xff0c;因为“矛”可以试探出“盾”有多坚固&#xff0c;并且及时发现问题&#xff0c;修补漏洞&#xff0c;提升…

k8s之镜像拉取时使用secret

k8s之secret使用 一、说明二、secret使用2.1 secret类型2.2 创建secret2.3 配置secret 一、说明 从公司搭建的网站镜像仓库&#xff0c;使用k8s部署服务时拉取镜像失败&#xff0c;显示未授权&#xff1a; 需要在拉取镜像时添加认证信息. 关于secret信息,参考: https://www.…

[英语学习][10][Word Power Made Easy]的精读与翻译优化

[序言] 下面这段话, 译者翻译没有太大问题, 就是某些单词上, 跟他理解得不一样. 另外还有一个关键的定语从句, 我认为译者理解不到位, 导致翻译不够通顺. [英文学习的目标] 提升自身的英语水平, 对日后编程技能的提升有很大帮助. 希望大家这次能学到东西, 同时加入我的社区讨…

修改正点原子综合实验的NES模拟器按键控制加横屏

​​​​​​​ 开发板&#xff1a;stm32f407探索者开发板V2 屏幕是4.3寸-800-480-MCU屏 手头没有V3开发板&#xff0c;只有V2&#xff0c;所以没法测试 所以只讲修改哪里&#xff0c;请自行修改 先改手柄部分&#xff0c;把手柄改成按键 找到左边的nes文件夹中的nes_mai…