ElasticSearch - 批量更新bulk死锁问题排查 | 京东云技术团队

news2025/1/13 7:43:09

一、问题系统介绍

  1. 监听商品变更MQ消息,查询商品最新的信息,调用BulkProcessor批量更新ES集群中的商品字段信息;

  2. 由于商品数据非常多,所以将商品数据存储到ES集群上,整个ES集群共划分了256个分片,并根据商品的三级类目ID进行分片路由。

比如一个SKU的商品名称发生变化,我们就会收到这个SKU的变更MQ消息,然后再去查询商品接口,将商品的最新名称查询回来,再根据这个SKU的三级分类ID进行路由,找到对应的ES集群分片,然后更新商品名称字段信息。

由于商品变更MQ消息量巨大,为了提升更新ES的性能,防止出现MQ消息积压问题,所以本系统使用了BulkProcessor进行批量异步更新。

ES客户端版本如下:

        <dependency>
            <artifactId>elasticsearch-rest-client</artifactId>
            <groupId>org.elasticsearch.client</groupId>
            <version>6.5.3</version>
        </dependency>

BulkProcessor配置伪代码如下:

        //在这里调用build()方法构造bulkProcessor,在底层实际上是用了bulk的异步操作
        this.fullDataBulkProcessor = BulkProcessor.builder((request, bulkListener) ->
                fullDataEsClient.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener)
                // 1000条数据请求执行一次bulk
                .setBulkActions(1000)
                // 5mb的数据刷新一次bulk
                .setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
                // 并发请求数量, 0不并发, 1并发允许执行
                .setConcurrentRequests(1)
                // 固定1s必须刷新一次
                .setFlushInterval(TimeValue.timeValueSeconds(1L))
                // 重试5次,间隔1s
                .setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 5))
                .build();

二、问题怎么发现的

  1. 618大促开始后,由于商品变更MQ消息非常频繁,MQ消息每天的消息量更是达到了日常的数倍,而且好多商品还变更了三级类目ID;

  2. 系统在更新这些三级类目ID发生变化的SKU商品信息时,根据修改后的三级类目ID路由后的分片更新商品信息时发生了错误,并且重试了5次,依然没有成功;

  3. 因为在新路由的分片上没有这个商品的索引信息,这些更新请求永远也不会执行成功,系统的日志文件中也记录了大量的异常重试日志。

  4. 商品变更MQ消息也开始出现了积压报警,MQ消息的消费速度明显赶不上生产速度。

  5. 观察MQ消息消费者的UMP监控数据,发现消费性能很平稳,没有明显波动,但是调用次数会在系统消费MQ一段时间后出现断崖式下降,由原来的每分钟几万调用量逐渐下降到个位数。

  6. 在重启应用后,系统又开始消费,UMP监控调用次数恢复到正常水平,但是系统运行一段时间后,还是会出现消费暂停问题,仿佛所有消费线程都被暂停了一样。

三、排查问题的详细过程

首先找一台暂停消费MQ消息的容器,查看应用进程ID,使用jstack命令dump应用进程的整个线程堆栈信息,将导出的线程堆栈信息打包上传到 https://fastthread.io/ 进行线程状态分析。分析报告如下:

通过分析报告发现有124个处于BLOCKED状态的线程,然后可以点击查看各线程的详细堆栈信息,堆栈信息如下:

连续查看多个线程的详细堆栈信息,MQ消费线程都是在waiting to lock <0x00000005eb781b10> (a org.elasticsearch.action.bulk.BulkProcessor),然后根据0x00000005eb781b10去搜索发现,这个对象锁正在被另外一个线程占用,占用线程堆栈信息如下:

这个线程状态此时正处于WAITING状态,通过线程名称发现,该线程应该是ES客户端内部线程。正是该线程抢占了业务线程的锁,然后又在等待其他条件触发该线程执行,所以导致了所有的MQ消费业务线程一直无法获取BulkProcessor内部的锁,导致出现了消费暂停问题。

但是这个线程elasticsearch[scheduler][T#1]为啥不能执行? 它是什么时候启动的? 又有什么作用?

就需要我们对BulkProcessor进行深入分析,由于BulkProcessor是通过builder模块进行创建的,所以深入builder源码,了解一下BulkProcessor的创建过程。

public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {
        Objects.requireNonNull(consumer, "consumer");
        Objects.requireNonNull(listener, "listener");
        final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
        return new Builder(consumer, listener,
                (delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS),
                () -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
    }

内部创建了一个时间调度执行线程池,线程命名规则和上述持有锁的线程名称相似,具体代码如下:

static ScheduledThreadPoolExecutor initScheduler(Settings settings) {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
                EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
        scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        scheduler.setRemoveOnCancelPolicy(true);
        return scheduler;
    }

最后在build方法内部执行了BulkProcessor的内部有参构造方法,在构造方法内部启动了一个周期性执行的flushing任务,代码如下

 BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
                  int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
                  Scheduler scheduler, Runnable onClose) {
        this.bulkActions = bulkActions;
        this.bulkSize = bulkSize.getBytes();
        this.bulkRequest = new BulkRequest();
        this.scheduler = scheduler;
        this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);
        // Start period flushing task after everything is setup
        this.cancellableFlushTask = startFlushTask(flushInterval, scheduler);
        this.onClose = onClose;
    }
private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) {
        if (flushInterval == null) {
            return new Scheduler.Cancellable() {
                @Override
                public void cancel() {}

                @Override
                public boolean isCancelled() {
                    return true;
                }
            };
        }
        final Runnable flushRunnable = scheduler.preserveContext(new Flush());
        return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
    }
class Flush implements Runnable {

        @Override
        public void run() {
            synchronized (BulkProcessor.this) {
                if (closed) {
                    return;
                }
                if (bulkRequest.numberOfActions() == 0) {
                    return;
                }
                execute();
            }
        }
    }

通过源代码发现,该flush任务就是在创建BulkProcessor对象时设置的固定时间flush逻辑,当setFlushInterval方法参数生效,就会启动一个后台定时flush任务。flush间隔,由setFlushInterval方法参数定义。该flush任务在运行期间,也会抢占BulkProcessor对象锁,抢到锁后,才会执行execute方法。具体的方法调用关系源代码如下:

/**
     * Adds the data from the bytes to be processed by the bulk processor
     */
    public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
                                          @Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception {
        bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true, xContentType);
        executeIfNeeded();
        return this;
    }

    private void executeIfNeeded() {
        ensureOpen();
        if (!isOverTheLimit()) {
            return;
        }
        execute();
    }

    // (currently) needs to be executed under a lock
    private void execute() {
        final BulkRequest bulkRequest = this.bulkRequest;
        final long executionId = executionIdGen.incrementAndGet();

        this.bulkRequest = new BulkRequest();
        this.bulkRequestHandler.execute(bulkRequest, executionId);
    }

而上述代码中的add方法,则是由MQ消费业务线程去调用,在该方法上同样有一个synchronized关键字,所以消费MQ业务线程会和flush任务执行线程直接会存在锁竞争关系。具体MQ消费业务线程调用伪代码如下:

 @Override
 public void upsertCommonSku(CommonSkuEntity commonSkuEntity) {
            String source = JsonUtil.toString(commonSkuEntity);
            UpdateRequest updateRequest = new UpdateRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());
            updateRequest.doc(source, XContentType.JSON);
            IndexRequest indexRequest = new IndexRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());
            indexRequest.source(source, XContentType.JSON);
            updateRequest.upsert(indexRequest);
            updateRequest.routing(commonSkuEntity.getCat3().toString());
            fullbulkProcessor.add(updateRequest);
}  

通过以上对线程堆栈分析,发现所有的业务线程都在等待elasticsearch[scheduler][T#1]线程释放BulkProcessor对象锁,但是该线程确一直没有释放该对象锁,从而出现了业务线程的死锁问题。

结合应用日志文件中出现的大量异常重试日志,可能与BulkProcessor的异常重试策略有关,然后进一步了解BulkProcessor的异常重试代码逻辑。由于业务线程中提交BulkRequest请求都统一提交到了BulkRequestHandler对象中的execute方法内部进行处理,代码如下:

public final class BulkRequestHandler {
    private final Logger logger;
    private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
    private final BulkProcessor.Listener listener;
    private final Semaphore semaphore;
    private final Retry retry;
    private final int concurrentRequests;

    BulkRequestHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy,
                       BulkProcessor.Listener listener, Scheduler scheduler, int concurrentRequests) {
        assert concurrentRequests >= 0;
        this.logger = Loggers.getLogger(getClass());
        this.consumer = consumer;
        this.listener = listener;
        this.concurrentRequests = concurrentRequests;
        this.retry = new Retry(backoffPolicy, scheduler);
        this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
    }

    public void execute(BulkRequest bulkRequest, long executionId) {
        Runnable toRelease = () -> {};
        boolean bulkRequestSetupSuccessful = false;
        try {
            listener.beforeBulk(executionId, bulkRequest);
            semaphore.acquire();
            toRelease = semaphore::release;
            CountDownLatch latch = new CountDownLatch(1);
            retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse response) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, response);
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    try {
                        listener.afterBulk(executionId, bulkRequest, e);
                    } finally {
                        semaphore.release();
                        latch.countDown();
                    }
                }
            });
            bulkRequestSetupSuccessful = true;
            if (concurrentRequests == 0) {
                latch.await();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } catch (Exception e) {
            logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
            listener.afterBulk(executionId, bulkRequest, e);
        } finally {
            if (bulkRequestSetupSuccessful == false) {  // if we fail on client.bulk() release the semaphore
                toRelease.run();
            }
        }
    }

    boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
        if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
            semaphore.release(this.concurrentRequests);
            return true;
        }
        return false;
    }
}

BulkRequestHandler通过构造方法初始化了一个Retry任务对象,该对象中也传入了一个Scheduler,且该对象和flush任务中传入的是同一个线程池,该线程池内部只维护了一个固定线程。而execute方法首先会先根据Semaphore来控制并发执行数量,该并发数量在构建BulkProcessor时通过参数指定,通过上述配置发现该值配置为1。所以每次只允许一个线程执行该方法。即MQ消费业务线程和flush任务线程,同一时间只能有一个线程可以执行。然后下面在了解一下重试任务是如何执行的,具体看如下代码:

 public void withBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest,
                            ActionListener<BulkResponse> listener) {
        RetryHandler r = new RetryHandler(backoffPolicy, consumer, listener, scheduler);
        r.execute(bulkRequest);
    }

RetryHandler内部会执行提交bulkRequest请求,同时也会监听bulkRequest执行异常状态,然后执行任务重试逻辑,重试代码如下:

private void retry(BulkRequest bulkRequestForRetry) {
            assert backoff.hasNext();
            TimeValue next = backoff.next();
            logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
            Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry));
            scheduledRequestFuture = scheduler.schedule(next, ThreadPool.Names.SAME, command);
        }

RetryHandler将执行失败的bulk请求重新交给了内部scheduler线程池去执行,通过以上代码了解,该线程池内部只维护了一个固定线程,同时该线程池可能还会被另一个flush任务去占用执行。所以如果重试逻辑正在执行的时候,此时线程池内的唯一线程正在执行flush任务,则会阻塞重试逻辑执行,重试逻辑不能执行完成,则不会释放Semaphore,但是由于并发数量配置的是1,所以flush任务线程需要等待其他线程释放一个Semaphore许可后才能继续执行。所以此处形成了循环等待,导致Semaphore和BulkProcessor对象锁都无法释放,从而使得所有的MQ消费业务线程都阻塞在获取BulkProcessor锁之前。

同时,在GitHub的ES客户端源码客户端上也能搜索到类似问题,例如: https://github.com/elastic/elasticsearch/issues/47599 ,所以更加印证了之前的猜想,就是因为bulk的不断重试从而引发了BulkProcessor内部的死锁问题。

四、如何解决问题

既然前边已经了解到了问题产生的原因,所以就有了如下几种解决方案:

1.升级ES客户端版本到7.6正式版,后续版本通过将异常重试任务线程池和flush任务线程池进行了物理隔离,从而避免了线程池的竞争,但是需要考虑版本兼容性。

2.由于该死锁问题是由大量异常重试逻辑引起的,可以在不影响业务逻辑的情况取消重试逻辑,该方案可以不需要升级客户端版本,但是需要评估业务影响,执行失败的请求可以通过其他其他方式进行业务重试。

如有疏漏不妥之处,欢迎指正!

作者:京东零售 曹志飞

来源:京东云开发者社区

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/720475.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Elasticsearch脚本查询

Elasticsearch脚本查询 什么/为什么 Scripting是Elasticsearch支持的一种专门用于复杂场景下支持自定义编程的强大的脚本功能&#xff0c;ES支持多种脚本语言&#xff0c;如painless&#xff0c;其语法类似于Java,也有注释、关键字、类型、变量、函数等&#xff0c;其就要相对…

AI绘画:Stable Diffusion 终极炼丹宝典:从入门到精通

本文收集于教程合集&#xff1a;AIGC从入门到精通教程汇总 我是小梦&#xff0c;以浅显易懂的方式&#xff0c;与大家分享那些实实在在可行之宝藏。 历经耗时数十个小时&#xff0c;总算将这份Stable Diffusion的使用教程整理妥当。 从最初的安装与配置&#xff0c;细至界面…

fdisk和df -h的区别以及如何看懂和提取信息

前几天要查看linux系统磁盘大小&#xff0c;但是发现fdisk和df -h出来的大小和信息不一样&#xff0c;了解了一下linux的磁盘分区和内存大小&#xff0c;查阅了相关资料&#xff0c;总结以下信息&#xff1a; 一、相关理念 在计算机中&#xff0c;存放信息的主要存储设备就是…

天猫数据分析工具(天猫实时数据)

后疫情时代&#xff0c;聚会、聚餐与送礼热度上涨&#xff0c;酒类产品既作为送礼首选又作为佐餐饮品的热门选手也受此影响迎来消费小高峰。在此背景下&#xff0c;白酒市场也开始复苏并不断加快速度。 根据鲸参谋电商数据分析平台的相关数据显示&#xff0c;2023年1月份至4月…

小程序 事件委托给父元素scrollview 获取不到子元素view的绑定的dataset值

点击事件委托到父元素&#xff1a;scrollview 减少多次循环绑定 &#xff1a; 通过点击事件的dataset判断了点击哪个子元素。 常能见到e.target或者e.currentTarget。 简单来说&#xff0c;currentTarget就是当前对象&#xff0c;target就是整个对象&#xff08;包含子元素&…

如何实现移动端侧边目录栏收缩,并监听点击目录栏以外则自动收缩

父组件&#xff0c;index界面&#xff0c;注意此时expend按钮在父组件中 <template><el-container><el-aside class"Aside"><MAside expendClick"expendClick" :message"message" /></el-aside><div class&qu…

2、CCesium二次开发环境搭建

CCesium是使用c和opengl实现的桌面三维地球&#xff0c;所以进行二次开发需要搭建c的开发环境。 在windows系统上c开发可以使用vs或cmake和mingw clion开发。本人使用mingwclion&#xff0c;如果使用其他ide那我也帮不了你。cmake是构建项目的&#xff0c;clion使用2020.1版本…

Lesson3-1:OpenCV图像处理---几何变换

几何变换 学习目标 掌握图像的缩放&#xff0c;平移&#xff0c;旋转等了解数字图像的仿射变换和透射变换 1 图像缩放 缩放是对图像的大小进行调整&#xff0c;即使图像放大或缩小。 API cv2.resize(src,dsize,fx0,fy0,interpolationcv2.INTER_LINEAR)参数&#xff1a; s…

数据结构 | 二叉排序树

一、数据结构定义 /* 二叉排序树 */ typedef int TreeType; typedef struct BSTNode {TreeType data;struct BSTNode* lchild, * rchild; }*BSTree, BSTNode;二、方法概览 BSTNode* CreateTreeNode(TreeType data); // 创建二叉树结点 BSTNode* InsertTree(TreeType data, BS…

Vulnhub靶机PWNLAB:INIT writeup

靶机介绍 靶机下载&#xff1a;https://www.vulnhub.com/entry/matrix-2,279/ ​ 个人微信公众号&#xff1a;网络安全学习爱好者 信息搜集 arp扫描存活主机 ​​​ 根据MAC地址比较靶机IP为​192.168.30.131 ​​ nmap扫描全端口及端口服、版本 ​​​ 目录扫描123…

通信相关知识(三) 接入网

接入网的定界 接入网的功能 用户口功能、业务口功能、核心功能、传送功能、接入网系统管理功能。 ADSL 非对称数字用户线路&#xff08;ADSL&#xff0c;Asymmetric Digital Subscriber Line&#xff09;是数字用户线路&#xff08;xDSL&#xff0c;Digital Subscriber Lin…

【Java从入门到大牛】Java基础语法

&#x1f525; 本文由 程序喵正在路上 原创&#xff0c;CSDN首发&#xff01; &#x1f496; 系列专栏&#xff1a;Java从入门到大牛 &#x1f320; 首发时间&#xff1a;2023年7月5日 &#x1f98b; 欢迎关注&#x1f5b1;点赞&#x1f44d;收藏&#x1f31f;留言&#x1f43e…

黑客(自学笔记)

黑客&#xff0c;对很多人来说充满诱惑力。很多人可以发现这门领域如同任何一门领域&#xff0c;越深入越敬畏&#xff0c;知识如海洋&#xff0c;黑客也存在一些等级&#xff0c;参考知道创宇 CEO ic&#xff08;世界顶级黑客团队 0x557 成员&#xff09;的分享如下&#xff1…

第一章:项目架构演变

1、在设计系统时&#xff0c;应该多思考 墨菲定律 1. 任何事都没有表面上看起来那么简单。 2. 所有的事做起来都会比你预计的时间长。 3. 可能出错的事总会出错。 4. 如果你担心某种情况发生&#xff0c;那么它就更有可能发生。 2、在划分时&#xff0c;也要思考康威定律。…

centos7安装zookeeper的环境变量配置导致用户登录不了系统

废话不多说&#xff0c;我修改的/etc/profile,如果这个文件改错会造成所有用户都登录不了系统。 第一步&#xff1a;解决进不了系统 1.在登陆界面按&#xff1a;alt ctrlf2进入命令模式&#xff0c;输入密码登录后再输入&#xff1a; /usr/bin/sudo /usr/bin/vi /etc/profile …

Apache Doris 在拈花云科的统一数据中台实践,One Size Fits All

作者&#xff5c;NearFar X Lab 团队 洪守伟、陈超、周志银、左益、武超 整理&#xff5c;SelectDB 内容团队 导读&#xff1a; 无锡拈花云科技服务有限公司&#xff08;以下简称拈花云科&#xff09;是由中国创意文旅集成商拈花湾文旅和北京滴普科技有限公司共同孵化组建的。…

微信小程序Vant组件配置及使用

Vant Weapp 官网文档&#xff1a;介绍 - Vant Weapp (gitee.io) Vant Weapp GitHub地址&#xff1a;youzan/vant-weapp: 轻量、可靠的小程序 UI 组件库 (github.com) 本教程使用下载代码方式引入vant组件 1. 下载vant组件源码 通过git下载vant源码 git clone https://github…

【适配器模式】—— 每天一点小知识

&#x1f4a7; 适配器模式 \color{#FF1493}{适配器模式} 适配器模式&#x1f4a7; &#x1f337; 仰望天空&#xff0c;妳我亦是行人.✨ &#x1f984; 个人主页——微风撞见云的博客&#x1f390; &#x1f433; 《数据结构与算法》专栏的文章图文并茂&#x1f995;…

Kafka学习笔记(基础篇)

目录 Kafka简介 消息队列 Kafka的应用场景 消息队列的两种模型 Kafka集群搭建 Kafka的生产者/消费者/工具 Kafka的基准测试工具 Kafka Java API开发 生产者程序开发 消费者程序开发 生产者使用异步方式生产消息 Kafka中的重要概念 消费者组 幂等性 事务编程 Ka…

英文单词的3σ值

最近做log的nlp&#xff0c;发现日志当中有一些很长的但是无意义的词汇&#xff0c;很影响训练模型&#xff0c;这边想通过单次长度去排除那些无意义词汇&#xff0c;去查了gpt英文单次的3σ值&#xff0c;记录下