背景
要提起ArrowConverters,就得说起Arrow这个项目,该项目的初衷是加速进程间的数据交换,从目前的社区发展以及它的周边来看,其实是一个很不错的项目。
那为什么Spark要引入Arrow呢?其实还得从Pyspark中python和jvm的交互方式上说起,目前pyspark采用的py4j与spark jvm进行交互,而数据的交换采用的是jvm和python两个进程间的数据交换(感兴趣的同学可以参考PySpark架构),这个时候引进Arrow恰到好处。
闲说杂谈
spark具体采用的是Arrow IPC,
而IPC中用到了flatbuffers这种高效获取序列化数据的组件,再加上IPC采用的是Java NIO的ByteBuffer零拷贝的方式以及RecordBatch列批的方式,大大提升了进程间的数据交换效率。关于NIO的零拷贝参考NIO效率高的原理之零拷贝与直接内存映射
具体细节
直接到ArrowConverters的类中:
主要看两个方法:toBatchIterator和fromBatchIterator
- ArrowConverters.toBatchIterator
private[sql] def toBatchIterator(
rowIter: Iterator[InternalRow],
schema: StructType,
maxRecordsPerBatch: Long,
timeZoneId: String,
context: TaskContext): ArrowBatchIterator = {
new ArrowBatchIterator(
rowIter, schema, maxRecordsPerBatch, timeZoneId, context)
}
这个主要是把spark内部的InternalRow转换为ArrowRecordBatches,方法直接就是返回ArrowBatchIterator类型(Iterator[Array[Byte]]类型)的迭代器:
- ArrowConverters.fromBatchIterator
private[sql] def fromBatchIterator(
arrowBatchIter: Iterator[Array[Byte]],
schema: StructType,
timeZoneId: String,
context: TaskContext): Iterator[InternalRow] = new InternalRowIteratorWithoutSchema(
arrowBatchIter, schema, timeZoneId, context
)
这个主要是把序列化的ArrowRecordBatche转换为Spark内部的InternalRow,这里也是直接返回了InternalRowIteratorWithoutSchema类型的迭代器,这里就涉及到了内存的零拷贝,具体的方法如下:
override def nextBatch(): (Iterator[InternalRow], StructType) = {
val arrowSchema = ArrowUtils.toArrowSchema(schema, timeZoneId)
val root = VectorSchemaRoot.create(arrowSchema, allocator)
resources.append(root)
val arrowRecordBatch = ArrowConverters.loadBatch(arrowBatchIter.next(), allocator)
val vectorLoader = new VectorLoader(root)
vectorLoader.load(arrowRecordBatch)
arrowRecordBatch.close()
(vectorSchemaRootToIter(root), schema)
}
其中涉及的调用链如下:
ArrowConverters.loadBatch
||
\/
MessageSerializer.deserializeRecordBatch
||
\/
readMessageBody
||
\/
ReadChannel.readFully
||
\/
buffer.nioBuffer
||
\/
getDirectBuffer
最后的getDirectBuffer直接返回的是DirectByteBuffer直接内存,这样可以避免了JVM内存到native内存的数据拷贝,尤其是在大数据场景下,提升的效率更加明显,且减少了用户态和内核态的切换次数。
-
怎么运用到python与spark jvm的交互中
调用网上的Pyspark的架构图
参考具体conversion.py中部分代码如下:
jrdd = self._sc._serialize_to_jvm(arrow_data, ser, reader_func, create_RDD_server) jdf = self._jvm.PythonSQLUtils.toDataFrame(jrdd, schema.json(), jsqlContext)
主要在self._jvm.PythonSQLUtils.toDataFrame这个方法中,python调用spark中方法,把序列化的*Iterator[Array[Byte]]*传给jvm执行,具体的细节,读者可以自行参考源代码.
其他
在最新发布的Spark-3.4.0中有一项SPIP,也是采用了Arrow IPC作为数据传输的格式。
当然Arrow Flight SQL也将是一个很好的技术点。