Spark中python和jvm的通信杂谈--ArrowConverter

news2025/1/10 16:16:27

背景

要提起ArrowConverters,就得说起Arrow这个项目,该项目的初衷是加速进程间的数据交换,从目前的社区发展以及它的周边来看,其实是一个很不错的项目。
那为什么Spark要引入Arrow呢?其实还得从Pyspark中python和jvm的交互方式上说起,目前pyspark采用的py4j与spark jvm进行交互,而数据的交换采用的是jvmpython两个进程间的数据交换(感兴趣的同学可以参考PySpark架构),这个时候引进Arrow恰到好处。

闲说杂谈

spark具体采用的是Arrow IPC,
IPC中用到了flatbuffers这种高效获取序列化数据的组件,再加上IPC采用的是Java NIO的ByteBuffer零拷贝的方式以及RecordBatch列批的方式,大大提升了进程间的数据交换效率。关于NIO的零拷贝参考NIO效率高的原理之零拷贝与直接内存映射

具体细节

直接到ArrowConverters的类中:
主要看两个方法:toBatchIteratorfromBatchIterator

  • ArrowConverters.toBatchIterator
  private[sql] def toBatchIterator(
      rowIter: Iterator[InternalRow],
      schema: StructType,
      maxRecordsPerBatch: Long,
      timeZoneId: String,
      context: TaskContext): ArrowBatchIterator = {
    new ArrowBatchIterator(
      rowIter, schema, maxRecordsPerBatch, timeZoneId, context)
  }

这个主要是把spark内部的InternalRow转换为ArrowRecordBatches,方法直接就是返回ArrowBatchIterator类型(Iterator[Array[Byte]]类型)的迭代器:

  • ArrowConverters.fromBatchIterator
  private[sql] def fromBatchIterator(
      arrowBatchIter: Iterator[Array[Byte]],
      schema: StructType,
      timeZoneId: String,
      context: TaskContext): Iterator[InternalRow] = new InternalRowIteratorWithoutSchema(
    arrowBatchIter, schema, timeZoneId, context
  )

这个主要是把序列化的ArrowRecordBatche转换为Spark内部的InternalRow,这里也是直接返回了InternalRowIteratorWithoutSchema类型的迭代器,这里就涉及到了内存的零拷贝,具体的方法如下:

    override def nextBatch(): (Iterator[InternalRow], StructType) = {
      val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
      val root = VectorSchemaRoot.create(arrowSchema, allocator)
      resources.append(root)
      val arrowRecordBatch = ArrowConverters.loadBatch(arrowBatchIter.next(), allocator)
      val vectorLoader = new VectorLoader(root)
      vectorLoader.load(arrowRecordBatch)
      arrowRecordBatch.close()
      (vectorSchemaRootToIter(root), schema)
    }

其中涉及的调用链如下:

ArrowConverters.loadBatch
   ||
   \/
MessageSerializer.deserializeRecordBatch
   ||
   \/
readMessageBody
  ||
  \/
ReadChannel.readFully
  ||
  \/
buffer.nioBuffer
  ||
  \/
getDirectBuffer

最后的getDirectBuffer直接返回的是DirectByteBuffer直接内存,这样可以避免了JVM内存到native内存的数据拷贝,尤其是在大数据场景下,提升的效率更加明显,且减少了用户态和内核态的切换次数。

  • 怎么运用到python与spark jvm的交互中
    调用网上的Pyspark的架构图
    在这里插入图片描述

    参考具体conversion.py中部分代码如下:

    jrdd = self._sc._serialize_to_jvm(arrow_data, ser, reader_func, create_RDD_server)
    jdf = self._jvm.PythonSQLUtils.toDataFrame(jrdd, schema.json(), jsqlContext)
    

    主要在self._jvm.PythonSQLUtils.toDataFrame这个方法中,python调用spark中方法,把序列化的*Iterator[Array[Byte]]*传给jvm执行,具体的细节,读者可以自行参考源代码.

其他

在最新发布的Spark-3.4.0中有一项SPIP,也是采用了Arrow IPC作为数据传输的格式。
当然Arrow Flight SQL也将是一个很好的技术点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/672511.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

io.netty学习(十)Netty 程序引导类

目录 前言 引导程序类 AbstractBootStrap 抽象类 Bootstrap 类 ServerBootstrap 类 引导服务器 1、实例化引导程序类 2、设置 EventLoopGroup 3、指定 Channel 类型 4、指定 ChannelHandler 5、设置 Channel 选项 6、绑定端口启动服务 引导客户端 1、实例化引导程…

设计模式之代理模式笔记

设计模式之代理模式笔记 说明Proxy(代理)目录代理模式静态代理示例类图买火车票的接口火车站类代售点类测试类 JDK动态代理买火车票的接口火车站类获取代理对象的工厂类测试类 CGLIB动态代理火车站类代理工厂类测试类 三种代理对比优缺点 说明 记录下学习设计模式-代理模式的写…

windows pwn

环境搭建 checksec winchecksec winchecksec 是 windows 版的 checksec ,不过有时候结果不太准确。 checksec(x64dbg) x64dbg 的插件 checksec 检查效果比较准确,并且可以连同加载的 dll 一起检测。 将 release 的插件按 3…

RK3288 Android8.1添加EC25

首先拿到供应商提供的so库,将so放到vendor\rockchip\common\phone\lib下 修改对应的phone.mk,将so库移动指定位置(Android7以下移动到system/lib,android8以后移动到vendor/lib) CUR_PATH : vendor/rockchip/common#############…

mysql避免重复插入记录insert ignore 、on duplicate key update、replace into

星标▲Java学习之道▲一起成长,一起学习~ 哈喽,大家好,我是阿淼。今天梳理一下mysql中避免重复插入记录的集中操作。 1序 回顾以前写的项目,发现在规范的前提下,还是可以做点骚操作的。 假如项目使用的MySQL&#xff0…

基于Informer的股票价格预测(量化交易综述)

摘要 股票市场是金融市场中不可或缺的组成部分。准确预测股票趋势对于投资者和市场参与者具有重要意义,因为它们可以指导投资决策、优化投资组合以及降低金融风险。而且可以提升国家国际地位以及金融风险控制能力,还可以促进股票市场发展以及资源优化利…

Java常用类库与技巧

1、String,StringBuffer,StringBuilder的区别? 2、Java异常 异常处理机制主要回答了三个问题 What:异常类型回答了什么被抛出?Where:异常堆栈跟踪回答了在哪抛出?Why:异常信息回答…

PowerDesigner165安装

PowerDesigner安装及解析 一、PowerDesigner安装1.双击开始安装2.一路“Next”3.选择地区4.安装路径5.按图勾选6.一路“Next”7.安装中8.安装完成 二、解析三、使用 一、PowerDesigner安装 1.双击开始安装 2.一路“Next” 3.选择地区 选择软件安装所属地区,一定要选择“Hong …

vue3-实战-12-管理后台-权限管理之菜单管理模块-首页-主题颜色-暗黑模式

目录 1-列表页面功能开发 1.1-需求原型分析 1.2-接口和数据类型定义 1.3-获取服务端数据渲染页面 2-新增编辑菜单 2.1-原型需求分析 2.2-表单数据收集和页面结构开发 2.3-提交或者取消 3-删除菜单 4-首页开发 5-暗黑模式的切换和主题颜色 5.1-暗黑模式 5.2-主题颜…

three.js几何体的_UV_、法向属性以及BufferGeometry类介绍

一、几何体的_UV_以及法向属性 UV属性是一组二维坐标,每个顶点都有一个对应的UV坐标。在三维模型上贴上二维的纹理贴图时,需要将所有顶点映射到纹理上的对应位置。UV属性的取值范围一般是[0,1],表示纹理上的相对位置。通过修改UV属性&#xf…

Shell - 02_shell变量

一、shell的自定义变量 1.定义变量:变量名变量值 如:num10 2.引用变量:$变量名 如:i$num 把变量 num 的值付给变量 i 3.显示变量:使用 echo 命令可以显示单个变量取值 如:echo $num 4.清除变量&…

如何写好接口自动化测试脚本

谈到接口测试,大家关注更多的是哪个工具更优秀,更好用。但是很少人关注到接口测试用例的设计问题,也很少人会去写接口用例,都代码化了嘛,还写什么用例,是吧? 这样真的对么?我们是不…

Web3通过 MetaMask简单演示对ganache虚拟环境账号进行管理操作

上文 Web3通过ganache运行起一个本地虚拟区块链 我们通过ganache在本地运行起了一个虚拟的区块链环境 那么 接下来 我们就要用 MetaMask 来管理这个东西了 如果您还没有安装 可以访问文章Web3 将 MetaMask添加入谷歌浏览器 扩展程序中和Web3开发准备工作 手把手带你创建自己的 …

行业报告 | 人工智能时代的营销新趋势

原创 | 文 BFT机器人 01 科技推动时代发展进步 随着电子计算机的发明和使用,打开了人类知识的全方位信息时空,人类由此从工业文明走进信息文明,渐渐地网络成为了人们进行社会活动的基本平台。 智能手机的出现将人们剩余的碎片化时间也连接到了…

从尾到头打印链表

输入一个链表的头节点&#xff0c;按链表从尾到头的顺序返回每个节点的值&#xff08;用数组返回&#xff09;。 如输入{1,2,3}的链表如下图: ​ 返回一个数组为[3,2,1] 0 < 链表长度 < 10000 示例1 输入&#xff1a; {1,2,3} 返回值&#xff1a; [3,2,1]示例2 输入…

springboot集成J-IM+vue实现简单的聊天功能

前言&#xff1a;看了demo自己摸索着集成了一下&#xff0c;特此记录 一、引入依赖 <!-- jim-server --> <dependency><groupId>org.j-im</groupId><artifactId>jim-server</artifactId><version>3.0.0.v20200501-RELEASE&l…

【系统开发】尚硅谷 - 谷粒商城项目笔记(六):异步线程池

文章目录 异步线程池讲解简单线程池常见的四种线程池进阶线程池为什么使用线程池异步编排基本用法其他API线程串行化两任务组合都完成时一个完成时 多任务组合 异步线程池讲解 简单线程池 public class Test01 {public static void main(String[] args) {// 声明一个有10个线…

Java——集合

文章目录 1、集合概述2、集合类体系结构Collection集合体系 3、Collection集合常用API3、Collection集合的遍历方式方式一&#xff1a;迭代器方式二&#xff1a;foreach/增强for循环方式三&#xff1a;lambda表达式 4、List系列集合List集合特点和特有APILinkedList集合 5、集合…

物流园仓库智能综合监控系统

现代经济的不断发展&#xff0c;仓储物流业也在快速地发展&#xff0c;物流仓库作为物质资源的存储和转运&#xff0c;在经济生产中发挥着重大的作用&#xff0c;但是在此期间&#xff0c;随之而来的是物品丢失、被盗、损坏等一系列安全隐患事件。 物流仓库里面存储物品的多数都…

nginx: client intended to send too large body

最近上传大于1M文件的时候&#xff0c;报错nginx。 413 Request Entity Too Large 经过排查修改nginx配置 这是最简单的一个做法&#xff0c;着报错原因是nginx不允许上传配置过大的文件&#xff0c;那么件把nginx的上传大小配置调高就好。 1、打开nginx主配置文件nginx.co…