【线上问题】CompletableFuture与线程池使用不当导致服务整个挂掉

news2025/4/7 2:51:13

Informal Essay By English

It is always a pleasure to learn

背景

在某一个风和日丽的早上,小组同事说昨晚线上服务有20分钟左右的不可用,当时内心一紧,不会是我写的代码有bug导致的吧👀,我正了正心态,故作轻松地说有定位到是什么原因导致的吗?(内心慌的一批🌝)他开始滔滔不绝地说了一大堆是如何排查问题的(技术人的特性,对于解决问题非常热忱),虽然我当时一直保持着很认真的神态,但其实心里非常煎熬(是谁的代码导致的一直没有说!!)。10分钟后…,同事语重心长地说,这次这个线上问题暴露我们以前写的代码还是欠缺一些场景的考虑。听到这里我大概已经知道不是我的代码导致的(注:我刚进这个小组不久),这个时候我也开始语重心长地附和道“是啊,以前我们在这块的考量还是有些不足的…然后就开始和他就‘代码质量如何保证’话题讨论了半小时🌚”。

问题描述

问题最终定位到是一个并发问题。线上有一个接口是通过CompletableFuture与线程池结合使用去获取下游的数据(注:使用异步的方式去获取下游数据则是因为调用的下游的接口是一个耗时高的soa接口),大致代码如下:

public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {

        ThreadPoolExecutor flowContractThreadPool = new ThreadPoolExecutor(2,
                5, 15, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(5),
                new ThreadPoolExecutor.DiscardPolicy());

        List<CompletableFuture> threadList = Lists.newArrayList();

        for (int i = 0; i < 100; i++) {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                log.info("main for");
                // do something
            }, flowContractThreadPool);
            threadList.add(future);
        }
        CompletableFuture.allOf(threadList.toArray(new CompletableFuture[threadList.size()])).join();
        //获取结果,然后做相关业务处理
        log.info("end");
    }

现在暂时不去分析代码,先描述现象。出现问题的那个晚上,有一波突刺流量,由于我们没有针对接口的请求失败做短信告警(虽然有钉钉异常告警群,但是大家都不是很关心群里消息🌞),因此一开始出现问题的接口出现大面积的请求超时而我们都没有感知到,直到最终服务出现不可用,值班同事才发现这个问题(经过此事件,我们很自然地加上请求失败告警🌦️)。从结合链路和日志分析定位问题出现的原因到服务恢复的MTTR大概花了10分钟(在此特意表扬一下此同事☀️☀️)。

问题分析

通过上面背景与问题的描述,已经知道整个问题的全貌,现在从技术的角度去分析一下CompletableFuture结合线程池的原理与使用注意事项。

CompletableFuture

从上述的案例代码里面,涉及到CompletableFuture中的三个方法,他们分别是runAsync、join、allOf,下面我们逐步去分析这几个方法:

runAsync

这个方法的效果返回一个新的CompletableFuture,该CompletableFuture是在给定执行器中运行的任务在运行给定动作后异步完成的。

public static CompletableFuture<Void> runAsync(Runnable runnable,
                                                   Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
    }

//此方法是为了保证操作系统是多核的情况下走线程池,单核情况下不走线程池,由单个线程去跑相应的逻辑
static Executor screenExecutor(Executor e) {
        if (!useCommonPool && e == ForkJoinPool.commonPool())
            return asyncPool;
        if (e == null) throw new NullPointerException();
        return e;
    }



static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        e.execute(new AsyncRun(d, f));
        return d;
    }

runAsync方法中对CompletableFuture进行了一层封装,通过AsyncRun对象组装一个空的CompletableFuture与Runnable,然后将空的CompletableFuture返回,我们再来看看AsyncRun的一个结构:

static final class AsyncRun extends ForkJoinTask<Void>
            implements Runnable, AsynchronousCompletionTask {
        CompletableFuture<Void> dep; Runnable fn;
        AsyncRun(CompletableFuture<Void> dep, Runnable fn) {
            this.dep = dep; this.fn = fn;
        }

        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) {}
        public final boolean exec() { run(); return true; }
		
		
        public void run() {
            CompletableFuture<Void> d; Runnable f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                if (d.result == null) {
                    try {
                    	//由于f
                        f.run();
                        d.completeNull();
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                d.postComplete();
            }
        }
    }

AsyncRun和AsyncSupply的实现略有不同,AsyncRun的run中,计算的执行是通过调用传入的Runnable(源码中的 f 变量)的run方法进行的。由于没有返回值,所以这里在设置CompletableFuture的值时,使用其completeNull()方法,设置一个特殊的空值标记。

AsyncRun的继承结构大致如下:
在这里插入图片描述

allOf

allOf的方法的作用是当所有给定的CompletableFutures完成时,返回一个新的CompletableFuture。如果给定的任何一个CompletableFuture异常完成,那么返回的CompletableFuture也会异常完成,并使用CompletionException将此异常作为其原因。否则,给定的CompletableFuture的结果(如果有的话)不会反映在返回的CompletableFuture中,而是可以通过单独检查它们来获得。如果没有提供CompletableFutures,则返回一个完整的CompletableFuture,其值为null。该方法的应用之一是在继续一个程序之前等待一组独立的CompletableFuture的完成,如:allOf(c1, c2, c3).join();

public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
}

static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
                                           int lo, int hi) {
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        //对传入的CompletableFuture的参数校验,如果没有通过则返回AltResult
        if (lo > hi) // empty
            d.result = NIL;
        else {
            CompletableFuture<?> a, b;
            //通过右移操作获取中值
            int mid = (lo + hi) >>> 1;
            //通过递归的方式a、b的赋值操作(这个逻辑有点抽象,大家可以通过花图去理解一下)
            if ((a = (lo == mid ? cfs[lo] :
                      andTree(cfs, lo, mid))) == null ||
                (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                      andTree(cfs, mid+1, hi)))  == null)
                throw new NullPointerException();
            //判断任务是否执行
            if (!d.biRelay(a, b)) {
                BiRelay<?,?> c = new BiRelay<>(d, a, b);
                a.bipush(b, c);
                c.tryFire(SYNC);
            }
        }
        return d;
    }
 
 //判断任务是否执行,可简单理解为:result 是 null 任务没执行,不是 null 任务已执行。
boolean biRelay(CompletableFuture<?> a, CompletableFuture<?> b) {
        Object r, s; Throwable x;
        if (a == null || (r = a.result) == null ||
            b == null || (s = b.result) == null)
            return false;
        if (result == null) {
            if (r instanceof AltResult && (x = ((AltResult)r).ex) != null)
                completeThrowable(x, r);
            else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null)
                completeThrowable(x, s);
            else
                completeNull();
        }
        return true;
    }   
//这个方法是用于对象编排
final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {
        if (c != null) {
            Object r;
            while ((r = result) == null && !tryPushStack(c))
                lazySetNext(c, null); // clear on failure
            if (b != null && b != this && b.result == null) {
                Completion q = (r != null) ? c : new CoCompletion(c);
                while (b.result == null && !b.tryPushStack(q))
                    lazySetNext(q, null); // clear on failure
            }
        }
    }

//用于获取future执行完的返回值
final CompletableFuture<Void> tryFire(int mode) {
            CompletableFuture<Void> d;
            CompletableFuture<T> a;
            CompletableFuture<U> b;
            if ((d = dep) == null || !d.biRelay(a = src, b = snd))
                return null;
            src = null; snd = null; dep = null;
            return d.postFire(a, b, mode);
 }
join

完成时返回结果值,如果异常完成则抛出(未检查的)异常。为了更好地符合通用函数形式的使用,如果在CompletableFuture的完成过程中涉及的计算抛出了异常,则该方法抛出一个(未检查的)CompletionException,并将底层异常作为其原因。

public T join() {
        Object r;
        return reportJoin((r = result) == null ? waitingGet(false) : r);
    }

//此方法的的作用就是用于判断AltResult的result中是否有异常,如果有抛出来
private static <T> T reportJoin(Object r) {
        if (r instanceof AltResult) {
            Throwable x;
            if ((x = ((AltResult)r).ex) == null)
                return null;
            if (x instanceof CancellationException)
                throw (CancellationException)x;
            if (x instanceof CompletionException)
                throw (CompletionException)x;
            throw new CompletionException(x);
        }
        @SuppressWarnings("unchecked") T t = (T) r;
        return t;
    }
//等待后返回原始结果,如果可中断且已中断则返回null。
private Object waitingGet(boolean interruptible) {
        Signaller q = null;
        boolean queued = false;
        int spins = -1;
        Object r;
        while ((r = result) == null) {
        	//spins用于自旋,可不用关注
            if (spins < 0)
                spins = SPINS;
            else if (spins > 0) {
                if (ThreadLocalRandom.nextSecondarySeed() >= 0)
                    --spins;
            }
            // 必会执行的分支,会将q的值赋值new Signaller(interruptible, 0L, 0L);
            else if (q == null)
                q = new Signaller(interruptible, 0L, 0L);
            //必会执行的分支,把stack 设置为 q
            else if (!queued)
                queued = tryPushStack(q);
            //线程中断时会执行
            else if (interruptible && q.interruptControl < 0) {
                q.thread = null;
                cleanStack();
                return null;
            }
            //这个是阻塞api执行的地方,着重看这个
            else if (q.thread != null && result == null) {
                try {
                    ForkJoinPool.managedBlock(q);
                } catch (InterruptedException ie) {
                    q.interruptControl = -1;
                }
            }
        }
        if (q != null) {
            q.thread = null;
            if (q.interruptControl < 0) {
                if (interruptible)
                    r = null; // report interruption
                else
                    Thread.currentThread().interrupt();
            }
        }
        postComplete();
        return r;
    }
//运行给定的可能阻塞的任务。当在ForkJoinPool中运行时,如果有必要,这个方法可能会安排一个空闲线程被激活,以确保当前线程在blocker.block()中被阻塞时有足够的并行性。
//此方法重复调用blocker.isReleasable()和blocker.block(),直到其中一个方法返回true。
//每次调用block .block()之前都会调用blockisreleasable(),返回false。
//如果不在ForkJoinPool中运行,则此方法在行为上等同于while (!block . isrelease ()) if (block .block()) break;如果在ForkJoinPool中运行,池可能首先被扩展以确保在调用block .block()期间有足够的并行性。
public static void managedBlock(ManagedBlocker blocker)
        throws InterruptedException {
        ForkJoinPool p;
        ForkJoinWorkerThread wt;
        Thread t = Thread.currentThread();
        if ((t instanceof ForkJoinWorkerThread) &&
            (p = (wt = (ForkJoinWorkerThread)t).pool) != null) {
            WorkQueue w = wt.workQueue;
            while (!blocker.isReleasable()) {
                if (p.tryCompensate(w)) {
                    try {
                        do {} while (!blocker.isReleasable() &&
                                     !blocker.block());
                    } finally {
                        U.getAndAddLong(p, CTL, AC_UNIT);
                    }
                    break;
                }
            }
        }
        else {
            do {} while (!blocker.isReleasable() &&
                         !blocker.block());
        }
    }

上面我们对于CompletableFuture有了一个粗略的认识,想了解更多的话,推荐去看看CompletableFuture 的 allOf 方法底层原理是什么。

案例分析
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {

        ThreadPoolExecutor flowContractThreadPool = new ThreadPoolExecutor(2,
                5, 15, TimeUnit.MINUTES,
                new ArrayBlockingQueue<>(5),
                new ThreadPoolExecutor.DiscardPolicy());

        List<CompletableFuture> threadList = Lists.newArrayList();

        for (int i = 0; i < 100; i++) {
            CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
                log.info("main for");
                // do something
            }, flowContractThreadPool);
            threadList.add(future);
        }
        CompletableFuture.allOf(threadList.toArray(new CompletableFuture[threadList.size()])).join();
        //获取结果,然后做相关业务处理
        log.info("end");
    }

再回到我们的案例代码里面,案例中我们给线程池设置的拒绝策略是DiscardPolicy(造成此次线上问题的罪魁祸首),此策略的作用是当线程池中的队列满时再来任务会静默丢弃task,现在问题来了,这个丢弃的任务就有可能是某些阻塞等待线程的FutureTask,那么这些调用了get()的无限时等待api的线程将无限时阻塞了,没人去唤醒他,如下图:
在这里插入图片描述

FutureTask被丢弃的话,CompletableFuture.allOf(threadList.toArray(new CompletableFuture[threadList.size()])).join();这段代码就会一直阻塞线程获取FutureTask结果,此时结果是永远无法获取到(task丢弃,因此FutureTask 的引用无法获取数据),因此会一直夯住主线程,随着夯住的线程越来越多,tomcat的线程也会被打满,整个服务就瘫痪了。

问题处理

那么如何去处理这种现象呢?如果认真看完上面的内容的小伙伴其实已经有答案了,我这里提供两种思路去处理。第一种就是根据线上流量峰值去增大队列长度(这种方式不推荐,治标不治本);第二种是通过修改线程池的拒绝策略避免这种情况(最好是自己实现拒绝策略,这种方式对于业务的扩展更佳灵活)。

最后提出一个问题,如果是你碰到这个问题,你会怎么去处理呢?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1400548.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[小程序]样式与配置

一、外部样式导入 使用import加外部样式表的相对路径并以 ; 表示语句结束。 import "common.wxss"; 二、全局样式和局部样式 全局样式位于app.wxss中&#xff0c;会作用于整个项目中所有页面中。 局部样式位于对应的wxss文件中&#xff0c;仅作用于当前页面&#x…

WebDriverWait太强大

selenium webdriver及wait 1 implicitly包打天下2 Linkedin无法登录返回值很乱&#xff0c;怎么破&#xff1f; 1 implicitly包打天下 有了implicitly之后&#xff0c;基本上不再关注网速之类的影响。 self.driver.implicitly_wait(511)2 Linkedin无法登录返回值很乱&#xf…

Vulnhub-TECH_SUPP0RT: 1渗透

文章目录 一、前言1、靶机ip配置2、渗透目标3、渗透概括 开始实战一、信息获取二、使用smb服务获取信息三、密码破解四、获取webshell五、反弹shell六、web配置文件获取信息七、提权 一、前言 由于在做靶机的时候&#xff0c;涉及到的渗透思路是非常的广泛&#xff0c;所以在写…

ad18报错:clearance constraint

最常见的报错&#xff0c;直译过来的意思&#xff1a;间隙约束。也就是约束PCB中的电气间距&#xff0c;比如阻容各类元件的焊盘间距小于规则中的设定值&#xff0c;即报警。 Altium Designer 中的 Clearance Constraint 错误如何修改-CSDN博客 【Altium Designer21】DRC规则…

Vulnhub靶机:FunBox 2

一、介绍 运行环境&#xff1a;Virtualbox 攻击机&#xff1a;kali&#xff08;10.0.2.15&#xff09; 靶机&#xff1a;FunBox 2&#xff08;10.0.2.27&#xff09; 目标&#xff1a;获取靶机root权限和flag 靶机下载地址&#xff1a;https://download.vulnhub.com/funbo…

决策树的分类

概念 决策树是一种树形结构 树中每个内部节点表示一个特征上的判断&#xff0c;每个分支代表一个判断结果的输出&#xff0c;每个叶子节点代表一种分类结果 决策树的建立过程 1.特征选择&#xff1a;选取有较强分类能力的特征。 2.决策树生成&#xff1a;根据选择的特征生…

【代码随想录07】344.反转字符串 541. 反转字符串II 05.替换空格 151.翻转字符串里的单词 55. 右旋转字符串

目录 344. 反转字符串题目描述做题思路参考代码 541. 反转字符串 II题目描述参考代码 05. 替换数字题目描述参考代码 151. 反转字符串中的单词题目描述参考代码 55. 右旋转字符串题目描述参考代码 344. 反转字符串 题目描述 编写一个函数&#xff0c;其作用是将输入的字符串反…

HTML以及CSS相关知识总结(一)

近日就开始回顾html和css相关知识啦&#xff0c;并且会学习html5和css3的新知识&#xff0c;以下是我对记忆不太深刻的地方以及新知识点的总结&#xff1a; Web标准&#xff1a; 结构&#xff1a;用于对网页元素进行整理和分类&#xff0c;即HTML 表现&#xff1a;用于设置网页…

CentOS Linux操作系统源码安装最新Redis版本,使用JSON数据类型踩入新坑

最近有空查阅了redis官网&#xff0c;发现redis数据类型不止Strings、Lists、Sets、Hashes、Sorted sets&#xff0c;还多了几种&#xff0c;决定先试用下JSON数据类型 1、安装Redis软件 JSON数据类型&#xff0c;对Redis版本有要求&#xff0c;需要大于4.0版本。下图是华为云…

Transformer|1.4 CNN遇到的问题与窘境

文章目录 CNN遇到的问题与窘境transformer 的优势 CNN遇到的问题与窘境 判断一个人是否为美人&#xff0c;既要看她各个五官&#xff0c;也要看她各个五官占的比例和协调。 既要照顾好局部信息&#xff0c;也要照顾好全局信息。 局部信息用小的感受野进行感受&#xff0c;而全局…

【操作系统和计网从入门到深入】(六)进程间通信

前言 这个专栏其实是博主在复习操作系统和计算机网络时候的笔记&#xff0c;所以如果是博主比较熟悉的知识点&#xff0c;博主可能就直接跳过了&#xff0c;但是所有重要的知识点&#xff0c;在这个专栏里面都会提到&#xff01;而且我也一定会保证这个专栏知识点的完整性&…

python算法与数据结构---单调栈与实践

单调栈 单调栈是一个栈&#xff0c;里面的元素的大小按照它们所在栈的位置&#xff0c;满足一定的单调性&#xff1b; 性质&#xff1a; 单调递减栈能找到左边第一个比当前元素大的元素&#xff1b;单调递增栈能找到左边第一个比当前元素小的元素&#xff1b; 应用场景 一般用…

19.云原生CICD之ArgoCD入门

云原生专栏大纲 文章目录 ArgoCDArgoCD 简介GitOps介绍Argo CD 的工作流程argocd和jinkens对比kustomize介绍ArgoCD和kustomize关系 安装argocdargocd控制台介绍首页应用创建表单SYNC OPTIONS&#xff08;同步选项&#xff09;SYNC POLICY&#xff08;同步策略&#xff09; 应…

视频异常检测论文笔记

看几篇中文的学习一下别人的思路 基于全局-局部自注意力网络的视频异常检测方法主要贡献&#xff1a;网络结构注意力模块结构&#xff1a; 融合自注意力和自编码器的视频异常检测主要贡献&#xff1a;网络结构Transformer模块动态图 融合门控自注意力机制的生成对抗网络视频异常…

Kafka框架详解

Kafka 1、Kafka介绍 ​ Kafka是最初由linkedin公司开发的&#xff0c;使用scala语言编写&#xff0c;kafka是一个分布式&#xff0c;分区的&#xff0c;多副本的&#xff0c;多订阅者的消息队列系统。 2、Kafka相比其他消息队列的优势 ​ 常见的消息队列&#xff1a;Rabbit…

【Docker篇】详细讲解容器相关命令

&#x1f38a;专栏【Docker】 &#x1f354;喜欢的诗句&#xff1a;更喜岷山千里雪 三军过后尽开颜。 &#x1f386;音乐分享【如愿】 &#x1f384;欢迎并且感谢大家指出小吉的问题&#x1f970; 文章目录 &#x1f6f8;容器&#x1f339;相关命令&#x1f354;案例⭐创建并运…

大模型微调实战笔记

大模型三要素 1.算法&#xff1a;模型结构&#xff0c;训练方法 2.数据&#xff1a;数据和模型效果之间的关系&#xff0c;token分词方法 3.算力&#xff1a;英伟达GPU&#xff0c;模型量化 基于大模型对话的系统架构 基于Lora的模型训练最好用&#xff0c;成本低好上手 提…

Mysql流程控制函数

1概述 Mysql中的流程控制函数非常重要&#xff0c;可以根据不同的条件&#xff0c;执行不同的流程转换&#xff0c;可以在SQL语句中实现不同的条件选择。MySQL中的流程处理函数主要包括IF()、IFNULL()和CASE()函数。 1.1 IF函数 SELECT IF(1 > 0, 正确, 错误);1.2 IFNULL…

ROS第 12 课 Launch 启动文件的使用方法

文章目录 第 12 课 Launch 启动文件的使用方法1.本节前言2.Lanuch 文件基本语法2.2 参数设置2.3 重映射嵌套 3.实操练习 第 12 课 Launch 启动文件的使用方法 1.本节前言 我们在前面的教程里面通过命令行来尝试运行新的节点。但随着创建越来越复杂的机器人系统中&#xff0c;打…

idea运行卡顿优化方案

文章目录 前言一、调整配置1. idea.properties2. idea.vmoptions3.heap size4.Plugins5.Inspections 总结 前言 本人电脑16G内存&#xff0c;处理器i7 10代&#xff0c;磁盘空间也够用&#xff0c;整体配置够用&#xff0c;但运行idea会很卡&#xff0c;记录优化过程&#xff…