Java并发之线程池

news2024/10/6 4:05:59

文章目录

  • 前言
  • 一、Java中线程池概览
    • 1.1 类图
    • 1.2 内部流程图
  • 二、源码探索
    • 2.1 构造参数
    • 2.2 线程池状态
    • 2.3 Worker 的添加和运行
    • 2.4 阻塞队列
    • 2.5 任务拒绝策略
  • 三、实际使用
    • 3.1 动态线程池
    • 3.2 拓展使用
    • 3.3 springboot 中线程池
  • 参考

前言

在高并发的 Java 程序设计中,编写多线程代码可以最大限度发挥现代多核处理器的计算能力,提升系统的吞吐和性能。线程是多线程代码的基础工具,但不能无限制增加线程的数量,线程的创建和销毁、所占内存都要消耗系统资源,如果处理不当,可能会导致 OOM,并且大量线程的回收也会给 GC 带来压力,延长停顿时间。

在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。

为解决资源分配这个问题,线程池采用了“池化”(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。和数据库连接池类似,使用线程池有以下优点:

  1. 降低资源消耗:减少新建和销毁线程所调用的资源
  2. 提高响应速度:任务到达时,不需要等待新建线程后执行任务
  3. 提高线程的可管理性:线程是有限的资源,如果创建太多可能会导致系统故障,使用线程池可以做到统一的分配,调用和监控

一、Java中线程池概览

在 Java 中讲线程池一般是指 JDK 中提供的 ThreadPoolExecutor 类,这是由 Doug Lea 操刀实现的线程池类。

JDK 中并发包 java.util.concurrent(简称 JUC )是由这位大佬开发的,包含很多Java开发者常用的并发类如 ConcurrentHashMap、ReentrantLock、AtomicInteger、CountDownLatch 等等

下面会从类图,线程池运行流程图来简单概览

1.1 类图

thread_pool_uml

1.2 内部流程图

开始
提交任务
线程池状态是否RUNNING
线程数小于核心数
任务拒绝
添加工作线程并执行
阻塞队列已满?
线程数小于最大线程数
添加到阻塞队列,等待工作线程获取执行
结束

二、源码探索

下面会从源码分析 ThreadPoolExecutor

2.1 构造参数

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
        null :
    AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
构造函数参数名是否必填范围或类型含义
corePoolSize0 到 Integer.MAX_VALUE核心线程数
maximumPoolSize1到 Integer.MAX_VALUE,并且要 > corePoolSize最大线程数
keepAliveTime0 到 Long.MAX_VALUE当线程数大于核心线程时,空闲线程存活时间
unitTimeUnitkeepAliveTime 时间单位
workQueueBlockingQueue<Runnable>任务队列
threadFactoryThreadFactory线程工厂,默认提供工厂,名字以"pool-" + poolNumber.getAndIncrement() + “-thread-” 为前缀
handlerRejectedExecutionHandler拒绝策略,默认 AbortPolicy,直接抛异常

2.2 线程池状态

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; // 29
private static final int CAPACITY   = (1 << COUNT_BITS) - 1; //0x1fffffff

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;  //0xe0000000
private static final int SHUTDOWN   =  0 << COUNT_BITS;  //0x00000000
private static final int STOP       =  1 << COUNT_BITS;  //0x20000000
private static final int TIDYING    =  2 << COUNT_BITS;  //0x40000000
private static final int TERMINATED =  3 << COUNT_BITS;  //0x60000000

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池的状态,主要保存在 ctl 这个 AtomicInteger 变量中,由两部分构成,runState 和 workerCount。前3位存状态,后面29位存worker数量

  • workerCount 线程worker 数量, 这是允许启动而不允许停止的worker的数量。该值可能暂时不同于实际的活动线程数。
  • runState 提供线程池主要的生命周期控制
    • RUNNING: 接受新任务并处理队列任务
    • SHUTDOWN:不接受新任务,但处理队列任务
    • STOP:不接受任务,不处理队列任务,并中断正在进行的任务
    • TIDYING:所有任务都已终止,workerCount为零,转换到状态TIDYING的线程将运行terminated()钩子方法
    • TERMINATED:terminated() 执行完成

线程池状态转化:

shutdown
shutdownNow
阻塞队列为空,workerCount为0
workCount为0
terminated
开始
RUNNING
SHUTDOWN
STOP
TIDYING
TERMINATED
结束

线程池的 toString 方法会返回线程池的状态,不过只返回三种,会将上述五种状态中间的三种状态归于一种

  • Running RUNNING
  • Shutting down SHUTDOWNSTOPTIDYING
  • Terminated TERMINATED
private static boolean runStateLessThan(int c, int s) {
    return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
    return c >= s;
}
String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
                     (runStateAtLeast(c, TERMINATED) ? "Terminated" :
                      "Shutting down"));

2.3 Worker 的添加和运行

线程池为了掌握线程的状态并维护线程的生命周期,设计了线程池内的工作线程 Worker。

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
 {

     /** Thread this worker is running in.  Null if factory fails. */
     final Thread thread;
     /** Initial task to run.  Possibly null. */
     Runnable firstTask;
     /** Per-thread task counter */
     volatile long completedTasks;

     /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
     Worker(Runnable firstTask) {
         setState(-1); // inhibit interrupts until runWorker
         this.firstTask = firstTask;
         this.thread = getThreadFactory().newThread(this);
     }
     ...
 }
  • 增加Worker

    worker 的增加由方法boolean addWorker(Runnable firstTask, boolean core) 实现,内部流程如下:

在这里插入图片描述在在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的执行过程如下:

  1. while循环不断地通过getTask()方法获取任务。
  2. getTask()方法从阻塞队列中取任务。
  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
  4. 执行任务。
  5. 如果getTask结果为null则跳出循环,执行processWorkerExit()方法,销毁线程。在这里插入图片描述

2.4 阻塞队列

用于保存任务并移交给工作线程的队列.

常用队列如下表所示:

队列名元素出入顺序描述
ArrayBlockingQueue队列元素FIFO数组实现的有界阻塞队列,支持公平锁和非公平锁
LinkedBlockingQueue队列元素FIFO链表实现的有界阻塞队列.
链表队列通常比数组队列具有更高的吞吐量,但在大多数并发应用程序中,其性能不可预测。
容量(可选)可通过构造函数参数配置,用于防止过度的队列扩展。如果未指定,则容量等于Intger.MAX_VALUE。
PriorityBlockingQueue根据优先级,但是不能保证同优先级的顺序元素具有优先级的无界阻塞队列,无界在逻辑上是无限,但在OOM的情况下会添加失败。
队列内元素必须是可比较的,且不能为null。
DelayQueue根据到期时间排列,队列的头部是延迟过期时间最长的延迟元素具有延迟元素的无界阻塞队列,延迟元素指元素需要在队列中等待一段时间才能被取用。
SynchronousQueue队列的元素插入完成必须等待另一个线程取出,反之亦然一种不存储元素的阻塞队列,支持公平锁和非公平锁
LinkedTransferQueue无界阻塞队列,队列的头部是某个生产者在队列中停留时间最长的元素,尾部是某个生产者在队列中停留时间最短的元素
可以看做是LinkedBolckingQueueSynchronousQueue 的合体
LinkedBlockingDeque对头和对尾都可出入链表实现的双端有界阻塞队列,高并发时可将锁的竞争最多下降一半

2.5 任务拒绝策略

线程池的任务拒绝策略都实现了 RejectedExecutionHandler 接口,ThreadPoolExecutor 内部提供了四种拒绝策略,也可以自定义实现

策略类名含义备注
AbortPolicy直接抛异常,这是默认策略比较关键的业务推荐使用此策略,能够及时发现异常
CallerRunsPolicy由提交任务的线程执行,会调用任务的run方法,而不是start如果线程池关闭了,任务会被丢弃。这种策略一般是需要任务都要执行,但是当任务数量过多就会把提交任务的线程阻塞住。需要权衡考虑
DiscardPolicy直接丢弃不重要的任务可以使用此策略
DiscardOldestPolicy丢弃队列头部的任务,重新提交队列头部的任务是最老的任务,需要根据业务衡量是否采用此种策略

三、实际使用

3.1 动态线程池

线程池使用面临的核心的问题在于:线程池的参数并不好配置。如果要修改运行中应用线程池参数,需要停止线上应用,调整成功后再发布,而这个过程异常的繁琐,如果能在运行中动态调整线程池的参数多好。

美团技术团队基于这些痛点,推出了 动态线程池 的概念,催生了一批动态线程池框架,hippo4j 也是其一。

动态化线程池的核心设计包括以下三个方面:

  1. 简化线程池配置:线程池构造参数有8个,但是最核心的是3个:corePoolSizemaximumPoolSizeworkQueue,它们最大程度地决定了线程池的任务分配和线程分配策略。考虑到在实际应用中我们获取并发性的场景主要是两种:

    • 并行执行子任务,提高响应速度。这种情况下,应该使用同步队列,没有什么任务应该被缓存下来,而是应该立即执行。
    • 并行执行大批次任务,提升吞吐量。这种情况下,应该使用有界队列,使用队列去缓冲大批量的任务,队列容量必须声明,防止任务无限制堆积。

    所以线程池只需要提供这三个关键参数的配置,并且提供两种队列的选择,就可以满足绝大多数的业务需求,Less is More。

  2. 参数可动态修改:为了解决参数不好配,修改参数成本高等问题。在Java线程池留有高扩展性的基础上,封装线程池,允许线程池监听同步外部的消息,根据消息进行修改配置。将线程池的配置放置在平台侧,允许开发同学简单的查看、修改线程池配置。

  3. 增加线程池监控:对某事物缺乏状态的观测,就对其改进无从下手。在线程池执行任务的生命周期添加监控能力,帮助开发同学了解线程池状态。

hippo4j github 地址:https://github.com/opengoofy/hippo4j

3.2 拓展使用

线程池提供了可被子类拓展的方法 beforeExecute 和 afterExecute ,能够在线程执行前后回调。

3.3 springboot 中线程池

当开启@EnableAsync,使用@Async标记方法时,会通过默认线程池执行。默认线程池

  • corePoolSize :8
  • maximumPoolSize :Integet.MAX_VALUE,
  • workQueue:LinkedBlockingQueue,容量是:Integet.MAX_VALUE,
  • keepAliveTime :60
  • unit:second
  • handler:AbortPolicy
    可以看到默认线程池会无限创建线程,实际使用中会手动配置线程池
@EnableAsync
@Configuration
public class ThreadPoolConfig {

    @Bean(name = AsyncExecutionAspectSupport.DEFAULT_TASK_EXECUTOR_BEAN_NAME)
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
        executor.setCorePoolSize(10);
        //最大线程数
        executor.setMaxPoolSize(30);
        //队列容量
        executor.setQueueCapacity(100);
        //活跃时间
        executor.setKeepAliveSeconds(60);
        //线程名字前缀
        executor.setThreadNamePrefix("taskExecutor-");
        // 设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }
}

参考

  1. Java线程池实现原理及其在美团业务中的实践

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/30632.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数字化安全生产平台 DPS 重磅发布

11 月 5 日&#xff0c;在 2022 杭州 云栖大会上&#xff0c;数字化安全生产平台 DPS 重磅发布&#xff0c;助力传统运维向 SRE 转型。 阿里巴巴资深技术专家 周洋 十四五规划下&#xff0c;各行各业全面加速数字化转型与升级。随着企业数字化业务规模变大&#xff0c;迭代速…

Dubbo服务远程调用的简介及使用教程

一、Dubbo的简介 Dubbo是阿里巴巴公司开源的一个高性能、轻量级的 Java RPC 框架。 致力于提供高性能和透明化的 RPC 远程服务调用方案&#xff0c;以及 SOA 服务治理方案。 官网&#xff1a;https://dubbo.apache.org/ SOA架构&#xff1a;&#xff08;Service-Oriented Arch…

华为云RDS数据库测评:性能超出预期,双11优惠还在继续

一、前言 作为一名电商行业公司的员工&#xff0c;深刻体会到系统大压力、高并发下保证服务的正常使用是多么严峻的挑战。双11这段时间&#xff0c;因为激增的使用量让我们的数据库服务严重吃紧&#xff0c;压力特别的大&#xff0c;甚至还出现了交易漏单&#xff0c;脏数据等…

【Servlet】3:Servlet 的基本原理、Servlet对象的生命周期

目录 第五章 | 动态资源与Servlet | 章节概述 | Tomcat与Servlet的 原理、关系 Tomcat的基本构成​编辑 Server处理HTTP请求 Connector内部架构分析 Container内部架构分析 Tomcat的执行流程小结 | Servlet 概述、接口实现 Servlet的基本概述 实现Servlet接口并通过U…

LeetCode HOT 100 —— 10.正则表达式匹配

题目 给你一个字符串 s 和一个字符规律 p&#xff0c;请你来实现一个支持 ‘.’ 和 ‘*’ 的正则表达式匹配。 ‘.’ 匹配任意单个字符 ‘*’ 匹配零个或多个前面的那一个元素 所谓匹配&#xff0c;是要涵盖 整个 字符串 s的&#xff0c;而不是部分字符串。 思路 对于字符串…

11月24日国产蓝牙AOA高精度定位vs国外知名厂家的蓝牙aoa定位效果的展示

11月24日国产蓝牙AOA高精度定位vs国外知名厂家的蓝牙aoa定位效果的展示 11月24日国产蓝牙AOA高精度定位vs国外知名厂家的蓝牙aoa定位效果的展示

操作系统的基本概念

文章目录一、操作系统的概念1.什么是操作系统&#xff1f;2 计算机系统的构成3 系统软件的概念4 操作系统的主要作用二、操作系统目标和功能1. 目标1.1 有效性1.2 方便性1.3 可扩充性1.4 开放性2. 功能2.1 作为系统资源的管理者2.2 作为用户与计算机[硬件系统]之间的接口2.3 实…

Linus 文件处理(一)

目录 一、前言 二、低级文件访问 1、write 2、read 3、open 4、Initial Permissions &#xff08;1&#xff09;umask &#xff08;2&#xff09;close &#xff08;3&#xff09;ioctl &#xff08;4&#xff09;第一个 copy_system.c 程序 &#xff08;5&#xff…

Apache ShardingSphere(一) 基本概念介绍

文章目录一 基本介绍1.1 概述1.2 ShardingSphere JDBC1.3 ShardingSphere Proxy1.4 ShardingSphere Sidecar1.5 数据库的扩展1.5.1 向上扩展1.5.2 横向扩展1.5.2.1 读写分离1.5.2.2 垂直切分1.5.2.3 水平切分1.6 分库与分表1.6.1 水平分库1.6.2 水平分表1.6.3 垂直分库1.6.4 垂…

[iOS]使用MonkeyDev完成Hook

一、确定目标 先定个小目标&#xff0c;使用七猫举个例&#xff0c;去移除小说阅读页底部广告和章节之间的广告。 二、HOOK 1. 创建MonkeyApp项目导入砸壳包 2. 使用Reveal工具确定“底部广告”和“章末广告”的视图名称 底部广告 View Controller: Class: QMReader.YYReade…

Strassen矩阵乘法问题(Java)

Strassen矩阵乘法问题&#xff08;Java&#xff09; 文章目录Strassen矩阵乘法问题&#xff08;Java&#xff09;1、前置介绍3、代码实现4、复杂度分析5、参考资料1、前置介绍 矩阵乘法是线性代数中最常见的问题之一 &#xff0c;它在数值计算中有广泛的应用。 设A和B是2个nXn…

搭建灾情快速分析系统 | Bigemap助力防灾减灾重点工作

Bigemap国产基础软件凭借自身强大的新GIS引擎技术与完善的产品链&#xff0c;为相关部门提供了集"灾情采集-灾情监测-灾害快速评估-应急指挥"于一体的灾害防灾减灾解决方案&#xff0c;搭建了灾情快速分析系统&#xff0c;该系统成为相关部门应对灾情的重要支撑平台。…

虚拟号码认证如何开通?

近年来&#xff0c;经常会接到外卖、房产中介、信用贷款等电话&#xff0c;让顾客不胜其扰。现在电话标记功能使用越来越普遍&#xff0c;可以大概了解电话“来意”&#xff0c;同时也会让误标记、恶意标记很方便。对于开展业务或办公司或企业的人&#xff0c;更加不能让自己的…

排序算法之选择排序

今天来给大家介绍一下排序算法之选择排序 选择排序&#xff1a;&#xff08;Selection sort&#xff09;是一种简单直观的排序算法&#xff0c;也是一种不稳定的排序方法。 选择排序的原理&#xff1a; 一组无序待排数组&#xff0c;做升序排序&#xff0c;我们先假定第一个…

【生成模型】Diffusion Models:概率扩散模型

---前言一、Diffusion Model 基本介绍二、生成模型对比三、直观理解Diffusion model四、形式化解析Diffusion model五、详解 Diffusion Model&#xff08;数学推导&#xff09;1.前向过程(扩散过程)2.逆扩散过程3.逆扩散条件概率推导4.训练损失六、训练、测试伪代码1. 训练2.测…

鲲鹏devkit编译调试工具——《sudoku》作业解析

《sudoku》作业解析 本次实验以sudoku项目为例介绍鲲鹏编译调试插件的基本使用方法 本次实验的步骤主要为 获取源码安装鲲鹏编译调试插件服务器配置进行代码同步配置配置测试任务进行编译调试 接下来我们先获取本次实验所需要的源码 获取源码 sudoku项目已经上传到github使…

stata外部命令大全(包含面板门槛、系统GMM、空间计量、Pvar、中介效应等)

1、数据来源&#xff1a;自主整理 2、时间跨度&#xff1a;无 3、区域范围&#xff1a;无 4、指标说明&#xff1a; 该些外部命令包含面板门槛、系统GMM、空间计量、pvar、中介效应等涵盖全部 以下是部分命令截图&#xff1a; 空间计量&#xff1a; 系统GMM&#xff08;动…

Allure使用手册

一. 简介 Allure是一款支持多语言的测试结果可视化软件&#xff0c;支持Java、Python&#xff0c;搭配Junit、pytest等测试框架食用更香。本文主要讲解搭配Junit4。 二. 下载、安装部署 2.1 下载 百度搜索Allure2&#xff01;&#xff01;&#xff01; 敲重点&#xff1a;…

基于Qlearning强化学习的倒立摆控制系统matlab仿真

目录 1.算法描述 2.仿真效果预览 3.MATLAB部分代码预览 4.完整MATLAB程序 1.算法描述 强化学习通常包括两个实体agent和environment。两个实体的交互如下&#xff0c;在environment的statestst下&#xff0c;agent采取actionatat进而得到rewardrtrt 并进入statest1st1。Q-l…

【头歌实验】五、Python循环结构

文章目录>>>第1关&#xff1a;达依尔的麦子数任务描述案例分析相关知识for循环测试说明参考答案>>>第2关&#xff1a;四级单词查询任务描述案例分析相关知识如何处理文件文件打开文件循环文件关闭遍历文件测试说明第3关&#xff1a;出租车车费计算任务描述案…