Spark 3.4.x Server Client模式下的数据传输实现

news2025/1/12 7:47:14

背景

在Spark中python和jvm的通信杂谈–ArrowConverter中,我们提到Spark 3.4.x中是Client和Server之间的数据传输是采用Arrow IPC的,那具体是怎么实现的呢?

分析

直接上代码ClientE2ETestSuite

  test("createDataFrame from complex type schema") {
    val schema = new StructType()
      .add(
        "c1",
        new StructType()
          .add("c1-1", StringType)
          .add("c1-2", StringType))
    val data = Seq(Row(Row(null, "a2")), Row(Row("b1", "b2")), Row(null))
    val result = spark.createDataFrame(data.asJava, schema).collect()
    assert(result === data)
  }
  • 涉及到Client和Server交互的语句就是val result = spark.createDataFrame(data.asJava, schema).collect()
    其中createDataFrame如下:
  def createDataFrame(rows: java.util.List[Row], schema: StructType): DataFrame = {
    createDataset(RowEncoder.encoderFor(schema), rows.iterator().asScala).toDF()
  }

  ...

  def toDF(): DataFrame = new Dataset(sparkSession, plan, UnboundRowEncoder)
  

createDataFrame就是把data转换为Arrow IPC Stream,具体的方法为:

ConvertToArrow
  def apply[T](
      encoder: AgnosticEncoder[T],
      data: Iterator[T],
      timeZoneId: String,
      bufferAllocator: BufferAllocator): ByteString = {
    val arrowSchema = ArrowUtils.toArrowSchema(encoder.schema, timeZoneId)
    val root = VectorSchemaRoot.create(arrowSchema, bufferAllocator)
    val writer: ArrowWriter = ArrowWriter.create(root)
    val unloader = new VectorUnloader(root)
    val bytes = ByteString.newOutput()
    val channel = new WriteChannel(Channels.newChannel(bytes))

    try {
      // Convert and write the data to the vector root.
      val serializer = ExpressionEncoder(encoder).createSerializer()
      data.foreach(o => writer.write(serializer(o)))
      writer.finish()

      // Write the IPC Stream
      MessageSerializer.serialize(channel, root.getSchema)
      val batch = unloader.getRecordBatch
      try MessageSerializer.serialize(channel, batch)
      finally {
        batch.close()
      }
      ArrowStreamWriter.writeEndOfStream(channel, IpcOption.DEFAULT)

      // Done
      bytes.toByteString
    } finally {
      root.close()
    }
  }

这里的逻辑就是转换为Arrow IPC格式的字节流

注意这里涉及到的DataFrame,Dataset不是sql/core下的数据类型,而是connector/client下的新的数据结构
这里的collect方法如下:

  def collect(): Array[T] = withResult { result =>
    result.toArray
  }
  ...
    private[sql] def withResult[E](f: SparkResult[T] => E): E = {
    val result = collectResult()
    try f(result)
    finally {
      result.close()
    }
  }
  ...
  def collectResult(): SparkResult[T] = sparkSession.execute(plan, encoder)
  • 其中withResult中下的collectResult是向Server端执行Plan,并返回结果
  • toArray如下:
    def toArray: Array[T] = {
     val result = encoder.clsTag.newArray(length)
     val rows = iterator
     var i = 0
     while (rows.hasNext) {
       result(i) = rows.next()
       assert(i < numRecords)
       i += 1
     }
     result
    }
    
    • 其中iterator就是调用processResponses方法,该processResponses方法就是把Arrow序列字节流转换为ColumnarBatch[Array[ArrowColumnVector]],和ArrowConverters.fromBatchIterator一样。
    • 剩下的result(i)=rows.next就是迭代返回数据

整体的更加可以参考High-Level-Design Spark Connect中的交互图:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/694040.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

nginxconfig-部属

一、下载源码&#xff08;master&#xff09; GitHub - digitalocean/nginxconfig.io: ⚙️ NGINX config generator on steroids &#x1f489; 二、编译 npm install npm run build 三、将编译后的dist目录下的文件拷贝到nginx托管服务器下的目录&#xff1a;/data/wwwroot…

zabbix微信告警

环境&#xff1a;点击查看 注册一个企业微信&#xff08;官网&#xff09; 进入后台管理 拉一个用户 创建一个应用 下载脚本&#xff08;这步保留个人看法&#xff09; [rootchenshuyi requests-2.18.3]# wget https://github.com/X-Mars/Zabbix-Alert-WeChat.git --2021-0…

原淘宝npm域名即将停止解析,请切换至新域名

https://developer.aliyun.com/mirror/NPM?spma2c6h.13651102.J_4121707010.2.3e221b11kPiuyz 设置查看npm 国内镜像地址 C:\>npm config get registry http://registry.npmmirror.com/C:\>npm config set registry http://registry.npmmirror.com/C:\>npm config …

windows安装python开发工具pycharm

下载地址 PyCharm: the Python IDE for Professional Developers by JetBrains 点击下载 安装 双击exe安装等待安装完成即可 设置python环境 添加本地python环境 选择python.exe 所在路径即可&#xff0c;2.x版本和3.x版本都可&#xff0c;根据需要进行调整

清华大学发布《2023年GPT赋能通信行业应用白皮书》(132页)

加gzh 回复“gpt” 获取《2023年AIGC(GPT-4)赋能通信行业应用白皮书》完整版 摘要&#xff1a;在ChatGPT/GPT-4席卷全球的热潮中&#xff0c;人们已经深刻认识到人工智能作为经济社会发展中一项变革性技术与关键性力量&#xff0c;将为全球产业带来的巨大飞跃和突破式发展&a…

SVN 多项目地址指向方法

前言 我们在实际的开发中往往可能管理着多个项目&#xff0c;多个项目都用SVN管理着&#xff0c;如果遇到SVN地址变更&#xff0c;以前我们需要对每个项目一一进行SVN重新定位&#xff0c;项目少还好&#xff0c;一旦项目很多并且SVN地址经常变的情况下&#xff0c;进行地址映…

【致敬未来的攻城狮计划】活动总结

活动总结 一转眼攻城狮计划就已经到了最后一天了。 5月我还处在一个迷茫期&#xff0c;那时候刚刚入坑嵌入式&#xff0c;只学了几款电子积木&#xff0c;对整个体系也不是很清楚&#xff0c;也不知道应该学些什么。因为刚刚转到硬件领域&#xff0c;也缺少相关经历和能力证明…

本地离线安装SeleniumIDE(Chrome)

一、插件下载 现需要准备一台可以连接外网的电脑&#xff0c;由于受到chrome的限制&#xff0c;我们可以选择搭梯子进行直接安装相应插件&#xff0c;但考虑到部分新手不会翻墙&#xff0c;本次提供一个不需翻墙的方法。 进入https://www.crx4chrome.com/crx/181591/网页内&…

推荐几款可以大幅提高开发效率的vscode插件 | 京东云技术团队

1、Vue 2 Snippets 这是一款基于vue2的代码片段提示插件&#xff0c;对于使用vue2的开发者特别友好&#xff0c;可大幅提高我们的编码速度。 他的能力非常强大&#xff0c;具体还需要我们去看他的文档&#xff0c;解锁更多能力。 2、Path Autocomplete 这是一款路劲提示插件…

ES(elasticsearch)删除指定索引

场景 需要删除指定的索引 语法 执行命令 DELETE /索引名比如&#xff1a;DELETE /mysql-status_-2023.06 执行结果&#xff1a; 判断索引是否删除成功 执行命令 HEAD /索引名比如&#xff1a;HEAD /mysql-status_-2023.06 执行结果&#xff1a; 说明已经删除完毕 总结…

kotlin单例

两种方式都可以 更推荐方式一 方式一 class TraceDataManager private constructor() {companion object {private val TAG TraceDataManager::class.java.simpleNameVolatileprivate var instance: TraceDataManager? nullfun getInstance(): TraceDataManager {return i…

web入门——springboot、HTTP协议

这里写目录标题 springboot入门 http协议概念以及特点请求协议格式请求头的一些含义解释 响应协议格式具体的状态码以及响应头的一些含义解释 协议解析 springboot 入门 http协议 概念以及特点 请求协议 格式 包括请求行、请求头、请求体三部分&#xff08;请求体是pos请求方…

HTTP代理——提高网页抓取效率的秘诀

在日益数字化的时代&#xff0c;网页抓取对于各行各业的数据获取变得越来越重要。而在这个过程中&#xff0c;HTTP代理服务器成为了提高网页抓取效率的秘密武器。 为什么这么说呢&#xff0c;这要从&#xff0c;HTTP代理的功能来说。 1. 缓存机制 代理服务器可以缓存已经访问…

考研算法31天:归并排序 【归并排序,分治】

算法介绍 归并算法其过程分为三步&#xff1a; 1.分&#xff1a;递归到最下面 2.治&#xff1a;两个元素之间排序。 3。归&#xff1a;递归到最下层然后返回&#xff0c;从两个元素变成四个元素再排序。 如下图所示&#xff1a; 动态图如下&#xff1a; 算法题目 算法ac代…

Selenium教程__使用Select类对象处理下拉框(15)

select标签的下拉框可以使用selenium的 Select模拟下拉框选择操作。 Select需要导入才能使用&#xff0c;导入路径如下 from selenium.webdriver.support.ui import Select 下面以hao123(https://www.hao123.com) 演示下拉框操作 演示代码如下 import time from selenium i…

状态压缩DP—蒙德里安的梦想

题目链接&#xff1a;AcWing 291. 蒙德里安的梦想 问题描述 分析 这是一道经典的状态压缩DP问题&#xff0c;横着或者竖着排列1*2的方块 可以发现&#xff0c;横(竖)着的合法排列方案数就是问题的解&#xff0c;因为横(竖)着的合法排列后竖(横)着的只有一种排列方法, 这里我们…

ChatGPT是否可以写出一篇论文

利用AI反哺教育和学术&#xff0c;在训练它写论文的过程中你学到的&#xff0c;比你自己写一篇论文学到的更多。让工具回归工具&#xff0c;让我们变成更好的我们&#xff01; 第一步&#xff1a;现象确认 第二步&#xff1a;学术概念化 第三步&#xff1a;定位优质的学术资源 …

chatgpt赋能python:Python调用主函数语句

Python调用主函数语句 Python是一种高级编程语言&#xff0c;语法简单&#xff0c;易于学习和使用。在Python程序中&#xff0c;主函数是程序的入口&#xff0c;是程序的核心。本文将介绍Python调用主函数语句的相关知识。 什么是主函数 在Python程序中&#xff0c;主函数也…

Git 配置ssh验证 签名

首先你得装了git&#xff0c;linux 自带的&#xff0c;Windows自己下载配置一下。 注意 Windows下要用gitbash输入命令行&#xff0c;如果是Linux 就在默认命令行输入命令即可 大致思路如下&#xff08;不一定对&#xff0c;因为有段时间没弄了&#xff09;&#xff1a; 1. 生…

【软考网络管理员】2023年软考网管初级常见知识考点(27)- 多媒体基础知识

涉及知识点 A/D转换&#xff0c;常见音频格式&#xff0c;容量计算&#xff0c;图形图像格式&#xff0c;动画和视频&#xff0c;常见视频格式&#xff0c;媒体的种类&#xff0c;软考网络管理员常考知识点&#xff0c;软考网络管理员网络安全&#xff0c;网络管理员考点汇总。…