Spark 3.1.1 shuffle fetch 导致shuffle错位的问题

news2025/1/25 4:32:25

背景

最近从数据仓库小组那边反馈了一个问题,一个SQL任务出来的结果不正确,重新运行一次之后就没问题了,具体的SQL如下:

select 
    col1,
    count(1) as cnt
from table1
where dt = '20230202' 
group by col1
having count(1) > 1

这个问题是偶发的,在其运行的日志中会发现如下三类日志:

FetchFailed 
TaskKilled (another attempt succeeded)
ERROR (org.apache.spark.network.shuffle.RetryingBlockFetcher:231) - Failed to fetch block shuffle_4865_2481
283_286, and will not retry (3 retries)

最终在各种同事的努力下,找到了一个Jira:SPARK-34534

分析

直接切入主题,找到对应的类OneForOneBlockFetcher,该类会被NettyBlockTransferService(没开启ESS)和ExternalBlockStoreClient(开启ESS)调用,其中start方法:

public void start() {
    client.sendRpc(message.toByteBuffer(), new RpcResponseCallback() {
      @Override
      public void onSuccess(ByteBuffer response) {
        try {
          streamHandle = (StreamHandle) BlockTransferMessage.Decoder.fromByteBuffer(response);
          logger.trace("Successfully opened blocks {}, preparing to fetch chunks.", streamHandle);

          // Immediately request all chunks -- we expect that the total size of the request is
          // reasonable due to higher level chunking in [[ShuffleBlockFetcherIterator]].
          for (int i = 0; i < streamHandle.numChunks; i++) {
            if (downloadFileManager != null) {
              client.stream(OneForOneStreamManager.genStreamChunkId(streamHandle.streamId, i),
                new DownloadCallback(i));
            } else {
              client.fetchChunk(streamHandle.streamId, i, chunkCallback);
            }
          }
        } catch (Exception e) {
          logger.error("Failed while starting block fetches after success", e);
          failRemainingBlocks(blockIds, e);
        }
      }

      @Override
      public void onFailure(Throwable e) {
        logger.error("Failed while starting block fetches", e);
        failRemainingBlocks(blockIds, e);
      }
    });
  }

其中的message的初始化在构造方法中:

 if (!transportConf.useOldFetchProtocol() && isShuffleBlocks(blockIds)) {
   this.message = createFetchShuffleBlocksMsg(appId, execId, blockIds);
 } else {
   this.message = new OpenBlocks(appId, execId, blockIds);
 }

其中transportConf.useOldFetchProtocol 也就是 spark.shuffle.useOldFetchProtocol配置(默认是false),如果是shuffle block的话,就会运行到:createFetchShuffleBlocksMsg方法,对于为什么存在这么一个判断,具体参考SPARK-27665
关键的就是 createFetchShuffleBlocksMsg 方法:
这个方法的作用就是: 构建一个FetchShuffleBlocks(appId, execId, shuffleId, mapIds, reduceIdArr, batchFetchEnabled) 对象,其中里面的值
如图:
在这里插入图片描述
其中这里有一点需要注意:

 long[] mapIds = Longs.toArray(mapIdToReduceIds.keySet());
 reduceIdArr[i] = Ints.toArray(mapIdToReduceIds.get(mapIds[i]));

这里面对MapIdReduceId 进行了重组(在获得streamHandle的时候内部会根据reduceIdArr构建blocks索引,下文中会说到)会导致和成员变量blockIds的顺序不一致,为什么两者不一致会导致问题呢?
原因在于任务的fetch失败会导致重新进行fetch,如下:

  client.fetchChunk(streamHandle.streamId, i, chunkCallback);

chunkCallback的代码如下:

private class ChunkCallback implements ChunkReceivedCallback {
    @Override
    public void onSuccess(int chunkIndex, ManagedBuffer buffer) {
      // On receipt of a chunk, pass it upwards as a block.
      listener.onBlockFetchSuccess(blockIds[chunkIndex], buffer);
    }

    @Override
    public void onFailure(int chunkIndex, Throwable e) {
      // On receipt of a failure, fail every block from chunkIndex onwards.
      String[] remainingBlockIds = Arrays.copyOfRange(blockIds, chunkIndex, blockIds.length);
      failRemainingBlocks(remainingBlockIds, e);
    }
  }

String[] remainingBlockIds = Arrays.copyOfRange(blockIds, chunkIndex, blockIds.length),此处的chunckIndex就是shuffle blocks的索引下标,也就是下文中numBlockIds组成的数组下标,
但是这个和createFetchShuffleBlocksMsg输出的顺序是不一致的,所以如果发生问题重新fetch的时候,数据有错位,具体可以看:
ShuffleBlockFetcherIterator中的

    if (req.size > maxReqSizeShuffleToMem) {
      shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
        blockFetchingListener, this)
    } else {
      shuffleClient.fetchBlocks(address.host, address.port, address.executorId, blockIds.toArray,
        blockFetchingListener, null)
    }

其中blockFetchingListener回调方法onBlockFetchSuccess会把fetch的block数据和shuffleBlockId一一对应上

ESS端构建blocks的信息

在start方法中,client.sendRpc向对应的ESS发送对应的请求shuffle数据信息,ESS会重新构建blocks的信息,组成StreamHandle(streamId, numBlockIds)返回给请求端:
具体为ExternalBlockHandler的handleMessage方法:

if (msgObj instanceof FetchShuffleBlocks) {
          FetchShuffleBlocks msg = (FetchShuffleBlocks) msgObj;
          checkAuth(client, msg.appId);
          numBlockIds = 0;
          if (msg.batchFetchEnabled) {
            numBlockIds = msg.mapIds.length;
          } else {
            for (int[] ids: msg.reduceIds) {
              numBlockIds += ids.length;
            }
          }
          streamId = streamManager.registerStream(client.getClientId(),
            new ShuffleManagedBufferIterator(msg), client.getChannel());
。。。
callback.onSuccess(new StreamHandle(streamId, numBlockIds).toByteBuffer());

这里的numBlockIds就是OneForOneBlockFetcher中的streamHandle.numChunks
如图:在这里插入图片描述

没有开启ESS端的构建blocks的信息

这里和上面的一样,只不过对应的方法为NettyBlockRpcServerreceive:

      case fetchShuffleBlocks: FetchShuffleBlocks =>
        val blocks = fetchShuffleBlocks.mapIds.zipWithIndex.flatMap { case (mapId, index) =>
          if (!fetchShuffleBlocks.batchFetchEnabled) {
            fetchShuffleBlocks.reduceIds(index).map { reduceId =>
              blockManager.getLocalBlockData(
                ShuffleBlockId(fetchShuffleBlocks.shuffleId, mapId, reduceId))
            }
          } else {
            val startAndEndId = fetchShuffleBlocks.reduceIds(index)
            if (startAndEndId.length != 2) {
              throw new IllegalStateException(s"Invalid shuffle fetch request when batch mode " +
                s"is enabled: $fetchShuffleBlocks")
            }
            Array(blockManager.getLocalBlockData(
              ShuffleBlockBatchId(
                fetchShuffleBlocks.shuffleId, mapId, startAndEndId(0), startAndEndId(1))))
          }
        }

        val numBlockIds = if (fetchShuffleBlocks.batchFetchEnabled) {
          fetchShuffleBlocks.mapIds.length
        } else {
          fetchShuffleBlocks.reduceIds.map(_.length).sum
        }

        val streamId = streamManager.registerStream(appId, blocks.iterator.asJava,
          client.getChannel)
        logTrace(s"Registered streamId $streamId with $numBlockIds buffers")
        responseContext.onSuccess(
          new StreamHandle(streamId, numBlockIds).toByteBuffer)

这里的numBlockIds就是OneForOneBlockFetcher中的streamHandle.numChunks
如图:
在这里插入图片描述
所以在以上两种情况下,只要有重新fetch数据的操作,就会存在数据的错位,导致数据的不准确

解决

直接git cherry-pick对应的commit就行:

git cherry-pick 4e438196114eff2e1fc4dd726fdc1bda1af267da

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/346450.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot 整合JWT实现基于自定义注解的-登录请求验证拦截(保姆级教学,附:源码)

学习目标&#xff1a; Spring Boot 整合JWT实现基于自定义注解的 登录请求接口拦截 例&#xff1a; 一篇掌握 JWT 入门知识1.1 在学习SpringBoot 整合JWT之前&#xff0c;我们先来说说JWT进行用户身份验证的流程 1:客户端使用用户名和密码请求登录 2:服务端收到请求&#xf…

spring中@Autowire和@Resource的区别在哪里?

介绍今天使用Idea写代码的时候&#xff0c;看到之前的项目中显示有warning的提示&#xff0c;去看了下&#xff0c;是如下代码?Autowire private JdbcTemplate jdbcTemplate;提示的警告信息Field injection is not recommended Inspection info: Spring Team recommends: &quo…

AntDB-M设计之内存结构

亚信科技专注通信行业多年&#xff0c;AntDB数据库从诞生开始&#xff0c;就面对通信级的大数据量应用场景挑战&#xff0c;在性能、稳定性、规模化等方面获得了超过10年的通信核心业务系统验证&#xff0c;性能峰值达到每秒百万的通信核心交易量。AntDB-M&#xff08;AntDB内存…

CMMI有哪几个级别,每个级别有哪些其特征

CMMI 一共分五个级别&#xff0c;一级低&#xff0c;五级高&#xff0c;一般初次认证CMMI从三级或者二级开始。 1、CMMI一级&#xff0c;完成级。 在完成级水平上&#xff0c;企业对项目的目标与要做的努力很清晰。项目的目标得以实现。一般来说&#xff0c;公司的初始阶段就是…

工作记录------@Accessors(chain = true)引起的BUG,Excel导入时获取不到值

工作记录------Accessors(chain true)引起的BUG&#xff0c;Excel导入时获取不到值 如题所示 背景&#xff1a;在进行文件excel文件导入时&#xff0c;发现实体类获取到的属性值都为null。 框架&#xff1a;com.alibaba.excel 2.2.0的版本。 结论&#xff1a;首先说下结论 如…

2021年职业院校技能大赛“网络安全”项目江西省A模块

目录 一、竞赛时间 三、竞赛任务书内容 &#xff08;一&#xff09;拓扑图 &#xff08;二&#xff09;A模块基础设施设置/安全加固&#xff08;200分&#xff09; A-1&#xff1a;登录安全加固 1.密码策略&#xff08;Web&#xff09; a.最小密码长度不少于8个字符&…

C++入门基础

本章内容&#xff1a; 一、C前言 1. 什么是C C语言是结构化和模块化的语言&#xff0c;适合处理较小规模的程序。对于复杂的问题&#xff0c;规模较大的程序&#xff0c;需要高度的抽象和建模时&#xff0c;C语言则不合适。为了解决软件危机&#xff0c; 20世纪80年代&#x…

笔记本连接wifi,浏览器访问页面,显示访问被拒绝

打开chrome、edge浏览器访问第1个第2个页面正常&#xff0c;后面再打开页面显示异常。 但手机连接正常&#xff0c;笔记本连接异常&#xff0c;起初完全没有怀疑是wifi问题 以为用了vpn软件问题&#xff0c;认为中了病毒。杀毒&#xff0c;并没有中毒。 1、关闭vpn代理&#…

基于Java+Spring+vue+element商城销售平台设计和实现

博主介绍&#xff1a;✌全网粉丝20W,csdn特邀作者、博客专家、CSDN新星计划导师、java领域优质创作者,博客之星、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专…

【Call for papers】ICCV-2023(CCF-A/人工智能/2023年3月8日截稿)

ICCV is the premier international computer vision event comprising the main conference and several co-located workshops and tutorials. 文章目录1.会议信息2.时间节点1.会议信息 会议介绍&#xff1a; ICCV是主要的国际计算机视觉活动&#xff0c;包括主要会议和几个…

【LeetCode】剑指 Offer 03. 数组中重复的数字 -- Java Version

题目链接&#xff1a; https://leetcode.cn/problems/shu-zu-zhong-zhong-fu-de-shu-zi-lcof/ 1. 题目介绍&#xff08;03. 数组中重复的数字&#xff09; 找出数组中重复的数字。 在一个长度为 n 的数组 nums 里的所有数字都在 0&#xff5e;n-1 的范围内。数组中某些数字是重…

linux篇【15】:应用层-网络https协议

目录 一.HTTPS介绍 1.HTTPS 定义 2.HTTP与HTTPS &#xff08;1&#xff09;端口不同&#xff0c;是两套服务 &#xff08;2&#xff09;HTTP效率更高&#xff0c;HTTPS更安全 3.加密&#xff0c;解密&#xff0c;密钥 概念 4.为什么要加密&#xff1f; 5.常见的加密方式…

TransE及其实现

TransE 该模型将关系和实体表示为同一空间中的向量&#xff0c;给定事实&#xff08;h,r,s&#xff09;&#xff08;h,r,s&#xff09;&#xff08;h,r,s&#xff09;关系 rrr 的向量 rrr被解释为头实体向量 hhh与尾实体向量 ttt之间的平移,因此嵌入实体hhh和ttt可以通过平移向…

分享82个HTML电子产品模板,总有一款适合您

分享82个HTML电子产品模板&#xff0c;总有一款适合您 82个HTML电子产品模板下载链接&#xff1a;https://pan.baidu.com/s/106NtZkrVefSFGGS54xk-kA?pwdbvn8 提取码&#xff1a;bvn8 Python采集代码下载链接&#xff1a;采集代码.zip - 蓝奏云 import os import shutil …

TongWeb8防止System.exit代码导致的进程停止

现象&#xff1a;当应用中存在System.exit 、Runtime.exit代码执行时&#xff0c;会导致TongWeb进程停止&#xff0c;从而产生如下日志&#xff1a;2023-02-14 09:47:36 [WARN] - The web application [webtest01] is still processing a request that has yet to finish. This…

LeetCode 19.删除链表的倒数第 N 个结点

原题链接 难度&#xff1a;middle\color{orange}{middle}middle 题目描述 给你一个链表&#xff0c;删除链表的倒数第 nnn 个结点&#xff0c;并且返回链表的头结点。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5], n 2 输出&#xff1a;[1,2,3,5]示例 2&#x…

快速部署个人导航页:美好的一天从井然有序开始

很多人都习惯使用浏览器自带的收藏夹来管理自己的书签&#xff0c;然而收藏夹存在着一些问题。 经过长时间的累积&#xff0c;一些高频使用的重要网站和偶尔信手收藏的链接混在了一起&#xff0c;收藏夹因为内容过多而显得杂乱无章&#xff1b;收藏夹没有什么美观可言&#xf…

【java】Spring Boot --40 个 Spring Boot 常用注解(建议收藏)

本文目录一、Spring Web MVC 注解Spring Web MVC 注解RequestMappingRequestBodyGetMappingPostMappingPutMappingDeleteMappingPatchMappingControllerAdviceResponseBodyExceptionHandlerResponseStatusPathVariableRequestParamControllerRestControllerModelAttributeCross…

sqlServer 2019 开发版(Developer)下载及安装

下载软件 官网只有2022的&#xff0c;2019使用百度网盘进行下载 安装下崽器 选择自定义安装 选择语言、以及安装位置 点击“安装” 安装 SQL Server 可能的故障 以上步骤安装后会弹出以上界面&#xff0c;如果未弹出&#xff0c;手动去安装目录下点击 SETUP.EXE 文件…

数据分析与SAS学习笔记1

数据分析的六层模型&#xff1a; 1&#xff09;数据源层&#xff1a;数据分析的数据源&#xff1b;DBA&#xff1b;初加工&#xff1b;对数据源按某些规则进行抽取&#xff0c;ETL&#xff1b; 2&#xff09;数据仓库层&#xff1a;OLAP的功能&#xff0c;联机事务处理。OLTP、…