Java多线程篇(3)——线程池

news2025/1/12 19:49:18

文章目录

  • 线程池
  • ThreadPoolExecutor源码分析
    • 1、如何提交任务
    • 2、如何执行任务
    • 3、如何停止过期的非核心线程
    • 4、如何使用拒绝策略
  • ScheduledThreadPoolExecutor源码分析

线程池

快速过一遍基础知识
7大参数
corePoolSize : 核心线程数
maximumPoolSize: 最大线程数
keepAliveTime: 空闲线程存活时间
TimeUnit: 时间单位
BlockingQueue:任务队列
ThreadFactory: 创建线程的工厂
RejectedExecutionHandler:拒绝策略

拒绝策略
AbortPolicy:中止策略,线程池会抛出异常并中止执行此任务;
CallerRunsPolicy:把任务交给添加此任务的(main)线程来执行;
DiscardPolicy:忽略此任务,忽略最新的一个任务;
DiscardOldestPolicy:忽略最早的任务,最先加入队列的任务。

内置的线程池
SingleThreadExecutor(单线程):1 - 1 - Interge.MAX(核心线程-最大线程-队列长度)
FixedThreadPool(固定大小):N - N - Interge.MAX
CachedThreadPool(缓存):0 - Integer.MAX - 0
ScheduledThreadPool(定时):线程池的另一个关于定时的分支

为什么不推荐使用内置的线程池?
SingleThreadExecutor和FixedThreadPool无法控制队列长度可能导致OOM ,而CachedThreadPool无法控制线程数量可能导致大量的线程创建。


ThreadPoolExecutor源码分析

先不考虑ScheduledThreadPool,后面再单独说明定时线程池。

1、如何提交任务

ThreadPoolExecutor#execute

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        //新建核心线程
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //加入阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
       		//双重检测
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
             //如果当前没有正在运行的线程,则新增一个非核心线程(任务为null,表示线程的任务将会从阻塞队列中获取)
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //新建非核心线程
        else if (!addWorker(command, false))
            reject(command);
    }

也就是
在这里插入图片描述
submit和execute的区别
在这里插入图片描述
其实没啥太大的区别,submit最后也是调用的execute,只不过在调用之前封装了task为FutureTask,表示有返回值的任务,最后将返回值返回
不过有一点需要注意的是。FutureTask,不仅会返回结果,还会把原本runnable中的异常吃了。所以submit提交的任务如果抛异常了,外部是无法感知的
FutureTask#run
在这里插入图片描述
测试结果
在这里插入图片描述

2、如何执行任务

ThreadPoolExecutor#addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (int c = ctl.get();;) {
            if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue.isEmpty()))
                return false;
            for (;;) {
                //COUNT_MASK掩码,舍去前3位(因为前3位是状态位,后面的才是任务数)
                if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();
                if (runStateAtLeast(c, SHUTDOWN))
                    continue retry;
            }
        }
        //上面主要是ctl++,其他很多都是检测

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            //新建一个worker,封装了firstTask
            //(worker也实现了Runnable,相当于对firstTask封装了一层)
            w = new Worker(firstTask);
            //这里线程的runable实现是worker而不是firstTask
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    int c = ctl.get();
                    //一些检测
                    if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) {
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    //Thread.start()->runnable.run()也就是worker.run()->runWorker(worker)
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

addWorker新建worker对象,封装了新建的线程对象和原始task。线程的执行调用如下:
thread.start()->runnable.run()也就是worker.run()->runWorker(worker)
在这里插入图片描述

ThreadPoolExecutor#runWorker

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock();
        boolean completedAbruptly = true;
        
        try {
        
        	//worker的task为null(addWorker传入的参数)则从阻塞队列中获取一个task
            while (task != null || (task = getTask()) != null) {
                w.lock();
                //检测是否需要中止线程
                if ((runStateAtLeast(ctl.get(), STOP) 
                	|| (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
                    wt.interrupt();
                    
                try {
                    //执行前回调
                    beforeExecute(wt, task);
                    try {
                    	//执行任务
                        task.run();
                        //执行后回调
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            
            completedAbruptly = false;
        } finally {
            // finally 调用
            processWorkerExit(w, completedAbruptly);
        }
    }

所以runWorker就是如果worker手上有task,就先把手头上的task执行了,然后再(循环)去阻塞队列获取task执行。如果没有就直接去阻塞队列获取task执行。

那么 finally 那里的 processWorkerExit 是干嘛用的?

执行到processWorkerExit要么就是异常情况跳出循环(completedAbruptly=true),要么就是worker手上和阻塞队列均没有task跳出循环(completedAbruptly=false)。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
		//如果是异常退出的,此时workerCount还没调整,所以需要工作线程数减1
        if (completedAbruptly)
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        
        //更新 完成任务数,以及移除worker
        try {
            completedTaskCount += w.completedTasks;
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
		
		//尝试终止线程
        tryTerminate();

        int c = ctl.get();
        //如果不是异常退出,则根据配置计算需要的最小工作线程数
        //如果是异常退出,或者当前工作线程小于上面根据配置计算的最小工作线程
        //则都用一个新worker来替换原来的worker
        if (runStateLessThan(c, STOP)) {
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                if (workerCountOf(c) >= min)
                    return;
            }
            //启动一个worker替换原来的worker
            addWorker(null, false);
        }
    }

总之这段代码的主要作用是在工作线程退出时,更新线程池的状态、计数,以及根据配置来决定是否需要新的worker替代退出的工作线程,以保持线程池的正常运行。

3、如何停止过期的非核心线程

答案在getTask()。

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();

            // 一些退出的状态就直接返回
            if (runStateAtLeast(c, SHUTDOWN)
                && (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            //是否需要超时淘汰
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
			
			//在确保当workQueue不为空时至少有一个工作线程的前提下
			//来淘汰超出 maximumPoolSize 或者超时的线程
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //阻塞获取任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                //标记超时
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

其实线程池并没有标记谁是核心线程,谁是非核心线程,只关心核心线程和非核心线程的数量。也就是说无论是哪个线程在获取任务时都有可能被标记为timeOut,并且每次获取任务都会根据核心线程数,最大线程数,当前线程数,timeout标记等判断是否需要当前worker,如果不需要就返回null,跳出runWorker的循环,进而结束线程。

4、如何使用拒绝策略

在提交任务的时候,如果addWorker失败就会进入拒绝策略的逻辑。

 public void execute(Runnable command) {
        //...
        //加入阻塞队列
        if (isRunning(c) && workQueue.offer(command)) {
            //...
            if (! isRunning(recheck) && remove(command))
                //双重检测失败进入拒绝策略
                reject(command);
                //...               
        }
        //新建非核心线程
        else if (!addWorker(command, false))
        	//非核心线程添加失败,进入拒绝策略
            reject(command);
}


final void reject(Runnable command) {
     handler.rejectedExecution(command, this);
}

ScheduledThreadPoolExecutor源码分析

.schedule():延迟执行,只执行一次。
.scheduleAtFixedRate():固定频率执行,按照固定的时间间隔来调度任务。
.scheduleWithFixedDelay():固定延迟执行,在上一次任务完成后的固定延迟之后再次执行任务。

无论是哪种都会先将task封装成 ScheduledFutureTask,然后调用 delayedExecute
scheduleAtFixedRate为例:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0L)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                           //scheduleWithFixedDelay与scheduleAtFixedRate的区别就只在这里
                                           //scheduleWithFixedDelay 传的是 -unit.toNanos(period)
                                           //后续会根据这个值的正负来判断是固定频率还是固定延迟
                                          unit.toNanos(period),
                                          sequencer.getAndIncrement());
        //封装成 ScheduledFutureTask 
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        //调用 delayedExecute
        delayedExecute(t);
        return t;
    }

delayedExecute

private void delayedExecute(RunnableScheduledFuture<?> task) {
        if (isShutdown())
            reject(task);
        else {
            //task添加到队列
            //这同样也是自己实现的一个延迟队列,大概的逻辑就是:先按时间排,如果时间一样就按插入的顺序排。
            super.getQueue().add(task);
            //一些检测
            if (!canRunInCurrentRunState(task) && remove(task))
                task.cancel(false);
            else
            	//保证有足够的woker正在工作
                ensurePrestart();
        }
    }

void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
        	//addWorker跟就上面的是一样的了
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    }

那么凭什么将Worker的task封装成 ScheduledFutureTask 能起到持续调用的效果,来看看他的 run 方法。
ScheduledFutureTask#run

        public void run() {
        	//一些检测
            if (!canRunInCurrentRunState(this))
                cancel(false);
            //如果不是周期性任务就只调用一次(period不为0则表示不是周期性任务)
            else if (!isPeriodic())
                super.run();
            //如果是周期性任务就在调用完之后
            //设置下次调用时间并将任务放回队列且保证有足够的woker正在工作
            else if (super.runAndReset()) {
                setNextRunTime();
                reExecutePeriodic(outerTask);
            }
        }

ScheduledFutureTask#setNextRunTime

        private void setNextRunTime() {
            long p = period;
            //根据period的正负来区分是固定频率还是固定延迟
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
        }

ScheduledThreadPoolExecutor#reExecutePeriodic

    void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        if (canRunInCurrentRunState(task)) {
        	//放回队列
            super.getQueue().add(task);
            if (canRunInCurrentRunState(task) || !remove(task)) {
            	//保证有足够的woker正在工作
                ensurePrestart();
                return;
            }
        }
        task.cancel(false);
    }

所以ScheduledThreadPoolExecutor的总体框架设计和上面的ThreadPoolExecutor是一样的(毕竟是他的子类)。
最主要的区别在于ScheduledThreadPoolExecutor里worker使用的task是自己内部实现的 ScheduledFutureTask 类,而该类的run方法在执行完后会设置下一次的执行时间并将任务放回队列中等待执行。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1023463.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Docsify介绍—md文件直接生成网页的工具

Markdown是一种轻量级标记语言&#xff0c;它使用易读易写的纯文本格式&#xff0c;用于编写文档&#xff0c;如README&#xff0c;wiki&#xff0c;博客文章等。Markdown语言最初由约翰格鲁伯&#xff08;John Gruber&#xff09;和亚伦斯沃茨&#xff08;Aaron Swartz&#x…

ssm637教材管理系统+vue

项目介绍 当下&#xff0c;正处于信息化的时代&#xff0c;许多行业顺应时代的变化&#xff0c;结合使用计算机技术向数字化、信息化建设迈进。以前学校对于教材信息的管理和控制&#xff0c;采用人工登记的方式保存相关数据&#xff0c;这种以人力为主的管理模式已然落后。本…

YOLO系列

一、YOLOv1 Bbox使用开根号的原因&#xff1a; 小目标与大目标偏移相同尺度时&#xff0c;小目标检测效果差&#xff0c;那么公式中使用根号就会让小目标损失值更大了 V1网络的一些问题&#xff1a; 小目标检测效果差&#xff0c;当目标出现新配置也不行&#xff0c;目标定位…

红黑树插入的实现

红黑树&#xff1a; 1.概念&#xff1a; 红黑树的性质&#xff1a; 红黑树的插入操作&#xff1a; 其前面的插入和二叉搜索树的一模一样&#xff0c;只是后面需要判断是否满足红黑树的性质&#xff1a; 具体分为三种情况&#xff1a; 1.uncle节点存在且为红色的&#xff1a…

芯科蓝牙BG27开发笔记9-资料整理

SSV5中的例程 在ssv5中有少量开箱即用的例程&#xff0c;第一篇笔记就是从这里开始的&#xff1a; 然而&#xff0c;仅仅这些代码吗&#xff1f;其他例程代码在何处&#xff1f; Software Developer Docs - Silicon Labs 所有内容都可以在官网文档找到。 之前是从ssv5直接开…

Bard人工智能9月19日重大更新

1、巴德现在可以回复来自谷歌地图、航班、酒店和YouTube的实时信息&#xff0c;因此您可以在一个地方完成更多工作。 2、Bard 可能会与其他服务共享您的部分对话和其他相关信息&#xff0c;例如您的位置。这些服务可能会使用该信息进行改进&#xff0c;即使您以后删除了您的 Ba…

python基础语法(四)

感谢各位大佬对我的支持,如果我的文章对你有用,欢迎点击以下链接 &#x1f412;&#x1f412;&#x1f412;个人主页 &#x1f978;&#x1f978;&#x1f978;C语言 &#x1f43f;️&#x1f43f;️&#x1f43f;️C语言例题 &#x1f423;&#x1f413;&#x1f3c0;python 这…

day50:QTday3,对话框补充、事件处理机制

一、完成文本编辑器的保存工作 widget.h: #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include<QFontDialog> //字体对话框 #include<QFont> //字体类 #include<QMessageBox> //消息对话框 #…

springboot配置注入增强(三)自定义数据源/自定义解析方法

我们回忆下上一篇文章的内容&#xff0c;属性注入的关键节点是PropertySourcesPlaceholderConfigurer的BeanFactory后置处理器org.springframework.context.support.PropertySourcesPlaceholderConfigurer#postProcessBeanFactory&#xff0c;只有在执行这个方法前设置到Enviro…

竞赛 基于深度学习的中文情感分类 - 卷积神经网络 情感分类 情感分析 情感识别 评论情感分类

文章目录 1 前言2 情感文本分类2.1 参考论文2.2 输入层2.3 第一层卷积层&#xff1a;2.4 池化层&#xff1a;2.5 全连接softmax层&#xff1a;2.6 训练方案 3 实现3.1 sentence部分3.2 filters部分3.3 featuremaps部分3.4 1max部分3.5 concat1max部分3.6 关键代码 4 实现效果4.…

平均精度(AP)

什么是平均精度(AP) 平均精度 (AP)并不是精度 (P)的平均值。 平均精度 (AP) 是按类别计算的。 mAP&#xff08;mean average precision&#xff09;是一个平均值&#xff0c;常用作目标检测中的检测精度指标mAP 指标通过对于一个平均目标来检测任务中多个目标所对应不同 AP&a…

9.19号作业

2> 完成文本编辑器的保存工作 widget.h #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QFontDialog> #include <QFont> #include <QMessageBox> #include <QDebug> #include <QColorDialog> #include <QColor&g…

Centos安装显卡

1、安装基础环境 yum -y install epel-release yum -y install gcc kernel-devel kernel-headers 2.对应内核版本 yum info kernel-devel kernel-headers Cat /proc/version 3、yum安装版本不对应。则去官网手动下载 离线安装对应的rpm&#xff1a; https://pkgs.org/dow…

电脑桌面的复选框如何取消

电脑桌面图标的复选框如何取消 1. 概述2. 去掉图标的复选框方法结束语 1. 概述 当你拿到新的电脑开机后&#xff0c;发现桌面上软件应用的图标左上角有个小框&#xff0c;每次点击图标都会显示&#xff0c;并且点击图标时&#xff0c;小框还会打上√&#xff1b; 这个小框的…

移动端APP测试-如何指定测试策略、测试标准?

制定项目的测试策略是一个重要的步骤&#xff0c;可以帮助测试团队明确测试目标、测试范围、测试方法、测试资源、测试风险等&#xff0c;从而提高测试效率和质量。本篇是一些经验总结&#xff0c;理论分享。并不是绝对正确的&#xff0c;也欢迎大家一起讨论。 文章目录 一、测…

activiti7的数据表和字段的解释

activiti7的数据表和字段的解释 activiti7版本有25张表&#xff0c;而activiti6有28张表&#xff0c;activiti5有27张表&#xff0c;绝大部分的表和字段的含义都是一样的&#xff0c;所以本次整理的activiti7数据表和字段的解释&#xff0c;也同样适用于activiti6和5。 1、总览…

higher-order function in functional programming (JS)

1 functional programming该怎么理解&#xff1f; functions就是values&#xff0c;就像String or Numbers那样&#xff0c;可以构造匿名函数&#xff0c;并把函数赋给某个变量 或者 传递给其他函数&#xff08;higher-order function&#xff09; 2 higher-order function有…

B树的定义和特点

1.多叉查找树的效率 策略1:m叉查找树中&#xff0c;规定除了根节点外&#xff0c;任何结点至少有[m/2]个分叉&#xff0c;即至少含有[m/2]-1个关键字。策略2:m叉查找树中&#xff0c;规定对于任何一个结点&#xff0c;其所有子树的高度都要相同。 而满足以上两种策略的树被称…

新手怎样快速上手接口测试?掌握这几个知识点直接起飞!

接口测试是测试系统组件间接口的一种方式&#xff0c;接口测试主要用于检测外部系统与系统之间以及内部各个子系统之间的交互点。测试的重点是检查数据的增删改查操作&#xff0c;以及系统之间的逻辑关系等。 接口的几种类型 接口的类型包括&#xff1a;post &#xff0c;get&…

Postman应用——控制台调试

当你在测试脚本中遇到错误或意外行为时&#xff0c;Postman控制台可以帮助你识别&#xff0c;通过将console.log调试语句与你的测试断言相结合&#xff0c;你可以检查http请求和响应的内容&#xff0c;以及变量之类的。 通常可以使用控制台日志来标记代码执行&#xff0c;有时…