Java并发编程——线程池(基础,使用,拒绝策略,命名,提交方式,状态)

news2025/1/17 19:48:16
我是一个计算机专业研0的学生卡蒙Camel🐫🐫🐫(刚保研)
记录每天学习过程(主要学习Java、python、人工智能),总结知识点(内容来自:自我总结+网上借鉴)
希望大家能一起发现问题和补充,也欢迎讨论👏👏👏

文章目录

    • 线程池🏊
      • 线程池的好处👍
      • 线程池的创建🏗️
      • 线程池(ThreadPoolExecutor)常见参数🔢
      • 处理任务流程🔃
      • 拒绝策略⭐
        • 使用数据库任务表来自定义拒绝策略
      • 线程池中两种提交方式
      • 线程池命名♂️♀️
      • 线程池状态

线程池🏊

线程池是多线程应用中的一种资源管理技术,它旨在减少创建和销毁线程所带来的开销,并且通过复用已存在的线程来执行任务,提高响应速度。线程池提供了一种限制和管理资源(包括执行一个任务的线程)的方法。

线程池的好处👍

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

线程池的创建🏗️

  1. 使用ThreadPoolExecutor构造函数来创建自定义线程池(建议这种方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险)
  2. 通过使用 java.util.concurrent.Executors 工厂类创建

常用的线程池:

Executors 返回线程池对象特点
FixedThreadPool创建一个固定大小的线程池。如果所有线程都处于活动状态,新任务将在队列中等待,直到有线程可用。
CachedThreadPool创建一个可根据需要创建新线程的线程池,但在前一次构造的线程可用时将重用它们。适用于执行大量短期异步任务的应用程序。
SingleThreadExecutor创建一个单线程化的 Executor,它会确保所有任务都在同一个线程中按顺序执行。
ScheduledThreadPool创建一个支持定时及周期性的任务执行的线程池,类似于 Timer
WorkStealingPool创建一个具有多个任务队列的工作窃取线程池,适用于处理大量可并行的任务。

线程池对象的构造方法:

// LinkedBlockingQueue有界队列
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}

// LinkedBlockingQueue 无界队列
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

// SynchronousQueue同步队列
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

// DelayedWorkQueue(延迟阻塞队列)
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}
public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService {
    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
    ...
}

线程池(ThreadPoolExecutor)常见参数🔢

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

参数解释

参数解释
corePoolSize线程池的核心线程数量:任务队列未达到队列容量时,最大可以同时运行的线程数量。
maximumPoolSize线程池的最大线程数:任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
keepAliveTime当线程数大于核心线程数时,多余的空闲线程存活的最长时间
unit时间单位,使用java.util.concurrent.TimeUnit枚举值来指定时间单位
workQueue任务队列,用来储存等待执行任务的队列
threadFactory线程工厂,用来创建线程,一般默认即可
handler拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务(常见的拒绝策略包括抛出异常、丢弃任务、执行者自身运行任务等。)

工作队列(workQueue):用于存放待执行的任务的阻塞队列。可以使用BlockingQueue接口的任何实现,常见的有:

  • ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue:一个基于链表结构的有界阻塞队列。如果构造时未指定容量,则默认容量为Integer.MAX_VALUE,即视为无界队列
  • SynchronousQueue:不存储元素的阻塞队列,每个插入操作必须等待另一个线程的对应移除操作。
  • PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
  • **LinkedBlockingDeque:**一个由链表结构组成的双端阻塞队列。

img

处理任务流程🔃

ThreadPoolExecutor中最关键的execute源码:

public void execute(Runnable command) {
    // 先检查传入的command是否为空,若空则报错
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get(); // 原子操作,返回线程池状态和线程计数的控制字段值
    // 1. 如果线程数少于corePoolSize,调用addWorker创建核心线程数
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2. 如果线程数大于corePoolSize,将任务添加进workQueue队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 2.1 如果检查isRunning的状态为false,则remove这个任务,然后执行拒绝策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 2.2 线程池处于running状态,但是没有线程,则创建线程
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3.如果放入workQueue失败,则创建非核心线程执行任务,如果创建失败(当前线程总数不小于maximumPoolSize),就会拒绝
    else if (!addWorker(command, false))
        reject(command);
}

img

拒绝策略⭐

在Java的ThreadPoolExecutor中,当线程池无法处理新提交的任务时(例如,线程池已关闭或者队列已满),可以配置不同的拒绝策略来决定如何处理这些任务。

拒绝策略详细
ThreadPoolExecutor.AbortPolicy**默认的拒绝策略。**抛出 RejectedExecutionException来拒绝新任务的处理。
ThreadPoolExecutor.CallerRunsPolicy如果使用这种策略,那么当任务被拒绝时,该任务将在调用者的线程中执行,即直接在调用execute方法的线程中运行这个任务。这可能会降低任务提交线程的速度,从而减缓任务提交的速度,给线程池一些时间来处理队列中的任务。
ThreadPoolExecutor.DiscardPolicy该策略会悄悄地丢弃无法处理的任务,既不会抛出异常也不会通知调用者。因此,使用此策略时需要注意可能丢失任务的情况。
ThreadPoolExecutor.DiscardOldestPolicy此策略会丢弃队列中最旧的一个未处理任务,并尝试重新提交当前任务。这意味着最老的任务将从队列中移除,然后尝试将当前任务添加到队列中进行处理。
自定义拒绝策略通过实现接口可以自定义任务拒绝策略。

在日常开发中,我们不希望任务被丢弃,我们就会使用CallerRunsPolicy拒绝策略。

// 被拒绝任务的处理程序,直接在execute方法的调用线程中运行被拒绝的任务,除非执行器已关闭,在这种情况下,任务将被丢弃。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /**
     * Creates a {@code CallerRunsPolicy}.
     */
    public CallerRunsPolicy() { }

    /**
     *在调用者的线程中执行任务r,除非执行器已关闭,在这种情况下,任务将被丢弃
     * @param r 请求执行的可运行任务
     * @param e 试图执行此任务的执行器
     */
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

又源代码可知:只要当前程序不关闭就会使用执行execute方法的线程执行该任务。

但是会有一定风险

如果走到CallerRunsPolicy的任务是个非常耗时的任务,且处理提交任务的线程是主线程,可能会导致主线程阻塞,影响程序的正常运行。也可能会造成死锁(如果调用者线程正在等待线程池中的某个任务完成,而这个任务又依赖于调用者线程能够继续运行,那么就会形成循环依赖,进而导致死锁。)

解决方案:

  1. 继续采用CallerRunsPolicy拒绝策略

  2. 在内存允许的情况下,我们可以增加阻塞队列BlockingQueue的大小并调整堆内存以容纳更多的任务,确保任务能够被准确执行。

    1. 1为了充分利用 CPU,我们还可以调整线程池的maximumPoolSize (最大线程数)参数,这样可以提高任务处理速度,避免累计在 BlockingQueue的任务过多导致内存用完。
  3. 持久化思路(自定义)

    1. 设计一个数据库表来存储到数据库中

    2. 使用Redis缓存

    3. 提交到消息队列

使用数据库任务表来自定义拒绝策略
  1. 实现RejectedExecutionHandler接口自定义拒绝策略,自定义拒绝策略负责将线程池暂时无法处理(此时阻塞队列已满)的任务入库(保存到 MySQL 中)。注意:线程池暂时无法处理的任务会先被放在阻塞队列中,阻塞队列满了才会触发拒绝策略。
public class DatabaseRejectedExecutionHandler implements RejectedExecutionHandler {

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // 保存任务到数据库的方法
        saveTaskToDatabase(r);
    }

    private void saveTaskToDatabase(Runnable task) {
        // 实现保存任务到数据库的逻辑
        // 可以使用JDBC或者其他ORM框架如MyBatis、Hibernate等
    }
}
  1. 继承BlockingQueue实现一个混合式阻塞队列,该队列包含 JDK 自带的ArrayBlockingQueue。另外,该混合式阻塞队列需要修改取任务处理的逻辑,也就是重写take()方法,取任务时优先从数据库中读取最早的任务,数据库中无任务时再从 ArrayBlockingQueue中去取任务。
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class HybridBlockingQueue<E> implements BlockingQueue<E> {

    private final ArrayBlockingQueue<E> queue;

    public HybridBlockingQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    // 其他BlockingQueue接口方法的实现...

    @Override
    public E take() throws InterruptedException {
        // 尝试从数据库中取任务
        E taskFromDb = takeFromDatabase();
        if (taskFromDb != null) {
            return taskFromDb;
        } else {
            // 如果数据库中没有任务,则从queue中取
            return queue.take();
        }
    }

    private E takeFromDatabase() {
        // 实现从数据库中取任务的逻辑
        // 注意这里需要根据你的E类型来确定如何反序列化或转换成任务对象
        return null; // 返回null表示数据库中没有任务
    }

    @Override
    public boolean offer(E e) {
        return queue.offer(e);
    }

    @Override
    public int remainingCapacity() {
        return queue.remainingCapacity();
    }

    // ...其他必须实现的方法,例如put(), poll(), etc.
}

线程池中两种提交方式

  1. execute`方法

void execute(Runnable command): Executor接口中的方

  1. submit方法

<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);

两个方法区别:

  1. 异常处理

  2. 使用execute()时,未捕获异常导致线程终止,线程池创建新线程替代

  3. 使用submit()时,异常被封装在Future中,线程继续复用。(更加灵活的错误处理机制,允许调用者决定如何处理异常)

  4. 接受参数

  5. 返回值

线程池命名♂️♀️

给线程池命名可以帮助你在调试和监控多线程应用程序时更容易识别不同的线程池。默认情况下创建的线程名字类似 pool-1-thread-n 这样的,没有业务含义,不利于我们定位问题。

  1. 使用ThreadFactory来创建一个带有自定义名称的线程池:
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 线程工厂,它设置线程名称,有利于我们定位问题。
 */
public final class NamingThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final String name;

    /**
     * 创建一个带名字的线程池生产工厂
     */
    public NamingThreadFactory(String name) {
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
        return t;
    }
}

线程池状态

线程池有5种状态:

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
线程池状态详细介绍
RUNNING表示线程池处于正常运行状态,可以接受新的任务并处理已提交的任务。
SHUTDOWN表示线程池不再接受新任务,但会继续执行已经提交的任务直到所有任务完成。通过调用 shutdown() 方法进入此状态。
STOP表示线程池不再接受新任务,并尝试停止正在执行的所有任务。通过调用 shutdownNow() 方法进入此状态。
TIDYING表示所有任务都已完成,工作线程数量为零,即将进入 TERMINATED 状态。这是一个短暂的过渡状态,在这个状态下,terminated() 钩子方法会被调用。
TERMINATED表示线程池完全终止,所有的任务都已完成并且所有的工作线程都已被销毁。

shutdownNow为STOP,并试图停止所有正在执行的线程,不再处理还在池队列中等待的任务,当然,它会返回那些未执行的任务。它试图终止线程的方法是通过调用 Thread.interrupt() 方法来实现的,但是这种方法的作用有限,如果线程中没有sleep、wait、Condition、定时锁等应用,interrupt()方法是无法中断当前的线程的。所以,shutdownNow()并不代表线程池就一定立即就能退出,它可能必须要等待所有正在执行的任务都执行完成了才能退出。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2278132.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【HarmonyOS NEXT】鸿蒙跳转华为应用市场目标APP下载页

【HarmonyOS NEXT】鸿蒙跳转华为应用市场目标APP下载页 一、问题背景&#xff1a; 如今&#xff0c;大家都离不开各种手机应用。随着鸿蒙系统用户越来越多&#xff0c;大家都希望能在鸿蒙设备上快速找到想用的 APP。华为应用市场里有海量的 APP&#xff0c;但之前从鸿蒙设备进…

智能物流升级利器——SAIL-RK3576核心板AI边缘计算网关设计方案(一)

近年来&#xff0c;随着物流行业智能化和自动化水平不断提升&#xff0c;数据的实时处理与智能决策成为推动物流运输、仓储管理和配送优化的重要手段。传统的集中式云平台虽然具备强大计算能力&#xff0c;但高延迟和带宽限制往往制约了物流现场的即时响应。为此&#xff0c;我…

nacos环境搭建以及SpringCloudAlibaba脚手架启动环境映射开发程序

1&#xff1a;下载nacos 地址&#xff1a;https://github.com/alibaba/nacos/tags 2:选择server的zip包下载 3:启动mysql服务&#xff0c;新建数据库&#xff1a;nacos_yh 4&#xff1a;解压下载的nacos_server 进入conf目录 5&#xff1a;mysql运行sql脚本变得到下面的表 6&a…

Java中线程的学习

目录​​​​​​​ 程序,进程,线程 创建线程 继承Thread类 实现Runnable接口 Thread类中方法 线程优先级 线程状态 多线程的概念 线程同步 在Java代码中实现同步 以下代码使用继承Thread方式实现 以下代码使用实现Runnable方式实现 Lock&#xff08;锁&#xf…

HTTP/HTTPS ⑤-CA证书 || 中间人攻击 || SSL/TLS

这里是Themberfue ✨上节课我们聊到了对称加密和非对称加密&#xff0c;实际上&#xff0c;单纯地非对称加密并不能保证数据不被窃取&#xff0c;我们还需要一个更加重要的东西——证书 中间人攻击 通过非对称加密生成私钥priKey和公钥pubKey用来加密对称加密生成的密钥&…

leetcode:205. 同构字符串(python3解法)

难度&#xff1a;简单 给定两个字符串 s 和 t &#xff0c;判断它们是否是同构的。 如果 s 中的字符可以按某种映射关系替换得到 t &#xff0c;那么这两个字符串是同构的。 每个出现的字符都应当映射到另一个字符&#xff0c;同时不改变字符的顺序。不同字符不能映射到同一个字…

从epoll事件的视角探讨TCP:三次握手、四次挥手、应用层与传输层之间的联系

目录 一、应用层与TCP之间的联系 二、 当通信双方中的一方如客户端主动断开连接时&#xff0c;仅是在客户端的视角下连接已经断开&#xff0c;在服务端的眼中&#xff0c;连接依然存在&#xff0c;为什么&#xff1f;——触发EPOLLRDHUP事件&#xff1a;对端关闭连接或停止写…

EMS专题 | 守护数据安全:数据中心和服务器机房环境温湿度监测

您需要服务器机房温度监测解决方案吗&#xff1f; 服务器机房是企业中用于存储、管理和维护服务器及其相关组件的设施。服务器机房通常位于数据中心内&#xff0c;是一个专门设计的物理环境&#xff0c;旨在确保服务器的稳定运行和数据的安全性。服务器机房主要起到存储和管理数…

运输层安全协议SSL

安全套接字层 SSL (Secure Socket Layer) SSL 作用在端系统应用层的 HTTP 和运输层之间&#xff0c;在 TCP 之上建立起一个安全通道&#xff0c;为通过 TCP 传输的应用层数据提供安全保障。 应用层使用 SSL 最多的就是 HTTP&#xff0c;但 SSL 并非仅用于 HTTP&#xff0c;而是…

网络安全面试题汇总(个人经验)

1.谈一下SQL主从备份原理&#xff1f; 答&#xff1a;主将数据变更写入自己的二进制log,从主动去主那里去拉二进制log并写入自己的二进制log,从而自己数据库依据二进制log内容做相应变更。主写从读 2.linux系统中的计划任务crontab配置文件中的五个星星分别代表什么&#xff…

51单片机 DS18B20温度储传感器

DS18B20温度传感器 64-BITROM&#xff1a;作为器件地址&#xff0c;用于总线通信的寻址&#xff0c;是唯一的&#xff0c;不可更改 SCRATCHPAD&#xff08;暂存器&#xff09;&#xff1a;用于总线的数据交互 EEPROM&#xff1a;用于保存温度触发阈值和配置参数 暂存器 单总线…

如何保证光谱相机的稳定性和可靠性

光学系统设计与制造 高质量光学元件&#xff1a;采用高精度研磨和镀膜的透镜、棱镜、光栅等光学元件。优质的透镜可以减少像差和色差&#xff0c;确保光线准确聚焦&#xff1b;高质量的镀膜能够提高光学元件的透光率&#xff0c;降低反射损失&#xff0c;并且增强对不同波段光…

【芯片封测学习专栏 -- 2D | 2.5D | 3D 封装的区别和联系】

请阅读【嵌入式开发学习必备专栏 Cache | MMU | AMBA BUS | CoreSight | Trace32 | CoreLink | ARM GCC | CSH】 文章目录 Overview线键合&#xff08;wire-bonding&#xff09;封装FOWLP2D封装2.5D 封装硅通孔(TSV)硅中介层无TSV的2.5D 3D封装 Overview 我们先要了解一下&…

Apache搭建https服务器

Apache搭建https服务器 REF: 使用OpenSSL自建一个HTTPS服务

element-ui textarea备注 textarea 多行输入框

发现用这个组件&#xff0c;为了给用户更好的体验&#xff0c;要加下属性 1. 通过设置 autosize 属性可以使得文本域的高度能够根据文本内容自动进行调整&#xff0c;并且 autosize 还可以设定为一个对象&#xff0c;指定最小行数和最大行数。:autosize"{ minRows: 3, ma…

图论1-问题 C: 算法7-6:图的遍历——广度优先搜索

题目描述 广度优先搜索遍历类似于树的按层次遍历的过程。其过程为&#xff1a;假设从图中的某顶点v出发&#xff0c;在访问了v之后依次访问v的各个未曾被访问过的邻接点&#xff0c;然后分别从这些邻接点出发依次访问它们的邻接点&#xff0c;并使“先被访问的顶点的邻接点”先…

基于vue框架的的校园心理咨询室系统63w37(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。

系统程序文件列表 项目功能&#xff1a;用户,咨询师,文章类型,心理文章,在线咨询,典型案例,线下预约,取消预约 开题报告内容 基于Vue框架的校园心理咨询室系统开题报告 一、研究背景与意义 随着社会的快速发展和竞争的加剧&#xff0c;校园中的学生面临着越来越多的心理压力…

Spring 6 第1章——概述

一.Spring是什么 Spring是一款主流的Java EE轻量级&#xff08;体积小、不需要依赖其它组件&#xff09;开源框架Spring的目的是用于简化Java企业级应用的开发难度和开发周期Spring的用途不仅限于服务端的开发&#xff0c;从简单性、可测试性和松耦合的角度而言&#xff0c;任…

基于vite+vue3+mapbox-gl从零搭建一个项目

下面是基于 Vite、Vue 3 和 Mapbox GL 从零搭建一个项目的完整步骤&#xff0c;包括环境搭建、依赖安装、配置和代码示例。 1. 初始化项目 首先&#xff0c;使用 Vite 快速创建一个 Vue 3 项目&#xff1a; npm create vuelatest vue3-mapboxgl --template vue cd vue3-mapbo…

Kibana:ES|QL 编辑器简介

作者&#xff1a;来自 Elastic drewdaemon ES|QL 很重要 &#x1f4aa; 正如你可能已经听说的那样&#xff0c;ES|QL 是 Elastic 的新查询语言。我们对 ES|QL 寄予厚望。它已经很出色了&#xff0c;但随着时间的推移&#xff0c;它将成为与 Elasticsearch 中的数据交互的最强大…