Flink 类型机制 及 Stream API和Table API类型推断和转换

news2025/1/11 11:53:32

注:本文使用flink 版本是0.13

一、类型体系

Flink 有两大API (1)stream API 和 (2)Table API ,分别对应TypeInformation 和 DataType类型体系。

1.1 TypeInformation系统

TypeInformation系统是使用Stream一定会用到的。TypeInformation 以下简称TypeInfo。
TypeInfo 本质就是一对一的类型映射。在java中一个typeInfo就对应着一个确定的java类型。所以在stream api 中某些情况下。给定数据flink可以根据数据自动推断出TypeInfo。
但现在Table API大行其道,Flink已经有意在用DataType替代TypeInfo了。所以Flink中有 DataType To TypeInfo 的API(org.apache.flink.table.runtime.typeutils.InternalTypeInfo虽然只能将DataType转为Table API所规定的TypeInfo),却没有提供 TypeInfo To DataType 的API。
在这里插入图片描述
在Stream API中默认使用的承载行数据的类型是org.apache.flink.types.Row
注意:创建流后如果是复杂类型,比Row类型,非标准的Pojo类型,必须明确告诉Flink是审美类型,Flink无法自动推断出的。
(1)可以在创建流时候提供如org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#addSource(org.apache.flink.streaming.api.functions.source.SourceFunction<OUT>, java.lang.String, org.apache.flink.api.common.typeinfo.TypeInformation<OUT>)
(2) 可以在使用function时候提供,如org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator#returns(org.apache.flink.api.common.typeinfo.TypeInformation<T>)
其中return方法参数也有三种(非常重要):

// 比如 DataStream<Integer> myInts = env.fromElements(1, 2, 3, 4, 5); 提供 Integer.class就可以了。当然不提供FLink也会推断出的。
public SingleOutputStreamOperator<T> returns(Class<T> typeClass);
// 用于创安含有泛型的TypeInfo
// 如:TypeInformation<Tuple2<String, Double>> info = TypeInformation.of(new TypeHint<Tuple2<String, Double>>(){});
public SingleOutputStreamOperator<T> returns(TypeHint<T> typeHint);
// TypeInfo的创建方式
// 1. 基础类型从org.apache.flink.api.common.typeinfo.BasicTypeInfo选择即可。
// 2. 不含泛型的类型可从org.apache.flink.api.common.typeinfo.TypeInformation#of方法创建。
// 3. DataStream<Row> 可由 org.apache.flink.api.java.typeutils.RowTypeInfo 创建
// 4. DataStream<RowData> 只能由 org.apache.flink.table.runtime.typeutils.InternalTypeInfo#of(org.apache.flink.table.types.logical.RowType)等方法创建。注意创建出来字段的Field TypeInfo的也都是Flink Table API预先指定好的。
public SingleOutputStreamOperator<T> returns(TypeInformation<T> typeInfo) ;

(3)可以实现接口org.apache.flink.api.java.typeutils.ResultTypeQueryable提供。

1.2 DataType系统

Table API 中所有的类型都是围绕着DataType构建的。如org.apache.flink.table.api.Schemaorg.apache.flink.table.catalog.ResolvedSchema
前者代表Table api中的Table对象的表结构,后者代表从catalog中获取的表结构。
DataType本质由两部分组成:

    protected final LogicalType logicalType;
    protected final Class<?> conversionClass;

logicalType 即逻辑类型,是和数据库中的类型对应的。
conversionClass 即运行时java类型,是实际承载数据的类型。

可以说DataType 与物理类型也是一对一的关系,并有conversionClass确定。
举个例子:
DataType与数据库中日期对应的类型是DateType ,它有一个支持conversionClass 的列表。分别支持 Date , LocalDate ,Integer。其中 如果使用Integer作为实际承载数据的类型,此时存储的值是与1970-01-01的天数差值。

@PublicEvolving
public final class DateType extends LogicalType {
    private static final String FORMAT = "DATE";
    private static final Set<String> NULL_OUTPUT_CONVERSION = conversionSet(new String[]{Date.class.getName(), LocalDate.class.getName(), Integer.class.getName()});
    private static final Class<?> DEFAULT_CONVERSION;
 }

如果需要创建DataType,Flink提供的入口是org.apache.flink.table.api.DataTypes 类,提供了各种DataType类型的创建方法。需要注意的是创建的类型都使用了默认 conversionClass ,其就是LogicalType 实现类中指定的DEFAULT_CONVERSION。如果需要指定运行时的类型就需要使用org.apache.flink.table.types.AbstractDataType#bridgedTo方法。

而在Table API中默认使用的承载行数据类型是org.apache.flink.table.data.RowData,是一个接口。常用的实现类是org.apache.flink.table.data.GenericRowData

在Table API中实际承载数据的类型必须使用FLink指定的类型。比如Date类型必须使用Integer,而不能使用java的LocalDate类型。具体类型可以从方法 org.apache.flink.table.types.utils.DataTypeUtils#toInternalDataType(org.apache.flink.table.types.DataType) 得出。

1.3 Stream API 和 Table API相互转换中的类型

转换为核心是 围绕着实际承载数据的类型 即DataType的conversionClass 应为
转换过程均由TableAPI的核心org.apache.flink.table.api.bridge.java.StreamTableEnvironment完成。
刚刚提及了Schema本质就是DataType,并可由DataType创建org.apache.flink.table.api.Schema.Builder#fromRowDataType 。以下就将Schema代指为DataType了。

如下各个方法中
Stream <–相互转化–>Table 中可以指定DataType,也可以不指定DataType(Schema)。

1.3.1 Stream To Table

1.3.1.1 不指定DataType

Flink会从DataStream的TypeInfo中推断DataType类型。
比如 TypeInfo是 org.apache.flink.api.common.typeinfo.BasicTypeInfo#BYTE_TYPE_INFO 或者TypeInfomation.of(Integer.class) 在创建流时候 是Integer类型,则Flink会自动推断出需要使用DataTypes.INT() 创建的对象并把其实际的承载类conversionClass指定为Integer.class

具体可以参考:flink 类型推断 的 data-type-extraction 章节:

# 注意在scala中不要使用primitives 类型需要使用包装类型。因为原始类型不允许为空。
If you intend to implement classes in Scala, it is recommended to use boxed types (e.g. java.lang.Integer) instead of Scala’s primitives. Scala’s primitives (e.g. Int or Double) are compiled to JVM primitives (e.g. int/double) and result in NOT NULL semantics
# 对于没有被列举的类型,是需要额外提供类型的。比如使用@DataTypeHint
Other JVM bridging classes mentioned in this document require a @DataTypeHint annotation.

下图是官方的java类型推断成为FLink DataType的类型
在这里插入图片描述

1.3.1.2 指定DataType

DataType是有LogicalType的,指定了DataType也就指定了LogicalType逻辑类型。
比如现在Row中有一个字段的TypeInfo还是 org.apache.flink.api.common.typeinfo.BasicTypeInfo#BYTE_TYPE_INFO 或者TypeInfomation.of(Integer.class) 也就是java中的int 或 Integer。但指定DataType时候使用DataTypes.DATE().bridgedTo(Integer.class)。此时就已经告诉Flink这里我虽然给你提供的是Integer,但实际代表的逻辑是Date日期类型数据了。以后就可以使用Table API所有关于Date日期的转换方法了。

1.3.2 Table To Stream

Table中都是包含DataType的,可从方法获得,如DataType dt = tbl.getResolvedSchema().toPhysicalRowDataType();

1.3.2.1 不指定DataType

不指定情况比较简单Flink Table API 每种LogicalType逻辑类型都有默认的java类型。
如:

TableDataTypeJavaType(conversionClass)代表内容
intInteger.class
bigintLong.class
dateInteger与1900-01-01天数差
timeInteger.class当天的毫秒数

所以不指定情况下,得到的DataStream中的原Date日期类型的数据一定会转为Integer.class 。并不是java中常用的 LocalDate.class ,也不是 LogicalType.DEFAULT_CONVERSION所规定的DEFAULT_CONVERSION = LocalDate.class;

1.3.2.2 指定DataType

指定情况就会在DataStream中获得想要的java类型了。Flink会在此过程中给提供类型转换服务。
如在表中能获取到DataType,如 DataType dt = tbl.getResolvedSchema().toPhysicalRowDataType();
但在toDataStream或toChangelogStream 可以提供DataType。
当两个DataType不一样的时候Flink就会将Table中的java(物理类型)转换为 提供的。

如下流程:
(1)使用 DataStream<Integer>
(2)指定DataTypes.DATE().bridgedTo(Integer.class)创建Table。此时table中的java数据类型仍然是Integer.class。
(3)使用Table创建Stream,并指定 DataTypes.DATE().bridgedTo(LocalDate.class) ,此时得到的流DataStream<LocalDate>
借此流程就实现了DataStream<Integer>Stream<LocalDate> 的转化。

public interface StreamTableEnvironment extends TableEnvironment {

<T> Table fromDataStream(DataStream<T> dataStream);

<T> Table fromDataStream(DataStream<T> dataStream, Schema schema);

Table fromChangelogStream(DataStream<Row> dataStream);

Table fromChangelogStream(DataStream<Row> dataStream, Schema schema);

DataStream<Row> toDataStream(Table table);

<T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType);
DataStream<Row> toChangelogStream(Table table);

DataStream<Row> toChangelogStream(Table table, Schema targetSchema);

}

总结

综上,Flink中比较容易从DataType得到 TypeInfo 。而从TypeInfo中转化为想要的逻辑类型,八成额外提供DataType(如Integer转Date)。所以,还是建议直接使用Table API中的DataType更加方便。

参考文章:
Flink之数据类型详解
Flink类型系统
聊聊Java类型擦除、Flink中使用Lambda表达式丢失信息和Flink类型暗示机制
聊聊Java泛型类型擦除及Flink类型暗示(type hint)机制
Flink数据类型&&序列化&&序列化器

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1023874.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Linux】:Kafka组件介绍

目录 环境简介 一、消息 二、主题 三、分区 四、副本 五、生产者 六、消费者 七、消费者组 八、offsets【偏移量】 环境简介 Linux内核&#xff1a;Centos7 Kafka版本&#xff1a;3.5.1 执行命令的目录位置&#xff1a;Kafka安装目录的bin目录下&#xff1a;/usr/loca…

uvm源码解读-sequence,sequencer,driver三者之间的握手关系1

1.start item 1.start_item();sequencer.wait_for_grant(prior);this.pre_do(1);需要指出&#xff0c;这里明确说明了wait_for_grant和send_request之间不能有任何延迟&#xff0c;所以在mid_do这个任务里千万不能有任何延迟。 task uvm_sequencer_base::wait_for_grant(uvm…

MySQL进阶篇2-索引的创建和使用

索引 mkdir mysql tar -xvf mysqlxxxxx.tar -c myql cd mysql rpm -ivh .....rpm yum install openssl-develsystemctl start mysqldgerp temporary password /var/log/mysqld.logmysql -u root -p mysql> show variables like validate_password.% set global validate_…

maven本地安装jar包

在实际开发中&#xff0c;有些jar包不能通过公共库下载&#xff0c;只能本地安装。可以按照以下步骤操作&#xff1a; 1、安装命令&#xff1a; mvn install:install-file -DgroupIdcom.chinacreator.sm -DartifactIdfbm-sm-common -Dversion0.0.1 -Dpackagingjar -Dfile../n…

基于Java+SpringBoot+Vue+协同过滤算法的电影推荐系统(亮点:智能推荐、协同过滤算法、在线支付、视频观看)

协同过滤算法的电影推荐系统 一、前言二、我的优势2.1 自己的网站2.2 自己的小程序&#xff08;小蔡coding&#xff09;2.3 有保障的售后2.4 福利 三、开发环境与技术3.1 MySQL数据库3.2 Vue前端技术3.3 Spring Boot框架3.4 微信小程序 四、功能设计4.1 主要功能描述 五、系统实…

微服务学习(七):docker安装Mysql

微服务学习&#xff08;七&#xff09;&#xff1a;docker安装Mysql 1、拉取镜像 docker pull mysql2、查看安装的镜像 docker images3、安装mysql docker run -p 3306:3306 --name mysql \ -v /mydata/mysql/log:/var/log/mysql \ -v /mydata/mysql/data:/var/lib/mysql \…

【HarmonyOS】元服务卡片router实现跳转到指定页面

【关键字】 元服务卡片、router跳转不同页面 【写在前面】 本篇文章主要介绍开发元服务卡片时&#xff0c;如何实现从卡片中点击事件跳转到指定的应用内页面功能。此处以JS UI开发服务卡片为例&#xff0c;JS卡片支持组件设置action&#xff0c;包括router事件和message事件&…

Python配置与测试利器:Hydra + pytest的完美结合

简介&#xff1a;Hydra 和 pytest 可以一起使用&#xff0c;基于 Hydra Pytest 的应用可以轻松地管理复杂配置&#xff0c;并编写参数化的单元测试&#xff0c;使得Python开发和测试将变得更为高效。 安装&#xff1a; pip install hydra-core pytest案例源码&#xff1a;my…

系统架构设计师(第二版)学习笔记----软件工程

【原文链接】系统架构设计师&#xff08;第二版&#xff09;学习笔记----软件工程 文章目录 一、软件工程1.1 软件危机的表现1.2 软件工程的内容 二、软件过程模型2.1 软件的声明周期2.2 瀑布模型2.3 瀑布模型的缺点2.4 原型模型2.5 原型模型开发阶段2.6 开发原型的途径2.7 螺旋…

【音视频】ffplay源码解析-PacketQueue队列

包队列架构位置 对应结构体源码 MyAVPacketList typedef struct MyAVPacketList {AVPacket pkt; //解封装后的数据struct MyAVPacketList *next; //下一个节点int serial; //播放序列 } MyAVPacketList;PacketQueue typedef struct PacketQueue {MyAVPacketList …

纯js实现html指定页面导出word

因为最近做了范文网站需要&#xff0c;所以要下载为word文档&#xff0c;如果php进行处理&#xff0c;很吃后台服务器&#xff0c;所以想用前端进行实现。查询github发现&#xff0c;确实有这方面的插件。 js导出word文档所需要的两个插件&#xff1a; FileSaver.js jquery.w…

企业备份解决方案:保护您的企业虚拟机安全!

在目前这个高度数据化的信息时代中&#xff0c;企业对数据的依赖程度更高&#xff0c;以便进行高效的运营和理智的决策。然而&#xff0c;硬件的故障、自然的灾害以及网络的攻击等无法预料的情况&#xff0c;可能会带来大规模的数据丢失&#xff0c;进而造成经济的损失&#xf…

vscode 编译工程问题总结

1.安装NuGet Package出错 The “path” argument must be of type string or an instance of Buffer of URL Received undefined 解决方法&#xff1a; 账号登录&#xff0c;重启vscode &#xff08;1&#xff09;找到登录 &#xff08;2&#xff09;选择一个登录方式登录 …

打造本地紧密链接的开源社区——KCC@长沙开源读书会openKylin爱好者沙龙圆满举办...

2023年9月9日&#xff0c;由开源社联合 openKylin 社区举办的 KCC长沙开源读书会&openKylin 爱好者沙龙&#xff0c;在长沙圆满举办。这是 KCC长沙首次正式进入公众视野&#xff0c;开展开源交流活动&#xff0c;也是 openKylin 社区长沙首场线下沙龙。长沙地区及其周边的众…

目标分类笔记(二): 利用PaddleClas的框架来完成多标签分类任务(从数据准备到训练测试部署的完整流程)

文章目录 一、演示多分类效果二、PaddleClas介绍三、代码获取四、数据集获取五、环境搭建六、数据格式分析七、模型训练7.1 其他训练指标 八、模型预测九、模型评估十、PaddleClas相关博客 一、演示多分类效果 二、PaddleClas介绍 PaddleClas主要构件&#xff1a; PP-ShiTu&a…

测试与FastAPI应用数据之间的差异

【squids.cn】 全网zui低价RDS&#xff0c;免费的迁移工具DBMotion、数据库备份工具DBTwin、SQL开发工具等 当使用两个不同的异步会话来测试FastAPI应用程序与数据库的连接时&#xff0c;可能会出现以下错误&#xff1a; 在测试中&#xff0c;在数据库中创建了一个对象&#x…

指针笔试题讲解-----让指针简单易懂(2)

目录 回顾上篇重点 &#xff1a; 一.笔试题 ( 1 ) 二.笔试题 ( 2 ) 科普进制知识点 (1) 二进制 (2) 八进制 (3)十六进制 三.笔试题&#xff08; 3 &#xff09; 四.笔试题&#xff08; 4 &#xff09; 五.笔试题&#xff08; 5 &#xff09; 六.笔试题&#xff08; …

Word中的图片保存后变模糊怎么解决

目录 1.介绍 2.原因 3.解决方案 Word是由微软公司开发的一款文字处理软件&#xff0c;它是Microsoft Office套件的一部分。Word提供了丰富的功能和工具&#xff0c;使用户能够创建、编辑和格式化文档。它支持各种文本处理任务&#xff0c;包括编写信函、报告、论文、简历等。…

C# Onnx Yolov8 Detect Poker 扑克牌识别

效果 项目 代码 using Microsoft.ML.OnnxRuntime; using Microsoft.ML.OnnxRuntime.Tensors; using OpenCvSharp; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using System.Linq; using System…

OVS-DPDK学习

安装教程&#xff1a; https://docs.openvswitch.org/en/latest/intro/install/dpdk/ https://docs.openvswitch.org/en/latest/howto/dpdk/ overview和应用 https://www.intel.com/content/www/us/en/developer/articles/technical/open-vswitch-with-dpdk-overview.html OVS…