java线程池原理及使用和处理流程

news2025/4/20 21:37:12

实际测试使用如下:

package com.study;

import java.util.concurrent.*;

/**
 * 线程池作用:
 * 1、线程的复用
 * 2、资源管理
 * 3、任务调度
 * --------------执行过程--------------
 * 第1-3个任务进来时,直接创建任务并执行
 * 第4-8个任务进来时,会把新任务放到队列,然后按照顺序执行队列中的任务,新的任务在队列最后
 * 第9-15个任务进来时,会先执行队列中已有的任务,再执行新的任务
 * 第16个任务进来时,会执行拒绝策略
 *
 * @author admin
 * @since 2025-04-18 15:48
 */
public class ThreadPoolDemo {

    // 核心线程数
    private static final int CORE_POOL_SIZE = 3;

    // 最大线程数
    private static final int MAX_POOL_SIZE = 10;

    // 空闲线程存活时间
    private static final long KEEP_ALIVE_TIME = 5;

    // 时间单位
    private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;

    // 队列容量
    private static final int QUEUE_SIZE = 10;

    /**
     * 任务队列
     * 1、ArrayBlockingQueue 有界队列,通用队列,线程池的默认队列
     * 2、LinkedBlockingQueue 无界队列,默认大小为Integer.MAX_VALUE
     * 3、SynchronousQueue 无容量队列,不存储任务,直接提交给线程池处理
     * 4、PriorityBlockingQueue 优先级队列,线程池的默认队列
     * 5、DelayQueue 延迟队列
     */
    static ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(QUEUE_SIZE);

    /**
     * 拒绝策略
     * 1、abortPolicy 直接抛出异常(默认)
     * 2、discardPolicy 直接丢弃任务
     * 3、discardOldestPolicy 踢出队列中最老的任务,再次提交当前任务
     * 4、callerRunsPolicy 由提交任务的线程来执行任务
     */
    static final RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();

    /**
     * 线程工厂
     */
    static final ThreadFactory threadFactory = new CustomThreadFactory(Thread.NORM_PRIORITY, false);

    /**
     * 传统线程池创建
     * 场景:不同业务线需隔离资源(如支付交易与普通查询互不影响)
     */
    private static final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TIME_UNIT, queue, threadFactory, handler);

    /**
     * 1、固定-线程池创建
     * 默认核心线程数和最大线程数相同
     * 默认空闲线程存活时间为0s
     * 默认LinkedBlockingQueue无界队列
     * 默认拒绝策略和默认线程工厂
     * 场景:短时涌入大量HTTP请求(如电商秒杀、票务系统),需快速响应且避免服务器崩溃
     */
    private static final ExecutorService fixedExecutor = Executors.newFixedThreadPool(3);

    /**
     * 2、缓存-线程池创建
     * 默认核心线程数0,所有线程均为非核心线程
     * 最大线程数为Integer最大值
     * 默认空闲线程存活时间60s
     * 使用SynchronousQueue无容量队列,不存储任务,直接提交给线程池处理
     * 默认拒绝策略和默认线程工厂
     * 场景:主线程需快速返回,耗时操作异步执行(如发送邮件、生成报表)
     */
    private static final ExecutorService cachedExecutor = Executors.newCachedThreadPool();

    /**
     * 3、任务-线程池创建
     * 最大线程数为Integer最大值
     * 默认空闲线程存活时间0s
     * 默认DelayedWorkQueue高性能队列
     * 默认拒绝策略和默认线程工厂
     * 场景:定时执行任务(如每日数据备份、定期推送消息)
     */
    private static final ExecutorService scheduledExecutor = Executors.newScheduledThreadPool(1);

    /**
     * 4、单线程-线程池创建
     * 核心线程数和最大线程数都为1
     * 默认空闲线程存活时间0s
     * 默认LinkedBlockingQueue无界队列
     * 默认拒绝策略和默认线程工厂
     * 场景:跨服务任务调度(如分布式锁续期、集群状态同步)
     */
    private static final ExecutorService singleExecutor = Executors.newSingleThreadExecutor();

    /**
     * 5、并行-线程池创建
     * 并行级别(默认 CPU 核心数)
     * 场景:多核CPU任务并行执行(如并行计算、并行处理),海量数据分片并行处理(如日志分析、图像渲染)、递归任务分解
     */
    private static final ExecutorService workExecutor = Executors.newWorkStealingPool();

    /**
     * 测试
     */
    public static void main(String[] args) throws InterruptedException {
        String type = "fixed";
        String returnType = "test";
        ExecutorService executorService = getExecutorService(type);
        for (int i = 0; i < 10; i++) {
            executeTask(executorService, i, returnType);
        }
        // 等待任务执行完成关闭线程池
        executorService.shutdown();
        // 添加等待终止逻辑(确保任务完成)
        if (!executorService.awaitTermination(30, TimeUnit.SECONDS)) {
            System.err.println("线程池未在限定时间内关闭");
        }
    }

    /**
     * 线程池执行任务
     */
    private static void executeTask(ExecutorService executorService, int taskId, String returnType) {
        // 根据返回值判断是否执行成功
        if (("wait").equals(returnType)) {
            Future<String> future = getFuture(executorService, taskId);
            // 方法1:最大等待30s
            getFutureResultWait(future, taskId);
        } else if (("while").equals(returnType)) {
            FutureTask<String> future = getFutureTask(executorService, taskId);
            // 方法2:轮询判断是否执行完成
            getFutureResultWhile(future, taskId);
        } else {
            // 方法3:异步获取返回值
            CompletableFuture<Integer> futureWithExecutor = CompletableFuture.supplyAsync(() -> {
                System.out.println(taskId + "\t" + Thread.currentThread().getName());
                return taskId;
            }, executorService);

            // 添加上述异步回调处理
            futureWithExecutor.whenComplete((result, ex) -> {
                if (ex != null) {
                    System.err.println("Task " + taskId + " failed: " + ex.getMessage());
                } else {
                    System.out.println("Task " + taskId + " completed: " + result);
                }
            });
        }
    }

    /**
     * 提交异步任务并返回Future对象
     *
     * @param executorService 线程池执行器,用于提交异步任务
     * @param taskId          任务标识符,用于日志跟踪
     * @return Future<String> 表示异步计算的结果对象
     */
    private static Future<String> getFuture(ExecutorService executorService, int taskId) {
        return executorService.submit(() -> {
            // 任务执行逻辑:打印线程信息并模拟耗时操作
            System.out.println(taskId + "\t" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
                return "OK";
            } catch (InterruptedException e) {
                // 正确的中断处理:恢复中断状态并记录日志
                Thread.currentThread().interrupt();
                System.out.println("Task interrupted: " + taskId);
                return "ERROR";
            }
        });
    }

    /**
     * 提交异步任务并返回FutureTask对象
     *
     * @param executorService 线程池执行器,用于提交异步任务
     * @param taskId          任务标识符,用于日志跟踪
     * @return FutureTask<String> 表示异步计算的结果对象
     */
    private static FutureTask<String> getFutureTask(ExecutorService executorService, int taskId) {
        // 创建Callable任务
        Callable<String> task = () -> {
            // 任务执行逻辑:打印线程信息并模拟耗时操作
            System.out.println(taskId + "\t" + Thread.currentThread().getName());
            try {
                Thread.sleep(1000);
                return "OK";
            } catch (InterruptedException e) {
                // 正确的中断处理:恢复中断状态并记录日志
                Thread.currentThread().interrupt();
                System.out.println("Task interrupted: " + taskId);
                return "ERROR";
            }
        };

        /*
         * 创建FutureTask实例以包装异步任务
         * @param task 需要执行的Callable/Runnable任务对象
         * FutureTask兼具Runnable和Future的特性:
         * 1. 可作为Runnable被线程池执行
         * 2. 通过Future接口方法获取计算结果
         */
        FutureTask<String> futureTask = new FutureTask<>(task);

        /*
         * 将FutureTask提交到线程池执行
         * @param futureTask 包装了任务的可执行对象
         * 提交后会立即返回,实际执行由线程池调度
         * 后续可通过futureTask.get()阻塞获取计算结果
         * 或通过futureTask.cancel()取消任务
         */
        executorService.submit(futureTask);
        return futureTask;
    }

    /**
     * 阻塞等待Future结果并处理超时
     *
     * @param future 异步任务Future对象
     * @param taskId 任务标识符,用于异常日志
     */
    private static void getFutureResultWait(Future<String> future, int taskId) {
        try {
            // 设置最长等待时间为10秒
            String result = future.get(5, TimeUnit.SECONDS);
            System.out.println("Task result: " + result);
        } catch (InterruptedException | ExecutionException e) {
            // 中断处理:恢复中断状态并记录日志
            Thread.currentThread().interrupt();
            System.out.println("Task interrupted: " + taskId);
        } catch (TimeoutException e) {
            // 超时处理:主动取消任务并记录日志
            future.cancel(true);
            System.out.println("Task timeout: " + taskId);
        }
    }

    /**
     * 轮询检查任务完成状态
     *
     * @param future 异步任务Future对象
     * @param taskId 任务标识符,用于中断日志
     */
    private static void getFutureResultWhile(Future<String> future, int taskId) {
        String result = "ERROR";
        // 轮询机制检查任务状态
        while (!future.isDone()) {
            System.out.println("任务仍在执行中...");
            try {
                // 降低轮询频率以避免CPU过载
                Thread.sleep(500);
                result = "OK";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                System.out.println("Task interrupted: " + taskId);
                // 立即退出循环
                break;
            }
        }
        System.out.println("Task result: " + result);
    }

    /**
     * 获取不同类型的线程池
     */
    private static ExecutorService getExecutorService(String type) {
        switch (type) {
            case "fixed":
                return fixedExecutor;
            case "cached":
                return cachedExecutor;
            case "scheduled":
                return scheduledExecutor;
            case "single":
                return singleExecutor;
            case "work":
                return workExecutor;
            default:
                return poolExecutor;
        }
    }
}

自定义线程工厂

package com.study;

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 自定义线程工厂
 *
 * @author admin
 * @since 2025-04-18 17:29
 */
public class CustomThreadFactory implements ThreadFactory {
    // 编号从1开始
    private final AtomicInteger number = new AtomicInteger(1);
    private static final String NAME_PREFIX = "pool-thread-";
    // 线程优先级
    private final int priority;
    // 是否守护线程
    private final boolean daemon;

    /**
     * 默认构造方法,创建普通优先级、非守护线程
     */
    public CustomThreadFactory() {
        this(Thread.NORM_PRIORITY, false);
    }

    /**
     * 构造方法,自定义优先级和是否守护线程
     *
     * @param priority 优先级
     * @param daemon   是否守护线程
     */
    public CustomThreadFactory(int priority, boolean daemon) {
        if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
            throw new IllegalArgumentException("优先级超出范围: " + priority);
        }
        this.priority = priority;
        this.daemon = daemon;
    }

    /**
     * 创建线程
     *
     * @param r 线程任务
     * @return 线程对象
     */
    @Override
    public Thread newThread(Runnable r) {
        Thread thread = new Thread(r);
        thread.setName(NAME_PREFIX + number.getAndIncrement());
        thread.setPriority(priority);
        thread.setDaemon(daemon);
        thread.setUncaughtExceptionHandler((t, e) -> {
            System.err.println("线程 " + t.getName() + " 发生异常: " + e);
            e.printStackTrace();
        });
        return thread;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2338126.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

用 NLP + Streamlit,把问卷变成能说话的反馈

网罗开发 &#xff08;小红书、快手、视频号同名&#xff09; 大家好&#xff0c;我是 展菲&#xff0c;目前在上市企业从事人工智能项目研发管理工作&#xff0c;平时热衷于分享各种编程领域的软硬技能知识以及前沿技术&#xff0c;包括iOS、前端、Harmony OS、Java、Python等…

TCP/IP和UDP协议的发展历程

TCP/IP和UDP协议的发展历程 引言 互联网的发展史是人类技术创新的辉煌篇章&#xff0c;而在这一发展过程中&#xff0c;通信协议发挥了奠基性的作用。TCP/IP&#xff08;传输控制协议/互联网协议&#xff09;和UDP&#xff08;用户数据报协议&#xff09;作为互联网通信的基础…

Function Calling的时序图(含示例)

&#x1f9cd; 用户&#xff1a; 发起请求&#xff0c;输入 prompt&#xff08;比如&#xff1a;“请告诉我北京的天气”&#xff09;。 &#x1f7ea; 应用&#xff1a; 将用户输入的 prompt 和函数定义&#xff08;包括函数名、参数结构等&#xff09;一起发给 OpenAI。 …

若依框架修改左侧菜单栏默认选中颜色

1.variables.sacc中修改为想要的颜色 2.给目标设置使用的颜色

搜广推校招面经七十八

字节推荐算法 一、实习项目&#xff1a;多任务模型中的每个任务都是做什么&#xff1f;怎么确定每个loss的权重 这个根据实际情况来吧。如果实习时候用了moe&#xff0c;就可能被问到。 loss权重的话&#xff0c;直接根据任务的重要性吧。。。 二、特征重要性怎么判断的&…

广搜bfs-P1443 马的遍历

P1443 马的遍历 题目来源-洛谷 题意 要求马到达棋盘上任意一个点最少要走几步 思路 国际棋盘规则是马的走法是-日字形&#xff0c;也称走马日&#xff0c;即x,y一个是走两步&#xff0c;一个是一步 要求最小步数&#xff0c;所以考虑第一次遍历到的点即为最小步数&#xff…

强化学习算法系列(六):应用最广泛的算法——PPO算法

强化学习算法 &#xff08;一&#xff09;动态规划方法——策略迭代算法(PI)和值迭代算法(VI) &#xff08;二&#xff09;Model-Free类方法——蒙特卡洛算法(MC)和时序差分算法(TD) &#xff08;三&#xff09;基于动作值的算法——Sarsa算法与Q-Learning算法 &#xff08;四…

AI Agents系列之AI代理架构体系

1. 引言 智能体架构是定义智能体组件如何组织和交互的蓝图,使智能体能够感知其环境、推理并采取行动。本质上,它就像是智能体的数字大脑——集成了“眼睛”(传感器)、“大脑”(决策逻辑)和“手”(执行器),用于处理信息并采取行动。 选择正确的架构对于构建有效的智能…

2025海外代理IP测评:Bright Data,ipfoxy,smartproxy,ipipgo,kookeey,ipidea哪个值得推荐?

近年来&#xff0c;随着全球化和跨境业务需求的不断扩大“海外代理IP”逐渐成为企业和个人在多样化场景中的重要工具。无论是进行数据采集、广告验证、社交媒体管理&#xff0c;还是跨境电商平台运营&#xff0c;选择合适的代理IP服务商都显得尤为重要。然而&#xff0c;市场上…

Android守护进程——Vold (Volume Daemon)

简介 介绍&#xff1a;Vold 是用来管理 android 系统的存储设备&#xff0c;如U盘、SD卡、磁盘等移动设备的热插拔、挂载、卸载、格式化 框架结构&#xff1a;Vold 在系统中以守护进程存在&#xff0c;是一个单独的进程。处于Kernel和Framework之间&#xff0c;是两个层级连接…

vue3+vite 实现.env全局配置

首先创建.env文件 VUE_APP_BASE_APIhttp://127.0.0.1/dev-api 然后引入依赖&#xff1a; pnpm install dotenv --save-dev 引入完成后&#xff0c;在vite.config.js配置文件内加入以下内容&#xff1a; const env dotenv.config({ path: ./.env }).parsed define: { // 将…

AI 组件库是什么?如何影响UI的开发?

AI组件库是基于人工智能技术构建的、面向用户界面&#xff08;UI&#xff09;开发的预制模块集合。它们结合了传统UI组件&#xff08;如按钮、表单、图表&#xff09;与AI能力&#xff08;如机器学习、自然语言处理、计算机视觉&#xff09;&#xff0c;旨在简化开发流程并增强…

OpenCV day6

函数内容接上文&#xff1a;OpenCV day4-CSDN博客 , OpenCV day5-CSDN博客 目录 平滑&#xff08;模糊&#xff09; 25.cv2.blur()&#xff1a; 26.cv2.boxFilter(): 27.cv2.GaussianBlur()&#xff1a; 28.cv2.medianBlur(): 29.cv2.bilateralFilter()&#xff1a; 锐…

【AI飞】AutoIT入门七(实战):python操控autoit解决csf视频批量转换(有点难,AI都不会)

背景&#xff1a; 终极目标&#xff1a;通过python调用大模型&#xff0c;获得结果&#xff0c;然后根据返回信息&#xff0c;控制AutoIT操作电脑软件&#xff0c;执行具体工作。让AI更具有执行力。 已完成部分&#xff1a; 关于python调用大模型的&#xff0c;可以参考之前的…

MARA/MARC表 PSTAT字段

最近要开发一个维护物料视图的功能。其中PSTAT字段是来记录已经维护的视图的。这里记录一下视图和其对应的字母。 MARA还有个VPSTA&#xff08;完整状态&#xff09;字段&#xff0c;不过在我试的时候每次PSTAT出现一个它就增加一个&#xff0c;不知道具体是为什么。 最近一直…

学习型组织与系统思考

真正的学习型组织不是只关注个人的学习&#xff0c;而是关注整个系统的学习。—彼得圣吉 在这两年里&#xff0c;越来越多的企业开始询问是否可以将系统思考的内容内化给自己的内训师&#xff0c;进而在公司内部进行教学。我非常理解企业这样做的动机&#xff0c;毕竟内部讲师…

支持mingw g++14.2 的c++23 功能print的vscode tasks.json生成调试

在mingw14.2版本中, print库的功能默认没有开启, 生成可执行文件的tasks.json里要显式加-lstdcexp, 注意放置顺序. tasks.json (支持mingw g14.2 c23的print ) {"version": "2.0.0","tasks": [{"type": "cppbuild","…

守护者进程小练习

守护者进程含义 定义&#xff1a;守护进程&#xff08;Daemon&#xff09;是运行在后台的特殊进程&#xff0c;独立于控制终端&#xff0c;周期性执行任务或等待事件触发。它通常以 root 权限运行&#xff0c;名称常以 d 结尾&#xff08;如 sshd, crond&#xff09;。 特性&a…

opencv函数展示3

一、图像平滑&#xff08;模糊&#xff09; 线性滤波&#xff08;速度快&#xff09;&#xff1a; 1.cv2.blur() 2.cv2.boxFilter() 3.cv2.GaussianBlur() 非线性滤波&#xff08;速度慢但效果好&#xff09;&#xff1a; 4.cv2.medianBlur() 5.cv2.bilateralFilter() 二、锐…

遥感技术赋能电力设施监控:应用案例篇

目前主流的电力巡检手段利用无人机能够通过设定灵活航线进行低空飞行、搭载不同的采集设备&#xff0c;能够从不同角度对输电线进行贴近拍摄&#xff0c;但缺陷是偏远山区无人机飞行技术要求高&#xff0c;成本高&#xff0c;且飞行的无人机也可能会对输电线产生破坏。 星图云开…