Flink 数据序列化

news2025/1/18 8:56:29

为 Flink 量身定制的序列化框架

大家都知道现在大数据生态非常火,大多数技术组件都是运行在JVM上的,Flink也是运行在JVM上,基于JVM的数据分析引擎都需要将大量的数据存储在内存中,这就不得不面临JVM的一些问题,比如Java对象存储密度较低等。针对这些问题,最常用的方法就是实现一个显式的内存管理,也就是说用自定义的内存池来进行内存的分配回收,接着将序列化后的对象存储到内存块中。
现在Java生态圈中已经有许多序列化框架,比如说Java serialization, Kryo,Apache Avro等等。但是Flink依然是选择了自己定制的序列化框架,那么到底有什么意义呢?若Flink选择自己定制的序列化框架,对类型信息了解越多,可以在早期完成类型检查,更好的选取序列化方式,进行数据布局,节省数据的存储空间,直接操作二进制数据。
在这里插入图片描述Flink在其内部构建了一套自己的类型系统,Flink现阶段支持的类型分类如图所示,从图中可以看到Flink类型可以分为基础类型Basic、数组Arrays、复合类型Composite、辅助类型Auxiliary、泛型和其它类型GenericFlink支持任意的Java或是Scala类型。不需要像Hadoop一样去实现一个特定的接口org.apache.hadoop.io.WritableFlink能够自动识别数据类型。
在这里插入图片描述
TypeInformation的思维导图如图所示,从图中可以看出,在Flink中每一个具体的类型都对应了一个具体的TypeInformation实现类,例如BasicTypeInformation中的IntegerTypeInformationFractionalTypeInformation都具体的对应了一个TypeInformation。然后还有BasicArrayTypeInformationCompositeType以及一些其它类型,也都具体对应了一个TypeInformation

TypeInformationFlink类型系统的核心类。对于用户自定义的Function来说,Flink需要一个类型信息来作为该函数的输入输出类型,即TypeInfomation。该类型信息类作为一个工具来生成对应类型的序列化器TypeSerializer,并用于执行语义检查,比如当一些字段在作为joingrouping的键时,检查这些字段是否在该类型中存在。

Flink 的序列化过程

Flink序列化过程中,进行序列化操作必须要有序列化器,那么序列化器从何而来?
每一个具体的数据类型都对应一个TypeInformation的具体实现,每一个TypeInformation都会为对应的具体数据类型提供一个专属的序列化器。通过 Flink的序列化过程图可以看到TypeInformation会提供一个createSerialize()方法,通过这个方法就可以得到该类型进行数据序列化操作与反序化操作的对象TypeSerializer
在这里插入图片描述对于大多数数据类型 Flink可以自动生成对应的序列化器,能非常高效地对数据集进行序列化和反序列化,比如,BasicTypeInfoWritableTypeInfo等,但针对GenericTypeInfo类型,Flink会使用Kyro进行序列化和反序列化。其中,TuplePojoCaseClass类型是复合类型,它们可能嵌套一个或者多个数据类型。在这种情况下,它们的序列化器同样是复合的。它们会将内嵌类型的序列化委托给对应类型的序列化器。

简单的介绍下Pojo的类型规则,即在满足一些条件的情况下,才会选用Pojo的序列化进行相应的序列化与反序列化的一个操作。即类必须是Public的,且类有一个public的无参数构造函数,该类(以及所有超类)中的所有非静态no-static、非瞬态no-transient字段都是public的(和非最终的final)或者具有公共gettersetter方法,该方法遵循gettersetterJava bean命名约定。当用户定义的数据类型无法识别为POJO类型时,必须将其作为GenericType处理并使用Kryo进行序列化。

Flink自带了很多TypeSerializer子类,大多数情况下各种自定义类型都是常用类型的排列组合,因而可以直接复用,如果内建的数据类型和序列化方式不能满足你的需求,Flink的类型信息系统也支持用户拓展。若用户有一些特殊的需求,只需要实现 TypeInformationTypeSerializerTypeComparator即可定制自己类型的序列化和比较大小方式,来提升数据类型在序列化和比较时的性能。

序列化就是将数据结构或者对象转换成一个二进制串的过程,在Java里面可以简单地理解成一个byte数组。而反序列化恰恰相反,就是将序列化过程中所生成的二进制串转换成数据结构或者对象的过程。下面就以内嵌型的Tuple3这个对象为例,简述一下它的序列化过程。
[点击并拖拽以移动] ​

Tuple3包含三个层面,一是int类型,一是double类型,还有一个是PersonPerson包含两个字段,一是int型的ID,另一个是 String 类型的name,它在序列化操作时,会委托相应具体序列化的序列化器进行相应的序列化操作。从图中可以看到Tuple3 会把 int类型通过IntSerializer进行序列化操作,此时int只需要占用四个字节就可以了。根据int占用四个字节,这个能够体现出Flink可序列化过程中的一个优势,即在知道数据类型的前提下,可以更好的进行相应的序列化与反序列化操作。相反,如果采用Java的序列化,虽然能够存储更多的属性信息,但一次占据的存储空间会受到一定的损耗。Person类会被当成一个Pojo对象来进行处理,PojoSerializer序列化器会把一些属性信息使用一个字节存储起来。同样,其字段则采取相对应的序列化器进行相应序列化,在序列化完的结果中,可以看到所有的数据都是由MemorySegment去支持。

MemorySegment具有什么作用呢? MemorySegmentFlink中会将对象序列化到预分配的内存块上,它代表1个固定长度的内存,默认大小为32 kbMemorySegment代表Flink中的一个最小的内存分配单元,相当于是Java的一个byte数组。 每条记录都会以序列化的形式存储在一个或多个MemorySegment中。

Flink 序列化的最佳实践

Flink常见的应用场景有四种,即注册子类型、注册自定义序列化器、添加类型提示、手动创建TypeInformation,具体如下:
【1】注册子类型: 如果函数签名只描述了超类型,但是它们实际上在执行期间使用了超类型的子类型,那么让Flink了解这些子类型会大大提高性能。可以在StreamExecutionEnvironmentExecutionEnvironment中调用.registertype (clazz) 注册子类型信息。
【2】注册自定义序列化器: 对于不适用于自己的序列化框架的数据类型,Flink会使用Kryo来进行序列化,并不是所有的类型都与Kryo无缝连接,具体注册方法在下文介绍。
【3】添加类型提示: 有时,当Flink用尽各种手段都无法推测出泛型信息时,用户需要传入一个类型提示TypeHint,这个通常只在Java API中需要。
【4】手动创建TypeInformation 在某些API调用中,这可能是必需的,因为Java的泛型类型擦除导致Flink无法推断数据类型。
其实在大多数情况下,用户不必担心序列化框架和注册类型,因为Flink已经提供了大量的序列化操作,不需要去定义自己的一些序列化器,但是在一些特殊场景下,需要去做一些相应的处理。

实践 - 类型声明: 类型声明去创建一个类型信息的对象是通过哪种方式?通常是用TypeInformation.of()方法来创建一个类型信息的对象,具体说明如下:
【1】对于非泛型类,直接传入class对象即可

PojoTypeInfo<Person> typeInfo = (PojoTypeInfo<Person>) TypeInformation.of(Person.class);

【2】对于泛型类,需要通过TypeHint来保存泛型类型信息

final TypeInfomation<Tuple2<Integer,Integer>> resultType = TypeInformation.of(new TypeHint<Tuple2<Integer,Integer>>(){});

【3】预定义常量:BasicTypeInfo,这个类定义了一系列常用类型的快捷方式,对于StringBooleanByteShortIntegerLongFloatDoubleChar等基本类型的类型声明,可以直接使用。而且Flink还提供了完全等价的Typesorg.apache.flink.api.common.typeinfo.Types。特别需要注意的是,flink-table模块也有一个Typesorg.apache.flink.table.api.Types,用于table模块内部的类型定义信息,用法稍有不同。使用IDE 的自动import时一定要小心。
【4】自定义TypeInfoTypeInfoFactory 通过自定义TypeInfo为任意类提供Flink原生内存管理(而非Kryo),使存
储更紧凑,运行时也更高效。需要注意在自定义类上使用@TypeInfo注解,随后创建相应的TypeInfoFactory并覆盖createTypeInfo()方法。

@TypeInfo(MyTupleTypeInfoFactory.class)
public class MyTuple<T0,T1>{
    public T0 myfield0;
    public T1 myfield1;
}

public class MyTupleTypeInfoFactory extends TypeInfoFactory<MyTuple>{

    @Override
    public TypeInformation<MyTuple> createTypeInfo(Type t, Map<String, TypeInfomation<?>> genericParameters){
        return new MyTupleTypeInfo(genericParameters.get("T0").genericParameters.get("T1"));
    }
}

实践 - 注册子类型

Flink认识父类,但不一定认识子类的一些独特特性,因此需要单独注册子类型。StreamExecutionEnvironmentExecutionEnvironment提供registerType()方法用来向Flink注册子类信息。

final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.registerType(typeClass);

registerType()方法内部,会使用TypeExtractor来提取类型信息,如上所示,获取到的类型信息属于PojoTypeInfo及其子类,那么需要将其注册到一起,否则统一交给Kryo去处理,Flink并不过问 ( 这种情况下性能会变差 )。

实践 - Kryo 序列化

对于Flink无法序列化的类型(例如用户自定义类型,没有registerType,也没有自定义TypeInfoTypeInfoFactory),默认会交给 Kryo处理,如果Kryo仍然无法处理(例如GuavaThriftProtobuf等第三方库的一些类),有两种解决方案:
【1】强制使用Avro来代替Kryo

env.getConfig().enableForceAvro();

【2】为Kryo增加自定义的Serializer以增强Kryo的功能

env.getConfig().addDefaultKryoSerializer(clazz, serializer);

注:如果希望完全禁用Kryo100%使用Flink的序列化机制),可以通过Kryoenv.getConfig().disableGenericTypes()的方式完成,但注意一切无法处理的类都将导致异常,这种对于调试非常有效。

Flink 通信层的序列化

FlinkTask之间如果需要跨网络传输数据记录, 那么就需要将数据序列化之后写入NetworkBufferPool,然后下层的Task读出之后再进行反序列化操作,最后进行逻辑处理。为了使得记录以及事件能够被写入 Buffer,随后在消费时再从Buffer中读出,
Flink提供了数据记录序列化器RecordSerializer与反序列化器RecordDeserializer以及事件序列化器EventSerializer

Function发送的数据被封装成SerializationDelegate,它将任意元素公开为IOReadableWritable以进行序列化,通过setInstance()来传入要序列化的数据。在Flink通信层的序列化中,有几个问题值得关注,具体如下:
【1】何时确定Function的输入输出类型?
[点击并拖拽以移动] ​

在构建StreamTransformation的时候通过TypeExtractor工具确定Function的输入输出类型。TypeExtractor类可以根据方法签名、子类信息等蛛丝马迹自动提取或恢复类型信息。
【2】何时确定Function的序列化 / 反序列化器?
构造StreamGraph时, 通过TypeInfomationcreateSerializer()方法获取对应类型的序列化器TypeSerializer,并在addOperator()的过程中执行setSerializers() 操作,设置StreamConfigTYPESERIALIZERIN1TYPESERIALIZERIN2TYPESERIALIZEROUT_1属性。
【3】何时进行真正的序列化 / 反序列化操作? 这个过程与TypeSerializer又是怎么联系在一起的呢?
构造StreamGraph时, 通过TypeInfomationcreateSerializer()方法获取对应类型的序列化器TypeSerializer,并在addOperator()的过程中执行setSerializers()操作,设置StreamConfigTYPESERIALIZERIN1TYPESERIALIZERIN2TYPESERIALIZEROUT_1属性。
【4】何时进行真正的序列化 / 反序列化操作? 这个过程与TypeSerializer又是怎么联系在一起的呢?
[点击并拖拽以移动] ​

大家都应该清楚TaskStreamTask两个概念,Task是直接受TaskManager管理和调度的,而Task又会调用StreamTask,而StreamTask中真正封装了算子的处理逻辑。在run()方法中,首先将反序列化后的数据封装成StreamRecord交给算子处理;然后将处理结果通过Collector发送给下游 ( 在构建Collector时已经确定了SerializtionDelegate),并通过RecordWriter写入器将序列化后的结果写入DataOutput;最后序列化的操作交给SerializerDelegate处理,实际还是通过TypeSerializerserialize()方法完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1331259.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

thinkphp+vue+mysql酒店客房管理系统 b1g8z

本系统包括前台界面、用户界面和管理员界面、员工界面。在前台界面里游客和用户可以浏览客房信息、公告信息等&#xff0c;用户可以预定客房&#xff0c;在用户中心界面里&#xff0c;用户可以管理预定信息&#xff0c;管理员负责用户预定的审核以及客房的发布、用户的入住等。…

装饰者模式学习

装饰器&#xff08;Decorator&#xff09;模式的定义&#xff1a;指在不改变现有对象结构的情况下&#xff0c;动态地给该对象增加一些职责&#xff08;即增加其额外功能&#xff09;的模式&#xff0c;它属于对象结构型模式。 装饰器模式的主要优点有&#xff1a; 装饰器是继…

基于Java (spring-boot)的在线考试管理系统

一、项目介绍 系统功能说明 1、系统共有管理员、老师、学生三个角色&#xff0c;管理员拥有系统最高权限。 2、老师拥有考试管理、题库管理、成绩管理、学生管理四个模块。 3、学生可以参与考试、查看成绩、试题练习、留言等功能 二、作品包含 三、项目技术 后端语言&#xff1…

【电路笔记】-串联电容器

串联电容器 文章目录 串联电容器1、概述2、示例13、示例34、总结 当电容器以菊花链方式连接在一条线上时&#xff0c;它们就串联在一起。 1、概述 对于串联电容器&#xff0c;流过电容器的充电电流 ( i C i_C iC​ ) 对于所有电容器来说都是相同的&#xff0c;因为它只有一条…

Unity中Shader缩放矩阵

文章目录 前言一、直接相乘缩放1、在属性面板定义一个四维变量&#xff0c;用xyz分别控制在xyz轴上的缩放2、在常量缓存区申明该变量3、在顶点着色器对其进行相乘&#xff0c;来缩放变换4、我们来看看效果 二、使用矩阵乘法代替直接相乘缩放的原理1、我们按如下格式得到缩放矩阵…

Koordinator 支持 K8s 与 YARN 混部,小红书在离线混部实践分享

作者&#xff1a;索增增&#xff08;小红书&#xff09;、宋泽辉&#xff08;小红书&#xff09;、张佐玮&#xff08;阿里云&#xff09; 背景介绍 Koordinator 是一个开源项目&#xff0c;基于阿里巴巴在容器调度领域多年累积的经验孵化诞生&#xff0c;目前已经支持了 K8s…

LZ码基本概念

LZ码是一种无损压缩算法&#xff0c;由Lempel和Ziv两位计算机科学家提出并命名。它是一种基于字典的压缩方法&#xff0c;可以将数据有效地压缩存储&#xff0c;同时实现高效的解压缩。 LZ码的基本概念是利用字典来存储先前遇到的字符串&#xff0c;然后用较短的代表符号来表示…

【性能优化】MySql数据库查询优化方案

阅读本文你的收获 了解系统运行效率提升的整体解决思路和方向学会MySQl中进行数据库查询优化的步骤学会看慢查询、执行计划、进行性能分析、调优 一、问题&#xff1a;如果你的系统运行很慢&#xff0c;你有什么解决方案&#xff1f; ​关于这个问题&#xff0c;我们通常首先…

mac上使用 Downie 下载网页视频

在今天的数字时代&#xff0c;视频内容在互联网上的传播变得更加普遍和便捷。然而&#xff0c;有时我们可能希望将网页上的视频保存在本地&#xff0c;以便离线观看或与他人分享。Downie 是一款强大而简便的工具&#xff0c;专门设计用于下载网页上的视频内容。本文将介绍 Down…

IP技术在网络安全防护中的重要意义

随着互联网的普及&#xff0c;网络安全问题日益凸显。作为网络通信中的重要标识&#xff0c;IP地址在网络安全防护中扮演着关键角色。近日&#xff0c;一则关于IP技术在网络安全防护措施的新闻引起了广泛关注。 据报道&#xff0c;IP技术已成为网络安全防护的重要手段之一。通过…

idea structure视图介绍

作用 idea的Structure视图可以辅助查看代码结构 如何呼出Structure视图&#xff1f; Alt 7 Ctrl F12 侧边栏点Structure 我的常用配置 1、选Show Toolbar&#xff0c;便于使用功能按钮 2、使用Float视图&#xff0c;悬浮于窗口表面&#xff0c;可以使用 ShiftEsc来退出…

学习使用echarts图表中formatter的用法,格式化数字金额,控制x轴、y轴展示长度

学习使用echarts图表中formatter的用法&#xff0c;格式化数字金额&#xff0c;控制x轴、y轴展示长度 控制金额长度两位小数&#xff0c;并去除多余.00效果图 控制文字长度完整代码 控制金额长度 series: [{name: ,type: bar,sort: none,label: { //饼图图形上的文本…

DFS与BFS算法总结

知识概览 DFS、BFS都可以对整个问题空间进行搜索&#xff0c;搜索的结构都是像一棵树。DFS会尽可能往深搜&#xff0c;当搜索到叶节点时就会回溯。而BFS每一次只会扩展一层。 DFS与BFS的区别&#xff1a; 搜索方式数据结构空间复杂度性质DFS栈O(h)&#xff0c;其中h为搜索空间…

Linux数据库主从复制(单主单从)

MySQL主从复制的优点包括&#xff1a; 1、横向扩展解决方案 - 在多个从站之间分配负载以提高性能。在此环境中&#xff0c;所有写入和更新都必须在主服务器上进行。但是&#xff0c;读取可以在一个或多个从设备上进行。该模型可以提高写入性能&#xff08;因为主设备专用于更新…

PMP项目管理 - 成本管理

系列文章目录 系统架构设计 PMP项目管理 - 整合管理 PMP项目管理 - 范围管理 PMP项目管理 - 质量管理 PMP项目管理 - 采购管理 PMP项目管理 - 资源管理 PMP项目管理 - 风险管理 PMP项目管理 - 沟通管理 现在的一切都是为将来的梦想编织翅膀&#xff0c;让梦想在现实中展翅高飞…

只用10分钟,ChatGPT就帮我写了一篇2000字文章

有了ChatGPT之后&#xff0c;于我来说&#xff0c;有两个十分明显的变化&#xff1a; 1. 人变的更懒 因为生活、工作中遇到大大小小的事情&#xff0c;都可以直接找ChatGPT来寻求答案。 2. 工作产出量更大 之前花一天&#xff0c;甚至更久才能写一篇原创内容&#xff0c;现…

FPGA设计时序约束十三、Set_Data_Check

目录 一、序言 二、Set Data Check 2.1 基本概念 2.2 设置界面 2.3 命令语法 三、工程示例 3.1 工程代码 3.2 约束设置 3.3 时序报告 四、参考资料 一、序言 通常进行时序分析时&#xff0c;会考虑触发器上时钟信号与数据信号到达的先后关系&#xff0c;从而进行setu…

模块与包、反序列化校验源码分析、断言、drf之请求、drf之响应

模块与包 什么是模块&#xff1f; 一个py文件&#xff0c;被别的py文件导入使用&#xff0c;它就是模块 如果py文件&#xff0c;直接右键运行&#xff0c;它叫脚本文件 什么是包&#xff1f; 一个文件夹&#xff0c;下有 __init__.py &#xff0c;和很多py文件&#xff0c;这个…

C++面向对象(OOP)编程-STL详解(vector)

本文主要介绍STL六大组件&#xff0c;并主要介绍一些容器的使用。 目录 1 泛型编程 2 CSTL 3 STL 六大组件 4 容器 4.1 顺序性容器 4.1.1 顺序性容器的使用场景 4.2 关联式容器 4.2.1 关联式容器的使用场景 4.3 容器适配器 4.3.1 容器适配器的使用场景 5 具体容器的…

XSKY星辰天合星海架构荣获 IT168 “2023 年度技术卓越奖”

近日&#xff0c;"2023 年度技术卓越奖"获奖名单公布&#xff0c;XSKY 星辰天合的星海架构&#xff08;XSEA&#xff0c;极速全共享架构&#xff09;获得行业 CIO/CTO 大咖、技术专家及 IT 媒体三方认可&#xff0c;成功入选&#xff01; “技术卓越奖”评选由国内著…