并发学习25--多线程 ThreadPoolExecutor

news2025/1/11 18:50:45

类图:定义了一些重要的接口和实现类

线程池的几种状态:

ThreadPoolExecutor构造方法

1.救急线程

线程池中会有核心线程和救急线程;救急线程数=最大线程数-核心线程数。而救急线程会在阻塞队列已经占满的情况下,执行下一个即将要被拒绝策略执行任务。且救急线程在执行完任务后的KeepAliveTime时间内,如果没有执行新的任务,那么就会从线程池remove这个线程。不同于核心线程是一直存在于线程池中的。

救急线程是懒惰创建的,只有当有界阻塞队列满的时候才会创建救急线程执行任务。如果阻塞队列是无界的,那么永远不会创建救急线程。

2.核心线程

会优先使用核心线程来执行任务,且核心线程是懒惰创建。当核心线程执行完任务后,并不会主动结束自己,而是还在运行中。需要特殊处理使之结束。

3.拒绝策略

当救急线程被占用完后再来新的任务会由拒绝策略完成。

4.JDK提供线程池执行过程

4.JDK提供的拒绝策略

NewFixedThreadPool 固定大小线程池

1.介绍

2.代码示例
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j(topic = "TC41")
public class TC41 {
    public static void main(String[] args) throws InterruptedException{
        ExecutorService pool = Executors.newFixedThreadPool(2, new ThreadFactory() {
            private AtomicInteger threadPoolNumber = new AtomicInteger(1);
            
            //使用ThreadFactory自定义线程名
            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r,"Pool_t"+threadPoolNumber.getAndIncrement());
            }
        });

        pool.execute(()->{
            log.debug("1");
        });
        pool.execute(()->{
            log.debug("2");
        });
        pool.execute(()->{
            log.debug("3");
        });
        //result: 且程序不会结束,因为核心线程只是执行完任务,但并没有结束
        //11:10:02.144 [Pool_t2] DEBUG TC41 - 2
        //11:10:02.144 [Pool_t1] DEBUG TC41 - 1
        //11:10:02.147 [Pool_t2] DEBUG TC41 - 3
    }
}

NewCachedThreadPool 带缓冲的线程池

1.介绍

若每个任务执行时间都很长的话会创建太多线程,消耗CPU影响性能。

2.SynchronousQueue

SynchronousQueue指t1线程在Queue里放任务时放不进去,只能阻塞在外面,当t2线程来取的时候,就可以把任务取走了。意味着:该队列就是为了阻塞一个任务且不用拒绝策略,只等待线程来执行。当SynchronousQueue配合NewCachedThreadPool执行时,就是每当阻塞一个任务就创建一个救急线程。

SynchronousQueue使用代码示例
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.SynchronousQueue;

@Slf4j(topic = "TC42")
public class TC42 {
    public static void main(String[] args) {
        SynchronousQueue queue = new SynchronousQueue();
        new Thread(()->{
            try {
                log.debug("put 1....");
                //当执行put()后被阻塞住,当take()执行完后,put()才执行完毕
                queue.put(1);
                log.debug("putted 1....");

                log.debug("put 2....");
                queue.put(2);
                log.debug("putted 2....");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t1").start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            log.debug("get 1...");
            try {
                queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t2").start();

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        new Thread(()->{
            log.debug("get 2...");
            try {
                queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        },"t3").start();
    }
}

 NewSingleThreadExecutor 单线程执行器

线程异常后,线程池会重新创建新的线程
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Slf4j(topic = "TC43")
public class TC43 {
    public static void main(String[] args) throws InterruptedException {
        test1();
        
        //result: 线程1抛出异常停止后,线程池又创建了一个线程2去执行后面的codes 
        //14:15:44.489 [pool-1-thread-1] DEBUG TC43 - 1
        //Exception in thread "pool-1-thread-1" 14:15:44.493 [pool-1-thread-2] DEBUG TC43 - 2
        //14:15:44.493 [pool-1-thread-2] DEBUG TC43 - 3
    }

    public static void test1() throws InterruptedException{
        ExecutorService service = Executors.newSingleThreadExecutor();
        service.execute(()->{
            log.debug("1");
            int i=1/0;
        });

        service.execute(()->{
            log.debug("2");
        });

        service.execute(()->{
            log.debug("3");
        });
    }
}

提交任务

 Future<T> Submit

有一个Future类型的返回值

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.*;

@Slf4j(topic = "TC44")
public class TC44 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        Future<String> future = pool.submit(()->{
            log.debug("running");
            Thread.sleep(1000);
            return "ok";
        });

        log.debug("{}",future.get());
    }
}
List<Future<T>> invokeAll()

执行集合中所有线程,并返回所有返回值到一个集合里。

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

@Slf4j(topic = "TC45")
public class TC45 {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        List<Future<Object>> futures = pool.invokeAll(Arrays.asList(
                ()->{
                    log.debug("begin...");
                    Thread.sleep(1000);
                    return "1";
                },
                ()->{
                    log.debug("begin...");
                    Thread.sleep(500);
                    return "2";
                },
                ()->{
                    log.debug("begin...");
                    Thread.sleep(2000);
                    return "3";
                }
        ));

        futures.forEach(f->{
            try {
                log.debug("{}",f.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }
}
Object invokeAny()

集合中哪个线程结束最早返回哪个线程,其余线程停止执行任务。

0.5S后打印结果,因为第二个callable线程结束最早只等待了0.5S.

import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Slf4j(topic = "TC46")
public class TC46 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        Object o = pool.invokeAny(Arrays.asList(
                ()->{
                  log.debug("begin 1....");
                  Thread.sleep(1000);
                    log.debug("end 1....");
                    return "1";
                },
                ()->{
                    log.debug("begin 2....");
                    Thread.sleep(500);
                    log.debug("end 2....");
                    return "2";
                },
                ()->{
                    log.debug("begin 3....");
                    Thread.sleep(800);
                    log.debug("end 3....");
                    return "3";
                }
        ));

        log.debug("{}",o.toString());
    }
}

关闭线程池

shutdown()

 不会等待正在运行的线程,等它们运行结束,就会自己停止

shutdownNow()

其他终结方法

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1548534.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RK3568-开启ptp服务

硬件支持 mac或者phy需要支持ptp驱动支持 CONFIG_PTP_1588_CLOCK=y虚拟机端:虚拟机只支持软件时间戳。 安装ptp服务:sudo apt-get install linuxptp 修改ptp服务:sudo vi /lib/systemd/system/ptp4l.service,修改为ens33网口,-S表示使用软件时间戳。forlinx@ubuntu:~$ s…

Redis修改开源协议,6大备胎重见天日

背景&#xff1a;Redis2018年以来修改了多次开源协议&#xff0c;以前是把一些高级功能收费&#xff0c;这次彻底怒了&#xff0c;把核心代码的协议修改为RSALv2和SSPL双重协议&#xff0c;这个修改对普通用户不受影响&#xff0c;是向所有云厂商开炮&#xff0c;以后云厂商将不…

迁移android studio 模拟器位置

android studio 初始位置是安装在c盘&#xff0c;若是要迁移需 1创建一个目标位置如我的F:/avd 2在系统环境变量里面设置新的地址 变量名&#xff1a;ANDROID_SDK_HOME 变量值&#xff1a;F:/avd 3最重要的是文件复制&#xff0c;将C盘里面avd的上层目录.android的目录整体…

C++ static静态变量详解

目录 觉得有用就给博主点个赞吧&#xff01;谢谢啦&#xff01;嘻嘻嘻 概念&#xff1a; 特性 如何计算一个类创建了多少个对象&#xff1f; 概念&#xff1a; 用static关键字修饰的成员变量叫做静态成员变量 在类内部&#xff0c;但是属于全局&#xff0c;不属于任意一个…

前端调用接口地址跨越问题,nginx配置处理

在nginx配置里面添加add_header如下&#xff1a; add_header Access-Control-Allow-Origin *; #add_header Access-Control-Allow-Origin http://localhost:8080 always; add_header Access-Control-Allow-Methods GET, POST, PUT, D…

JavaScript Uncaught ReferenceError: WScript is not defined

项目场景&#xff1a; 最近在Visual Studio 2019上编译libmodbus库&#xff0c;出现了很多问题&#xff0c;一一解决特此记录下来。 问题描述 首先就是configure.js文件的问题&#xff0c;它会生成两个很重要的头文件modbus_version.h和config.h&#xff0c;这两个头文件其中…

【Lazy ORM 框架学习】

Gitee 点赞关注不迷路 项目地址 快速入门 模块所属层级描述快照版本正式版本wu-database-lazy-lambdalambda针对不同数据源wu-database-lazy-orm-coreorm 核心orm核心处理wu-database-lazy-sqlsql核心处理成处理sql解析、sql执行、sql映射wu-elasticsearch-starterESESwu-hb…

【React】vite + react 项目,进行配置 eslint

安装与配置 eslint 1 安装 eslint @babel/eslint-parser2 初始化配置 eslint3 安装 vite-plugin-eslint4 配置 vite.config.js 文件5 修改 eslint 默认配置1 安装 eslint @babel/eslint-parser npm i -D eslint @babel/eslint-parser2 初始化配置 eslint npx eslint --init相…

Vmware虚拟机无法用root直连说明

Vmware虚拟机无法用root直连说明 背景目的SSH服务介绍无法连接检查配置 背景 今天在VM上新装了一套Centos-stream-9系统&#xff0c;网络适配器的连接方式采用的是桥接&#xff0c;安装好虚拟机后&#xff0c;在本地用ssh工具进行远程连接&#xff0c;ip、用户、密码均是成功的…

【正点原子FreeRTOS学习笔记】————(12)信号量

这里写目录标题 一、信号量的简介&#xff08;了解&#xff09;二、二值信号量&#xff08;熟悉&#xff09;三、二值信号量实验&#xff08;掌握&#xff09;四、计数型信号量&#xff08;熟悉&#xff09;五、计数型信号量实验&#xff08;掌握&#xff09;六、优先级翻转简介…

LInux|命令行参数|环境变量

LInux|命令行参数|环境变量 命令行参数main的参数之argc&#xff0c;argv几个小知识<font color#0099ff size 5 face"黑体">1.子进程默认能看到并访问父进程的数据<font color#4b0082 size 5 face"黑体">2.命令行创建的程序父进程都是bash 环…

iterm2下使用tmux如果左右分屏用鼠标选中文字跨越pane的问题

原来的样子 如图所示&#xff0c;我开了左右分屏的tmux&#xff0c;如果我指向选择右边的三行div&#xff0c;用鼠标选中之后&#xff0c;会发现把左边的内容也框选了。 解决办法 Edit -》 Selection Respects Soft Boundaries 勾选这个功能&#xff08;选择遵循软边界&#…

秒杀VLOOKUP函数,查找数字我只服SUMIF函数

一提到数据查询&#xff0c;相信很多人的第一反应就是使用Vlookup函数。但是今天我想跟大家分享另一种比较另类的数据查询方式&#xff0c;就是利用SUMIF函数&#xff0c;相较于Vlookup函数它更加的简单灵活、且不容易出错&#xff0c;下面我们就来学习下它的使用方法。 1、常…

win10开启了hyper-v,docker 启动还是报错 docker desktop windows hypervisor is not present

问题 在安装了docker windows版本后启动 docker报错docker desktop windows hypervisor is not present 解决措施 首先确认windows功能是否打开Hyper-v 勾选后重启&#xff0c;再次启动 启动后仍报这个错误&#xff0c;是Hyper-v没有设置成功 使用cmd禁用再启用 一.禁用h…

rust中常用cfg属性和cfg!宏的使用说明,实现不同系统的条件编译

cfg有两种使用方式&#xff0c;一种是属性&#xff1a; #[cfg()]&#xff0c;一种是宏&#xff1a;cfg! &#xff0c;这两个都是非常常用的功能。 #[cfg()]是 Rust 中的一个属性 用于根据配置条件来选择性地包含或排除代码。cfg 是 "configuration" 的缩写&#xf…

左手医生:医疗 AI 企业的云原生提效降本之路

相信这样的经历对很多人来说并不陌生&#xff1a;为了能到更好的医院治病&#xff0c;不惜路途遥远奔波到大城市&#xff1b;或者只是看个小病&#xff0c;也得排上半天长队。这些由于医疗资源分配不均导致的就医问题已是老生长谈。 云计算、人工智能、大数据等技术的发展和融…

微信小程序页面制作练习——制作一个九宫格导航图

要求&#xff1a; 代码实现&#xff1a; 先将所需要的资源图片存入我的image文件里面 模拟练习供参考&#xff0c;不建议这样存入image里&#xff0c;因为本地图片占内存太大&#xff0c;不能预览。 一、list.wxml里面搭建框架代码&#xff1a; <!--pages/list/list.wxml…

有关Kitchen-Rosenfeld角点检测的公式推导

第一次看到下面这个公式时,不太清楚怎么推导过来的 后面看了有关Kitchen-Rosenfeld的文章后,明白了 假设梯度的角度 θ \theta θ tan ⁡ θ = I y I x \tan \theta =\frac{I_y}{I_x} tanθ=Ix​Iy​​ 其中 I y I_y Iy​为y偏导, I x I_x Ix​为x偏导, I x x I_{xx} I…

初步接触C++

hello&#xff0c;各位小伙伴&#xff0c;本篇文章跟大家一起学习C&#xff0c;感谢大家对我上一篇的支持&#xff0c;如有什么问题&#xff0c;还请多多指教 &#xff01; 文章目录 初步区别C语言和C命名空间1.命名空间的定义2.命名空间的使用 C的输入输出缺省参数1.缺省参数…