JAVA并发编程工具篇--1理解线程池任务的执行和线程的销毁

news2024/11/22 21:52:27

前言:在编程中我们为什么要使用线程池,线程池中的线程是怎么执行任务的,线程池中的线程是如何复用和销毁的;

1 什么是线程池:
提前创建一些线程放到一个地方,使用的时候直接获取,避免频繁的创建和销毁线程,节省内存和CPU资源;
2 Java 中已有的线程池:

ExecutorService fixedThreadPool = Executors.newFixedThreadPool(10);

ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();

ExecutorService cashedThreadPool = Executors.newCachedThreadPool();

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(2);

3 一个线程池构建需要的参数:
我们的任务是由某一个线程具体去执行的,所以我们就要定义好池中线程的数量,并且当任务数量增加时,可以开辟一些临时线程进行任务处理,当池中的线程已经是多余的时候再回收掉一些线程来节约资源;当池中的线程都有任务执行,这个时候来了新的任务,需要有个地方能把这些任务先行储存,以便当有空余的线程时在执行存储下来的任务,而且考虑到资源的情况这个存储任务的队列也最好是有限量的,如果超出了程序的处理能力,使用者可以自己决定拒绝策略;
所以在创建线程池的时候有必要以下参数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;

corePoolSize: 正式存在的线程数;
maximumPoolSize:允许存在的最大线程,扩容的线程数= maximumPoolSize-corePoolSize
keepAliveTime:临时线程存活的时间
Unit:临时线程存活的时间单位
workQueue:阻塞队列
threadFactory:线程工厂
Handler:拒绝策略;

4 线程池任务的执行和线程销毁:
demo:

ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(new Thread( () -> testGetFutureMap("param")));
private static Map<String, Object> testGetFutureMap(String param) {
   // 处理业务逻辑
   Map<String, Object>  mapData = new HashMap<>();
   /**
    * do some thing
    */
   System.out.printf("do some thing");
   return mapData;
}

ThreadPoolExecutor:execute 线程任务的执行

 public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
    	// 当前工作的线程小于核心线程,直接新建线程,并且进行任务的执行
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 当前工作的线程大于核心线程,或者直接添加任务失败
    if (isRunning(c) && workQueue.offer(command)) {
    	// 添加到队列中,等待后续空闲线程执行任务
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
   //   当前工作的线程大于核心线程,或者直接添加任务失败,并且添加队列失败
   // 开启临时线程进行任务处理
    else if (!addWorker(command, false))
    	// 如果失败执行拒绝策略
        reject(command);
}

addWorker:线程任务的执行

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
		// 判断当前线程池是否可用	
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
			 // 判断当前工作线程数和核心线程数或者最大线程数
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
			// 增加工作线程数,增加成功,直接跳出for 循环
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
			// 增加线程数失败,进行重试
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
       // 创建线程,传入任务
      // 执行任务 new Thread(new Worker()).start();
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 添加任务  
		             workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
		         // 启动线程执行任务
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
	        // 添加任务失败,工作任务数量-1,移除任务
            addWorkerFailed(w);
    }
    return workerStarted;
}

compareAndIncrementWorkerCount:增加线程池中的线程数量

private boolean compareAndIncrementWorkerCount(int expect) {
	// 工作线程数量+1
     return ctl.compareAndSet(expect, expect + 1);
 }

构建Worker用于具体任务的执行:

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}

执行任务 t.start() ,调用Worker run():

public void run() {
    runWorker(this);
}
finalvoid runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
        	// 任务执行,没有任务的时候不进入while 循环
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            // 线程不可用
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
            	// 在任务执行之前,此方法可以被重写
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                	// 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                	// 任务执行以后
                    afterExecute(task, thrown);
                }
            } finally {
            	// 执行任务后
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
    	// 线程的退出,减少工作线程数量
        processWorkerExit(w, completedAbruptly);
    }
}

获取任务 getTask():

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        // 线程池不可用
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // Are workers subject to culling?
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            // 当前工作线程大于核心线程,并且没有了任务,则将线程池中线程数量-1
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
        	// 当前工作线程大于核心线程数 则进行规定时间内获取任务(规定时间内没有获取到则说明当前没有需要执行的任务)
        	// 否则直接获取任务
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;// 返回任务
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

任务执行完毕后退出线程,processWorkerExit:

private void processWorkerExit(Worker w, boolean completedAbruptly) {
   if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
       decrementWorkerCount();

   final ReentrantLock mainLock = this.mainLock;
   mainLock.lock();
   try {
   		// 增加任务完成的数量
       completedTaskCount += w.completedTasks;
       // 移除任务
       workers.remove(w);
   } finally {
       mainLock.unlock();
   }

   tryTerminate();

   int c = ctl.get();
   if (runStateLessThan(c, STOP)) {
       if (!completedAbruptly) {
           int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
           if (min == 0 && ! workQueue.isEmpty())
               min = 1;
           if (workerCountOf(c) >= min)
               return; // replacement not needed
       }
       addWorker(null, false);
   }
}

以上为线程池中线程创建,执行任务,以及销毁线程的过程,流程图如下:
在这里插入图片描述
过程:
(1)当提交任务后,如果当前工作的线程没有超过核心线程,则创建线程然后进行任务的执行;
(2)当工作的现车超过核心线程数,则尝试添加到阻塞队列中,添加成功后,有空闲线程时从队列中获取任务并执行;
(3)如果阻塞队列已满,则判断是否要增加临时线程处理任务,如果已经达到最大线程数,则执行拒绝策略;否则创建临时线程,执行任务;
(4)当任务执行完毕后,如果没有了任务,并且当前工作的线程大于核心小程,则执行线程的销毁;

5 总结:
5.1 线程池的创建是为了避免线程频繁的创建和销毁,是为了线程的复用,增加线程池中的线程可以提高任务执行的效率,但是线程池中线程过多会造成频繁的上下文切换,所以线程数量并不是越多越好;
5.2 我们可以自定义线程池,通过重写方法的方式,更好的监控线程的执行:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceMonitor extends ThreadPoolExecutor {
    public ExecutorServiceMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }
	// 线程执行任务之前
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        
        super.beforeExecute(t, r);
    }
  // 线程执行任务之后
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/144650.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

CalDAV网页客户端AgenDAV

什么是 AgenDAV &#xff1f; AgenDAV 是一个类似于 Google 日历的 CalDAV 网络客户端&#xff0c;具有 AJAX 界面&#xff0c;允许用户管理自己的日历和共享的日历。 注意事项 AgenDAV依赖于 CalDAV 服务器&#xff08;Bakal、DAViCal 等&#xff09;&#xff0c;所以需要先安…

软件测试员在面试中常遇问题

目前&#xff0c;疫情已经逐渐得到了控制&#xff0c;各行各业都掀起了复工大潮。与此同时&#xff0c;软件测试的招聘需求也随着复工的开始而变得紧急起来&#xff0c;而求职者应该怎样抓住机会进行应聘呢&#xff1f;首先最重要的就是多刷面试题&#xff0c;这样才能才面试过…

CSS权威指南(五)字体

文章目录1.字体族2.font-face3.字重&#xff08;font-weight&#xff09;4.字号&#xff08;font-size&#xff09;5.字形&#xff08;font-style&#xff09;6.字体拉伸&#xff08;font-stretch&#xff09;7.字距&#xff08;font-kerning&#xff09;8.字体变形&#xff08…

Python 办公自动化,全网最全整理来了!拒绝无效率加班!

大家好&#xff0c;今天给大家分享一篇 Python 自动化办公干货&#xff0c;整整42个实战项目案例。每一个项目案例都有详细的视频讲解&#xff0c;是一套非常全面的Python自动化办公项目&#xff0c;建议大家收藏后学习&#xff0c;梳理不易&#xff0c;记得点赞支持。详细目录…

【菜菜的CV进阶之路 - 深度学习环境搭建】常用软件安装

四、安装网易云 双系统装完了&#xff0c;下一步当然是&#xff0c;休息一下&#xff0c;听一首歌啦~ 1、连网&#xff1a;只能使用wifi连&#xff0c;网线直连的话&#xff0c;还需要配置 2、安装网易云&#xff1a; 下载最新的Linux安装包&#xff0c;然后 sudo apt inst…

数据的存储(C语言)

数据类型&#xff1a; 要了解数据是如何存储的&#xff0c;我们就得先知道C语言中的数据类型 基本数据类型 基本数据类型&#xff0c;也就是C语言内置类型&#xff1a; char -> 字符型 short -> 短整型 int -> 整…

html textarea 插入字符在光标处

textarea 插入字符在光标处前言深度解析1 效果图上代码前言 深度解析 1 效果图 上代码 <!DOCUMENT><html><head> <link rel"stylesheet" href"https://cdnjs.cloudflare.com/ajax/libs/font-awesome/4.7.0/css/font-awesome.min.css&qu…

Semantic Segmentation | 评价指标代码解析

如有错误&#xff0c;恳请指出。 文章目录1. 定义解析2. 代码解析之前有记录过关于图像语义分割的相关评价指标与经典网络&#xff0c;在看PointNet的语义分割训练脚本的时候&#xff0c;图像的语义分割和点云的语义分割其实本质上是一致的。所以这里想记录一下语义分割的评价指…

MySQL下载及使用navicat连接mysql数据库(含下载地址、超具体细节、推荐数据库教程)

目录下载地址安装流程第一步&#xff1a;开始安装第二步&#xff1a;类型选择第三步&#xff1a;developer default第四步&#xff1a;execute第五步&#xff1a;服务器配置窗口第六步&#xff1a;网络类型配置窗口第七步&#xff1a;第八步&#xff1a;服务器密码设置窗口第九…

Vue全家桶 Pinia状态管理

Pinia状态管理1. Pinia和Vuex的对比2. 创建Pinia的Store3. Store 简介与使用4. Pinia核心概念State5. Pinia核心概念Getters6. Pinia核心概念ActionsPinia开始于大概2019年&#xff0c;最初是作为一个实验为Vue重新设计状态管理&#xff0c;让它用起来像组合式API&#xff0c;从…

[C语言]初步的来了解一下指针(多图详解)

目录 1.指针是什么 2.指针类型 2.1指针类型的意义(-整数) 2.2指针的解引用 3.野指针 3.1野指针出现的情况 3.2 如何规避野指针 4.指针运算 4.1指针-整数 4.2指针-(减)指针 5.二级指针 1.指针是什么 指针是内存中最小的单元编号&#xff0c;也就是地址。指针还可以是一种…

录屏软件电脑版免费哪个好?4款免费屏幕录制软件下载

在电脑上经常能使用的录屏功能&#xff1a;录制软件的操作过程、精彩的游戏瞬间、经典的影视故事、网络教学等。许多人都在问&#xff0c;录屏软件电脑版哪个好&#xff1f;Windows平台上有很多录屏软件&#xff0c;如果对录屏需求不高的朋友&#xff0c;可以通过内置的视频软件…

08线性回归+基础优化算法

P2基础优化算法 1.最常见的优化算法——梯度下降&#xff0c;用在模型没有显示解的情况下&#xff08;线性回归有显示解&#xff0c;但是现实中很少有这样理想的情况&#xff09; 2.梯度下降的实现方法&#xff1a;沿着反梯度更新方向参数求解 解释&#xff1a; 超参数&#x…

HTTP_day03

当键入网址后&#xff0c;到网页显示&#xff0c;其间发生了什么&#xff08;下&#xff09; 掘金地址 键入 localhost ,通过 Wireshark 抓包分析&#xff0c;抓包结果如下图所示 抓包结果 我们知道 HTTP 协议是运行在 TCP/IP 基础 之上的。 浏览器 通过 HTTP 接收和发送数据…

怎么才能写出好的代码

前言这是一篇如何写好代码的水文&#xff0c;因为最近输出了不少干货&#xff0c;但是大家点赞太少&#xff0c;我越来越没有激情了&#xff0c;那就放放水啦。所以如果大家觉得我的分享对你有用&#xff0c;动动发财小手&#xff0c;赞起来吧&#xff01;虽然是一篇水文&#…

谷粒学苑项目-第一章数据库设计与项目结构-1.1

一、数据库设计 1、数据库 guli2、数据表 CREATE TABLE edu_teacher (id char(19) NOT NULL COMMENT 讲师ID,name varchar(20) NOT NULL COMMENT 讲师姓名,intro varchar(500) NOT NULL DEFAULT COMMENT 讲师简介,career varchar(500) DEFAULT NULL COMMENT 讲师资历,一句话说…

Java--经典九道练习题

文章内容 一、用户登录 二、遍历字符串 三、统计字符个数 四、拼接字符串 五、字符串反转 六、金额转换&#xff08;较难&#xff09; 七、手机号屏蔽 八、身份证号码信息查看 九、游戏骂人敏感词替换 一、用户登录 一直正确的用户名和密码&#xff0c;请用程序实现模…

获取当前进程的启动程序

本文迁移自本人网易博客&#xff0c;写于2012年3月23日&#xff0c;获取当前进程的启动程序 - lysygyy的日志 - 网易博客 (163.com)1.获取当前进程的启动程序CString sFile;GetModuleFileName(NULL, sFile.GetBuffer(), 255);2.获取文件类型3.Autocad文档交互时&#xff0c;使用…

Camera | 1.Camera基础知识

一口君最近在玩瑞芯微的板子&#xff0c;之前写了几篇基于瑞芯微的文章&#xff0c;大家可以学习一下。 《瑞芯微rk356x板子快速上手》 《Linux驱动|rtc-hym8563移植笔记》 《Linux驱动 | Linux内核 RTC时间架构》 《瑞芯微 | 摄像头ov13850移植笔记》 《rk3568 | 瑞芯微平…

图的生成树与生成森林

文章目录连通图与连通分量强连通图与强连通分量图的连通性判断生成树深度优先生成树邻接表邻接矩阵广度优先生成树邻接表邻接矩阵生成森林获取边弧的权值源代码连通图与连通分量 在无向图中, 若从顶点v到顶点w有路径存在, 则称v和w是连通的. 若图G中任意两个顶点都是连通的, 则…