一个诡异的 Pulsar InterruptedException 异常

news2024/11/18 9:38:30

背景

今天收到业务团队反馈线上有个应用往 Pulsar 中发送消息失败了,经过日志查看得知是发送消息时候抛出了 java.lang.InterruptedException 异常。

和业务沟通后得知是在一个 gRPC 接口中触发的消息发送,大约持续了半个小时的异常后便恢复正常了,这是整个问题的背景。

前置排查

拿到该问题后首先排查下是否是共性问题,查看了其他的应用没有发现类似的异常;同时也查看了 Pulsar broker 的监控大盘,在这个时间段依然没有波动和异常;

这样可以初步排除是 Pulsar 服务端的问题。

接着便是查看应用那段时间的负载情况,从应用 QPS 到 JVM 的各个内存情况依然没发现有什么明显的变化。

Pulsar 源码排查

既然看起来应用本身和 Pulsar broker 都没有问题的话那就只能从异常本身来排查了。

首先第一步要得知具体使用的是 Pulsar-client 是版本是多少,因为业务使用的是内部基于官方 SDK 封装 springboot starter 所以第一步还得排查这个 starter 是否有影响。

通过查看源码基本排除了 starter 的嫌疑,里面只是简单的封装了 SDK 的功能而已。

org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:1027) at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:91) at 
java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) 
at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1999) 
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:89) ... 49 common frames omitted Caused by: org.apache.pulsar.client.api.PulsarClientException: java.lang.InterruptedException 
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775) 
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ(ProducerImpl.java:393) 
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync$original$BWm7PPlZ$accessor$i7NYMN6i(ProducerImpl.java) 
at org.apache.pulsar.client.impl.ProducerImpl$auxiliary$EfuVvJLT.call(Unknown Source) 
at org.apache.skywalking.apm.agent.core.plugin.interceptor.enhance.InstMethodsInter.intercept(InstMethodsInter.java:86) 
at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java) 
at org.apache.pulsar.client.impl.ProducerImpl.internalSendAsync(ProducerImpl.java:292) 
at org.apache.pulsar.client.impl.ProducerImpl.internalSendWithTxnAsync(ProducerImpl.java:363) 
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendWithTxnAsync(PartitionedProducerImpl.java:191) 
at org.apache.pulsar.client.impl.PartitionedProducerImpl.internalSendAsync(PartitionedProducerImpl.java:167) 
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:103) 
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.send(TypedMessageBuilderImpl.java:82) ... 49 common frames omitted Caused by: java.lang.InterruptedException: null
at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1343) 
at java.base/java.util.concurrent.Semaphore.acquire(Semaphore.java:318) 
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:758)

接下来便只能是分析堆栈了,因为 Pulsar-client 的部分实现源码是没有直接打包到依赖中的,反编译的话许多代码行数对不上,所以需要将官方的源码拉到本地,切换到对于的分支进行查看。

这一步稍微有点麻烦,首先是代码库还挺大的,加上之前如果没有准备好 Pulsar 的开发环境的话估计会劝退一部分人;但其实大部分问题都是网络造成的,只要配置一些 Maven 镜像多试几次总会编译成功。

我这里直接将分支切换到 branch-2.8

从堆栈的顶部开始排查 TypedMessageBuilderImpl.java:91

看起来是内部异步发送消息的时候抛了异常。

接着往下看到这里:

java.lang.InterruptedException 
at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:775) at

看起来是这里没错,但是代码行数明显不对;因为 2.8 这个分支也是修复过几个版本,所以中间有修改导致代码行数与最新代码对不上也正常。

semaphore.get().acquire();

不过初步来看应该是这行代码抛出的线程终端异常,这里看起来只有他最有可能了。

为了确认是否是真的是这行代码,这个文件再往前翻了几个版本最终确认了就是这行代码没错了。

我们点开java.util.concurrent.Semaphore#acquire()的源码,

    /**
     * <li>has its interrupted status set on entry to this method; or
     * <li>is {@linkplain Thread#interrupt interrupted} while waiting
     * for a permit,
     * </ul>
     * then {@link InterruptedException} is thrown and the current thread's
     * interrupted status is cleared.
     *
     * @throws InterruptedException if the current thread is interrupted
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        if (Thread.interrupted() ||
            (tryAcquireShared(arg) < 0 &&
             acquire(null, arg, true, true, false, 0L) < 0))
            throw new InterruptedException();
    }    

通过源码会发现 acquire() 函数确实会响应中断,一旦检测到当前线程被中断后便会抛出 InterruptedException 异常。

定位问题

所以问题的原因基本确定了,就是在 Pulsar 的发送消息线程被中断了导致的,但为啥会被中断还需要继续排查。

我们知道线程中断是需要调用 Thread.currentThread().interrupt(); API的,首先猜测是否 Pulsar 客户端内部有个线程中断了这个发送线程。

于是我在 pulsar-client 这个模块中搜索了相关代码:

排除掉和 producer 不相关的地方,其余所有中断线程的代码都是在有了该异常之后继续传递而已;所以初步来看 pulsar-client 内部没有主动中断的操作。

既然 Pulsar 自己没有做,那就只可能是业务做的了?

于是我在业务代码中搜索了一下:

果然在业务代码中搜到了唯一一处中断的地方,而且通过调用关系得知这段代码是在消息发送前执行的,并且和 Pulsar 发送函数处于同一线程。

大概的伪代码如下:

        List.of(1, 2, 3).stream().map(e -> {
                    return CompletableFuture.supplyAsync(() -> {
                        try {
                            TimeUnit.MILLISECONDS.sleep(10);
                        } catch (InterruptedException ex) {
                            throw new RuntimeException(ex);
                        }
                        return e;
                    });
                }
        ).collect(Collectors.toList()).forEach(f -> {
            try {
                Integer integer = f.get();
                log.info("====" + integer);
                if (integer==3){
                    TimeUnit.SECONDS.sleep(10);
                    Thread.currentThread().interrupt();
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
	   MessageId send = producer.newMessage().value(msg.getBytes()).send();

执行这段代码可以完全复现同样的堆栈。

幸好中断这里还打得有日志:

通过日志搜索发现异常的时间和这个中断的日志时间点完全重合,这样也就知道根本原因了。

因为业务线程和消息发送线程是同一个,在某些情况下会执行 Thread.currentThread().interrupt();,其实单纯执行这行函数并不会发生什么,只要没有去响应这个中断,也就是 Semaphore 源码中的判断了线程中断的标记:

    public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
        if (Thread.interrupted() ||
            (tryAcquireShared(arg) < 0 &&
             acquire(null, arg, true, true, false, 0L) < 0))
            throw new InterruptedException();
    }

但恰好这里业务中断后自己并没有去判断这个标记,导致 Pulsar 内部去判断了,最终抛出了这个异常。

总结

所以归根结底还是这里的代码不合理导致的,首先是自己中断了线程但也没使用,从而导致有被其他基础库使用的可能,所以会造成了一些不可预知的后果。

再一个是不建议在业务代码中使用 Thread.currentThread().interrupt(); 这类代码,第一眼根本不知道是要干啥,也不易维护。

其实本质上线程中断也是线程间通信的一种手段,有这类需求完全可以换为内置的 BlockQueue 这类函数来实现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/374980.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL数据库中的函数怎样使用?

函数 是指一段可以直接被另一段程序调用的程序或代码。 也就意味着&#xff0c;这一段程序或代码在MySQL中已经给我们提供了&#xff0c;我们要做的就是在合适的业务场景调用对应的函数完成对应的业务需求即可。 那么&#xff0c;函数到底在哪儿使用呢?我们先来看两个场景&…

前端开发:JS的节流与防抖

前言 在前端实际开发中&#xff0c;有关JS原生的节流和防抖处理也是很重要的点&#xff0c;关于底层和原理的掌握使用&#xff0c;尤其是在性能优化方面甚为重要。作为前端开发的进阶内容&#xff0c;在实际开发过程中节流和防抖通常都是项目优化的必要手段&#xff0c;而且也是…

【Project】项目管理软件学习笔记

一、前言使用Project制定项目计划步骤大致如下&#xff1a;以Project2013为例&#xff0c;按照上图步骤指定项目计划。二、实施2.1 创建空白项目点击文件——新建——空白项目&#xff0c;即完成了空白项目的创建&#xff0c;在此我把该项目保存为60mm项目管理.mpp&#xff0c;…

深入浅出1588v2(PTP)里的时间同步原理

1.时间同步1.1 单步同步(OneStep)单步同步最为简单&#xff0c;master向slave发送一个sync的同步包&#xff0c;同步包里带有这条信息发送时master的当前时间t1&#xff0c;假如这条信息从master传输到slave需要的传输时间是D&#xff0c;那么slave收到信息时&#xff0c;maste…

芯驰(E3-gateway)开发板环境搭建

1-Windows下环境配置 可以在Windows上使用命令行或者IAR IDE编译SSDK项目。Windows编译依赖的工具已经包含在 prebuilts/windows 目录中&#xff0c;包括编译器、Python和命令行工具。 1.1.1 CMD SSDK集成 msys 工具&#xff0c;可以在Windows命令行中完成SDK的配置、编译和…

嵌入式系统硬件设计与实践(第一步下载eda软件)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 现实生活中&#xff0c;我们经常发现有的人定了很多的目标&#xff0c;但是到最后一个都没有实现。这听上去有点奇怪&#xff0c;但确实是实实在在…

Mysql数据库总结

一.MySQL 的基础1.架构图Mysql逻辑架构图主要分三层&#xff1a;&#xff08;1&#xff09;第一层负责连接处理&#xff0c;授权认证&#xff0c;安全等等 &#xff08;2&#xff09;第二层负责编译并优化SQL &#xff08;3&#xff09;第三层是存储引擎。Mysql 服务器的默认端…

Welcome to TryHackMe --- 我在TryHackMe学习的第90天

我在TryHackMe学习的第90天 自发的thm玩家交流企鹅群&#xff1a;751273347 TryHackMe是一个及其优秀的道德嗨客学习平台 这三个月里&#xff0c;我在TryHackMe都学了什么 TryHackMe的几个路径我觉得是按照oscp出的&#xff0c;所以理论上讲我应该差不多有oscp水准&#xff…

ElasticSearch修改索引字段类型

一、Es报MapperParsingException异常 线上功能报错&#xff0c;一看日志是往es中添加数据报错&#xff0c;错误日志如下&#xff1a; org.elasticsearch.index.mapper.MapperParsingException: failed to parse field [categoryId] of type [integer] in document with id 16…

软件技术知识库必备的功能清单及注意事项!

文档是一个迭代过程。它可能需要根据客户反馈进行改进&#xff0c;或者可能需要折射文档中已包含的某些内容。知识库可以包括客户的常见问题或对解决方案的更多参考&#xff0c;这些解决方案可能需要包括这些解决方案&#xff0c;以提高效率、生产力并降低公司成本&#xff0c;…

百趣代谢组学分享—揭示水稻“生长-防御”平衡调控机制!

湖南农业大学科研团队揭示水稻“生长—防御”平衡调控机制&#xff01; 文章标题&#xff1a;Rice cellulose synthase-like protein OsCSLD4 coordinates the trade-off between plant growth and defense 发表期刊&#xff1a;Frontiers in Plant Science 影响因子&#x…

【个人总结】超详细Neo4j安装下载

【个人总结】超详细Neo4j安装下载一、下载1.1 Jdk下载1.2 Neo4j下载&#xff1a;二、安装配置2.1 解压2.2 配置三、启动Neo4j一、下载 1.1 Jdk下载 下载neo4j之前&#xff0c;需要下载jdk&#xff0c;这里默认已经下载过jdk,&#xff0c;若未下载可参考之前文章&#xff1a;h…

17.标准库特殊设施

文章目录标准库特殊设施17.1tuple类型17.1.1定义和初始化tuple访问tuple的成员关系和相等运算符17.1.2使用tuple返回多个值17.2bitset类型(后续需要时再详细了解)17.3正则表达式17.4随机数bernoulli_distribution类17.5IO库再探标准库特殊设施 17.1tuple类型 tuple(定义在tupl…

Spark工作原理

1&#xff09;Spark工作原理&#xff1a; 首先看中间是一个Spark集群&#xff0c;可以理解为是Spark的 standalone集群&#xff0c;集群中有6个节点 左边是Spark的客户端节点&#xff0c;这个节点主要负责向Spark集群提交任务&#xff0c;假设在这里我们向Spark集群提交了一个任…

周赛334(前缀和、贪心+双指针、Dijkstra求最短路径、二分答案)

文章目录[6369. 左右元素和的差值](https://leetcode.cn/problems/left-and-right-sum-differences/)前缀和[6368. 找出字符串的可整除数组](https://leetcode.cn/problems/find-the-divisibility-array-of-a-string/)超长整数如何取余&#xff1f;[6367. 求出最多标记下标](ht…

9.3 IGMPv3

实验目的 熟悉IGMPv3的应用场景掌握IGMPv3的配置方法实验拓扑 实验拓扑如图9-22所示&#xff1a; 图9-22&#xff1a;IGMPv3 实验步骤 &#xff08;1&#xff09;配置IP地址 MCS1的配置 MCS1的IP地址配置如图9-23所示&#xff1a; 图9-23&#xff1a;配置MCS1的IP地址 MCS2…

结构体字节对齐、偏移量

复习下struct的大小、成员偏移量offsetof&#xff0c;说下我的理解&#xff1a; 64位下默认对齐数default8原则1&#xff1a;struct中每一个成员变量tmp的对齐数realmin{default,tmp} struct Student {int num;//0char name[8];double score; } stu; 这个结构体stu中&#x…

阿里前端二面经典手写面试题汇总

实现类的继承 实现类的继承-简版 类的继承在几年前是重点内容&#xff0c;有n种继承方式各有优劣&#xff0c;es6普及后越来越不重要&#xff0c;那么多种写法有点『回字有四样写法』的意思&#xff0c;如果还想深入理解的去看红宝书即可&#xff0c;我们目前只实现一种最理想…

rollup环境配置

VUE2.x源码学习笔记 1. rollup环境配置 首先在VScode中新建文件夹vue_sc&#xff0c;然后终端打开定位到打开的文件夹&#xff0c;输入“npm init -y”初始化配置项&#xff0c;运行成功之后文件夹新增package.json文件 继续在终端运行"npm install babel/preset-env ba…

浅析Tomcat架构上的Valve内存马(内存马系列篇十一)

写在前面 这篇也是在Tomcat容器上面构造的内存马(收回之前说的不搞Tomcat了)&#xff0c;这是建立在Tomcat的管道上面做文章的一个内存马的实现方式。这是内存马系列的第十一篇文章了。 前置 什么是Pipeline-Valve管道&#xff1f; 根据前面Tomcat架构的相关知识&#xff0…