线程池吞掉异常的case:源码阅读与解决方法

news2024/11/26 17:50:37

1. 问题背景

有一天给同事CR,看到一段这样的代码

try {
    for (param : params) {
        //并发处理,func无返回值
        ThreadPool.submit(func(param));
    }
} catch (Exception e) {
    log.info("func抛异常啦,参数是:{}", param)
}

我:你这段代码是利用并发降低RT对吧,如果func内部抛异常,你确定可以catch到吗

同事:可以啊, 为什么不可以(...

我:不如你run一把,在func mock一个异常出来试试

同事:我靠还真是

我:你可以用execute,改动比较小

同事:那么是为什么呢

2. 同事的例子

import java.util.concurrent.*;

public class ThreadPoolTest {

        public static void main(String[] args) throws Exception {
            ExecutorService executorService = new ThreadPoolExecutor(10, 20, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
            testExecute(executorService);
            Thread.sleep(2000);
            testSubmit1(executorService);
            Thread.sleep(2000);
            testSubmit2(executorService);
        }

        private static void testExecute(ExecutorService executorService) {
            executorService.execute(() -> {
                System.out.println("执行线程池execute方法");
                throw new RuntimeException("execute方法抛出异常");
            });
        }

        private static void testSubmit1(ExecutorService executorService) {
            executorService.submit(() -> {
                System.out.println("执行线程池submit方法1");
                throw new RuntimeException("submit方法1抛出异常");
            });
        }

        private static void testSubmit2(ExecutorService executorService) throws Exception {
            Future<Object> feature = executorService.submit(() -> {
                System.out.println("执行线程池submit方法2");
                throw new RuntimeException("submit方法2抛出异常");
            });
            feature.get();
        }
}

执行结果:

执行线程池execute方法
Exception in thread "pool-1-thread-1" java.lang.RuntimeException: execute方法抛出异常
	at ThreadPoolTest.lambda$testExecute$0(ThreadPoolTest.java:23)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
执行线程池submit方法1
执行线程池submit方法2
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: submit方法2抛出异常
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at ThreadPoolTest.testSubmit2(ThreadPoolTest.java:39)
	at ThreadPoolTest.main(ThreadPoolTest.java:17)
Caused by: java.lang.RuntimeException: submit方法2抛出异常
	at ThreadPoolTest.lambda$testSubmit2$2(ThreadPoolTest.java:37)
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
	at java.util.concurrent.FutureTask.run(FutureTask.java)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

3. 原理分析

3.1 线程池包的继承结构

3.2 submit和execute方法的差异

3.2.1 execute

方法定义在最顶层的Executor接口,并且Executor接口有且仅有这一个方法

public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

方法实现在ThreadPoolExecutor:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

实际执行的过程,在worker(是runnable的实现类)的run方法,run方法实际执行的是runWorker方法

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

可以看到执行过程中,如果task.run();发生异常,没有catch处理,异常会层层向外抛出;最终进入finally块,执行processWorkerExit;

3.2.2 submit

submit方法定义在ExecutorService

public interface ExecutorService extends Executor {

    /**
     * Submits a value-returning task for execution and returns a
     * Future representing the pending results of the task. The
     * Future's {@code get} method will return the task's result upon
     * successful completion.
     *
     * <p>
     * If you would like to immediately block waiting
     * for a task, you can use constructions of the form
     * {@code result = exec.submit(aCallable).get();}
     *
     * <p>Note: The {@link Executors} class includes a set of methods
     * that can convert some other common closure-like objects,
     * for example, {@link java.security.PrivilegedAction} to
     * {@link Callable} form so they can be submitted.
     *
     * @param task the task to submit
     * @param <T> the type of the task's result
     * @return a Future representing pending completion of the task
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if the task is null
     */
    <T> Future<T> submit(Callable<T> task);

    /**
     * Submits a Runnable task for execution and returns a Future
     * representing that task. The Future's {@code get} method will
     * return the given result upon successful completion.
     *
     * @param task the task to submit
     * @param result the result to return
     * @param <T> the type of the result
     * @return a Future representing pending completion of the task
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if the task is null
     */
    <T> Future<T> submit(Runnable task, T result);

    /**
     * Submits a Runnable task for execution and returns a Future
     * representing that task. The Future's {@code get} method will
     * return {@code null} upon <em>successful</em> completion.
     *
     * @param task the task to submit
     * @return a Future representing pending completion of the task
     * @throws RejectedExecutionException if the task cannot be
     *         scheduled for execution
     * @throws NullPointerException if the task is null
     */
    Future<?> submit(Runnable task);
}

实现在AbstractExecutorService

    public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

可以看到这里创建了RunnableFuture(而不是基础的worker),顾名思义,RunnableFuture同时实现了Runnable和Future接口,也就意味着可以对该任务执行get操作,看看RunnableFuture的run方法:

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

catch块对方法异常做了处理,与执行结果一同在Future中暂存起来;submit()执行完毕后返回Future对象,执行future.get()会触发异常的抛出;

当然了,如果你只是执行了submit,没有获取future,异常就会“神奇地”消失。

参考:

Java线程池实现原理及其在美团业务中的实践 - 美团技术团队

https://zhuanlan.zhihu.com/p/651997713

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1832508.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

函数式编程基本语法

文章目录 1.函数对象表现形式1.Lambda表达式&#xff08;功能全面&#xff09;1.基本语法2.只有一行逻辑&#xff0c;该逻辑结果是返回值3.复杂逻辑4.省略参数类型&#xff08;可以通过上下文推导出类型时&#xff0c;比如实现了函数式接口&#xff09;5.只有一个参数时&#x…

SQL160 国庆期间每类视频点赞量和转发量

描述 用户-视频互动表tb_user_video_log iduidvideo_idstart_timeend_timeif_followif_likeif_retweetcomment_id110120012021-09-24 10:00:002021-09-24 10:00:20110NULL210520022021-09-25 11:00:002021-09-25 11:00:30001NULL310220022021-09-25 11:00:002021-09-25 11:00…

【数据结构】红黑树实现详解

在本篇博客中&#xff0c;作者将会带领你使用C来实现一棵红黑树&#xff0c;此红黑树的实现是基于二叉搜索树和AVLTree一块来讲的&#xff0c;所以在看本篇博客之前&#xff0c;你可以先看看下面这两篇博客 【C】二叉搜索树-CSDN博客 【数据结构】AVLTree实现详解-CSDN博客 在这…

21.FuturePromise

在异步处理时,经常用到两个接口Future 和 Promise。 说明:Netty中的Future与jdk中的Future同名,但是是两个接口,netty的Future继承了jdk的Future,而Promise又对Netty的Future进行了扩展。 JDK的Future只能同步等待任务结束(成功、失败)才能得到结果。FutureTask.get()方…

项目(一)--高并发内存池项目简介

什么是高并发内存池 它是一个全球性大厂google(谷歌)的 开源项目,项目名字叫tcmalloc,全称是Thread-Caching Malloc,即线程缓存的malloc 作用&#xff1a; 我们知道C语言在堆上开辟空间和 释放使用的是malloc和free函数 并且C的动态内存管理new和delete 的底层实际上也调用了…

高考志愿填报:选择好专业还是好学校?

人不走空 &#x1f308;个人主页&#xff1a;人不走空 &#x1f496;系列专栏&#xff1a;算法专题 ⏰诗词歌赋&#xff1a;斯是陋室&#xff0c;惟吾德馨 高考志愿填报&#xff1a;选择好专业还是好学校&#xff1f; 每年高考结束后&#xff0c;考生和家长面临的一个…

数据结构之线性表(3)

数据结构之线性表&#xff08;3&#xff09; 上文我们了解了线性表的静动态存储的相关操作&#xff0c;此篇我们对线性表中链表的相关操作探讨。 在进行链表的相关操作时&#xff0c;我们先来理解单链表是什么&#xff1f; 1.链表的概念及结构 链表是一种物理存储结构上非连…

密钥管理简介

首先我们要知道什么是密钥管理&#xff1f; 密钥管理是一种涉及生成、存储、使用和更新密钥的过程。 密钥的种类 我们知道&#xff0c;对称密码主要包括分组密码和序列密码。但有时也可以将杂凑函数和消息认证码划分为这一类&#xff0c;将它们的密钥称为对称密钥&#xff1b;…

RAG与Langchain简介

RAG与Langchain简介 什么是RAGRAG解决的问题RAG工作流程RAG调优策略LangChain简介 什么是RAG 检索增强生成&#xff08;Retrieval-Augmented Generation&#xff09;&#xff0c;主要是通过从外部给大模型补充一些知识&#xff0c;相当于给模型外挂了一个知识库&#xff0c;让…

Navicat 安装及初步配置指南

Navicat 是一款广泛使用的数据库管理工具&#xff0c;支持多种数据库&#xff0c;如 MySQL、PostgreSQL、SQLite 等。以下是 Navicat 安装步骤的详细说明&#xff1a; 在 Windows 上安装 Navicat 下载 Navicat 安装包&#xff1a; 访问 Navicat 官方网站&#xff1a;Navicat 官…

iCopy for Mac 剪切板 粘贴工具 历史记录 安装(保姆级教程,新手小白轻松上手)

Mac分享吧 文章目录 效果可留存文本、图片、文件等复制历史记录也可根据关键字进行历史记录检索点击一下&#xff0c;可复制双击两下&#xff0c;复制内容&#xff0c;并将信息粘贴至鼠标指针处 一、准备工作二、开始安装1、双击运行软件&#xff0c;将其从左侧拖入右侧文件夹…

【Vue】Pinia管理用户数据

Pinia管理用户数据 基本思想&#xff1a;Pinia负责用户数据相关的state和action&#xff0c;组件中只负责触发action函数并传递参数 步骤1&#xff1a;创建userStore 1-创建store/userStore.js import { loginAPI } from /apis/user export const useUserStore defineStore(…

Havoc工具

Team端 客户端 打开后需要生成监听器和agent 监听 生成payload 最后上线 HTTPS流量 HTTP流量 心跳

基于微信小程序的在线答题小程序设计与实现

个人介绍 hello hello~ &#xff0c;这里是 code袁~&#x1f496;&#x1f496; &#xff0c;欢迎大家点赞&#x1f973;&#x1f973;关注&#x1f4a5;&#x1f4a5;收藏&#x1f339;&#x1f339;&#x1f339; &#x1f981;作者简介&#xff1a;一名喜欢分享和记录学习的…

安装 Nuxt.js 的步骤和注意事项

title: 安装 Nuxt.js 的步骤和注意事项 date: 2024/6/17 updated: 2024/6/17 author: cmdragon excerpt: Nuxt.js在Vue.js基础上提供的服务器端渲染框架优势&#xff0c;包括提高开发效率、代码维护性和应用性能。指南详细说明了从环境准备、Nuxt.js安装配置到进阶部署技巧&…

大模型KV Cache节省神器MLA学习笔记(包含推理时的矩阵吸收分析)

首先&#xff0c;本文回顾了MHA的计算方式以及KV Cache的原理&#xff0c;然后深入到了DeepSeek V2的MLA的原理介绍&#xff0c;同时对MLA节省的KV Cache比例做了详细的计算解读。接着&#xff0c;带着对原理的理解理清了HuggingFace MLA的全部实现&#xff0c;每行代码都去对应…

dentacare - hackmyvm

简介 靶机名称&#xff1a;dentacare 难度&#xff1a;中等 靶场地址&#xff1a;https://hackmyvm.eu/machines/machine.php?vmdentacare 本地环境 虚拟机&#xff1a;vitual box 靶场IP&#xff08;dentacare&#xff09;&#xff1a;192.168.56.120 跳板机IP(window…

使用Multipass编译OpenHarmony工程

Multipass 是一个轻量级虚拟机管理器&#xff0c;支持 Linux、Windows 与 macOS&#xff0c;这是为希望使用单个命令提供全新 Ubuntu 环境的开发人员而设计的。使用 Linux 上的 KVM、Windows 上的 Hyper-V 和 macOS 上的 HyperKit 来以最小的开销运行 VM&#xff0c;同时它还可…

驱动开发(二):创建字符设备驱动

往期文章&#xff1a; 驱动开发&#xff08;一&#xff09;&#xff1a;驱动代码的基本框架 驱动开发&#xff08;二&#xff09;&#xff1a;创建字符设备驱动 ←本文 驱动开发&#xff08;三&#xff09;&#xff1a;内核层控制硬件层 目录 字符驱动设备的作用 函数 …

找不到vcomp100.dll无法继续执行代码的原因及解决方法

在日常使用电脑的过程中&#xff0c;我们可能会遇到一些错误提示&#xff0c;其中之一就是“vcomp100.dll丢失”。那么&#xff0c;vcomp100.dll是什么&#xff1f;它为什么会丢失&#xff1f;对电脑有什么具体影响&#xff1f;如何解决这个问题&#xff1f;本文将为您详细解答…