36、Flink 的 Formats 之Parquet 和 Orc Format

news2024/11/24 17:08:24

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(2)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上
21、Flink 的table API与DataStream API 集成(1)- 介绍及入门示例、集成说明
21、Flink 的table API与DataStream API 集成(2)- 批处理模式和inser-only流处理
21、Flink 的table API与DataStream API 集成(3)- changelog流处理、管道示例、类型转换和老版本转换示例
21、Flink 的table API与DataStream API 集成(完整版)
22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4
25、Flink 的table api与sql之函数(自定义函数示例)
26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
33、Flink 的Table API 和 SQL 中的时区
35、Flink 的 Formats 之CSV 和 JSON Format
36、Flink 的 Formats 之Parquet 和 Orc Format
41、Flink之Hive 方言介绍及详细示例
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的


文章目录

  • Flink 系列文章
  • 一、Orc Format
    • 1、maven 依赖
    • 2、Flink sql client 建表示例
      • 1)、增加ORC文件解析的类库
      • 2)、生成ORC文件
      • 3)、建表
      • 4)、验证
    • 3、table api建表示例
      • 1)、源码
      • 2)、运行结果
      • 3)、maven依赖
    • 4、Format 参数
    • 5、数据类型映射
  • 二、Parquet Format
    • 1、maven 依赖
    • 2、Flink sql client 建表示例
      • 1)、增加parquet文件解析类库
      • 2)、生成parquet文件
      • 3)、建表
      • 4)、验证
    • 3、table api建表示例
      • 1)、源码
      • 2)、运行结果
      • 3)、maven依赖
    • 4、Format 参数
    • 5、数据类型映射


本文介绍了Flink 支持的数据格式中的ORC和Parquet,并分别以sql和table api作为示例进行了说明。
本文依赖flink、kafka、hadoop(3.1.4版本)集群能正常使用。
本文分为2个部分,即ORC和Parquet Format。
本文的示例是在Flink 1.17版本(flink 集群和maven均是Flink 1.17)中运行。

一、Orc Format

Apache Orc Format 允许读写 ORC 数据。

1、maven 依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-orc</artifactId>
  <version>1.17.1</version>
</dependency>

下面的依赖视情况而定,有些可能会出现guava的冲突,如果出现冲突可能需要把下面的maven依赖。

    	<dependency>
			<groupId>com.google.guava</groupId>
			<artifactId>guava</artifactId>
			<version>32.0.1-jre</version>
		</dependency> 

2、Flink sql client 建表示例

下面是一个用 Filesystem connector 和 Orc format 创建表格的例子

1)、增加ORC文件解析的类库

需要将flink-sql-orc-1.17.1.jar 放在 flink的lib目录下,并重启flink服务。
该文件可以在链接中下载。

2)、生成ORC文件

该步骤需要借助于原hadoop生成的文件,可以参考文章:21、MapReduce读写SequenceFile、MapFile、ORCFile和ParquetFile文件
测试数据文件可以自己准备,不再赘述。
特别需要说明的是ORC文件的SCHEMA 需要和建表的字段名称和类型保持一致。

struct<id:string,type:string,orderID:string,bankCard:string,ctime:string,utime:string>

源码

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.orc.OrcConf;
import org.apache.orc.TypeDescription;
import org.apache.orc.mapred.OrcStruct;
import org.apache.orc.mapreduce.OrcOutputFormat;

/**
 * @author alanchan
 * 读取普通文本文件转换为ORC文件
 */
public class WriteOrcFile extends Configured implements Tool {
	static String in = "D:/workspace/bigdata-component/hadoop/test/in/orc";
	static String out = "D:/workspace/bigdata-component/hadoop/test/out/orc";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		int status = ToolRunner.run(conf, new WriteOrcFile(), args);
		System.exit(status);
	}

	@Override
	public int run(String[] args) throws Exception {
		// 设置Schema
		OrcConf.MAPRED_OUTPUT_SCHEMA.setString(this.getConf(), SCHEMA);

		Job job = Job.getInstance(getConf(), this.getClass().getName());
		job.setJarByClass(this.getClass());

		job.setMapperClass(WriteOrcFileMapper.class);
		job.setMapOutputKeyClass(NullWritable.class);
		job.setMapOutputValueClass(OrcStruct.class);

		job.setNumReduceTasks(0);

		// 配置作业的输入数据路径
		FileInputFormat.addInputPath(job, new Path(in));

		// 设置作业的输出为MapFileOutputFormat
		job.setOutputFormatClass(OrcOutputFormat.class);

		Path outputDir = new Path(out);
		outputDir.getFileSystem(this.getConf()).delete(outputDir, true);
		FileOutputFormat.setOutputPath(job, outputDir);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	// 定义数据的字段信息
//数据格式	
//	id                                  ,type    ,orderID                               ,bankCard,ctime              ,utime
//	2.0191130220014E+27,ALIPAY,191130-461197476510745,356886,,
//	2.01911302200141E+27,ALIPAY,191130-570038354832903,404118,2019/11/30 21:44,2019/12/16 14:24
//	2.01911302200143E+27,ALIPAY,191130-581296620431058,520083,2019/11/30 18:17,2019/12/4 20:26
//	2.0191201220014E+27,ALIPAY,191201-311567320052455,622688,2019/12/1 10:56,2019/12/16 11:54
	private static final String SCHEMA = "struct<id:string,type:string,orderID:string,bankCard:string,ctime:string,utime:string>";

	static class WriteOrcFileMapper extends Mapper<LongWritable, Text, NullWritable, OrcStruct> {
		// 获取字段描述信息
		private TypeDescription schema = TypeDescription.fromString(SCHEMA);
		// 构建输出的Key
		private final NullWritable outputKey = NullWritable.get();
		// 构建输出的Value为ORCStruct类型
		private final OrcStruct outputValue = (OrcStruct) OrcStruct.createValue(schema);

		protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
			// 将读取到的每一行数据进行分割,得到所有字段
			String[] fields = value.toString().split(",", 6);
			// 将所有字段赋值给Value中的列
			outputValue.setFieldValue(0, new Text(fields[0]));
			outputValue.setFieldValue(1, new Text(fields[1]));
			outputValue.setFieldValue(2, new Text(fields[2]));
			outputValue.setFieldValue(3, new Text(fields[3]));
			outputValue.setFieldValue(4, new Text(fields[4]));
			outputValue.setFieldValue(5, new Text(fields[5]));

			context.write(outputKey, outputValue);
		}
	}

}

将生成的文件上传至hdfs://server1:8020/flinktest/orctest/下。

至此,准备环境与数据已经完成。

3)、建表

需要注意的是字段的名称与类型,需要和orc文件的schema保持一致,否则读取不到文件内容。

CREATE TABLE alan_orc_order (
  id STRING,
  type STRING,
  orderID STRING,
  bankCard STRING,
  ctime STRING,
  utime STRING
) WITH (
 'connector' = 'filesystem',
 'path' = 'hdfs://server1:8020/flinktest/orctest/',
 'format' = 'orc'
);

Flink SQL> CREATE TABLE alan_orc_order (
>   id STRING,
>   type STRING,
>   orderID STRING,
>   bankCard STRING,
>   ctime STRING,
>   utime STRING
> ) WITH (
>  'connector' = 'filesystem',
>  'path' = 'hdfs://server1:8020/flinktest/orctest/',
>  'format' = 'orc'
> );
[INFO] Execute statement succeed.

4)、验证

Flink SQL> select * from alan_orc_order limit 10;
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |                             id |                           type |                        orderID |                       bankCard |                          ctime |                          utime |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |            2.0191130220014E+27 |                         ALIPAY |         191130-461197476510745 |                         356886 |                                |                                |
| +I |           2.01911302200141E+27 |                         ALIPAY |         191130-570038354832903 |                         404118 |               2019/11/30 21:44 |               2019/12/16 14:24 |
| +I |           2.01911302200143E+27 |                         ALIPAY |         191130-581296620431058 |                         520083 |               2019/11/30 18:17 |                2019/12/4 20:26 |
| +I |            2.0191201220014E+27 |                         ALIPAY |         191201-311567320052455 |                         622688 |                2019/12/1 10:56 |               2019/12/16 11:54 |
| +I |                    2.01912E+27 |                         ALIPAY |         191201-216073503850515 |                         456418 |               2019/12/11 22:39 |                                |
| +I |                    2.01912E+27 |                         ALIPAY |         191201-072274576332921 |                         433668 |                                |                                |
| +I |                    2.01912E+27 |                         ALIPAY |         191201-088486052970134 |                         622538 |                2019/12/2 23:12 |                                |
| +I |                    2.01912E+27 |                         ALIPAY |         191201-492457166050685 |                         622517 |                 2019/12/1 0:42 |               2019/12/14 13:27 |
| +I |                    2.01912E+27 |                         ALIPAY |         191201-037136794432586 |                         622525 |                                |                                |
| +I |                    2.01912E+27 |                         ALIPAY |         191201-389779784790672 |                         486494 |                2019/12/1 22:25 |               2019/12/16 23:32 |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
Received a total of 10 rows

3、table api建表示例

通过table api建表,参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)

为了简单起见,本示例仅仅是通过sql建表,数据准备见上述示例。

1)、源码

下面是在本地运行的,建表的path也是用本地的。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author alanchan
 *
 */
public class TestORCFormatDemo {
	
	static String sourceSql = "CREATE TABLE alan_orc_order (\r\n" + 
			"  id STRING,\r\n" + 
			"  type STRING,\r\n" + 
			"  orderID STRING,\r\n" + 
			"  bankCard STRING,\r\n" + 
			"  ctime STRING,\r\n" + 
			"  utime STRING\r\n" + 
			") WITH (\r\n" + 
			" 'connector' = 'filesystem',\r\n" + 
			" 'path' = 'D:/workspace/bigdata-component/hadoop/test/out/orc',\r\n" + 
			" 'format' = 'orc'\r\n" + 
			")";

	public static void test1() throws Exception {
		// 1、创建运行环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		// 建表
		tenv.executeSql(sourceSql);

		Table table = tenv.from("alan_orc_order"); 
		table.printSchema();
		tenv.createTemporaryView("alan_orc_order_v", table);
		tenv.executeSql("select * from alan_orc_order_v limit 10").print();;
//		table.execute().print();
		
				
		env.execute();
	}

	public static void main(String[] args) throws Exception {
		test1();
	}

}

2)、运行结果

(
  `id` STRING,
  `type` STRING,
  `orderid` STRING,
  `bankcard` STRING,
  `ctime` STRING,
  `utime` STRING
)

+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |                             id |                           type |                        orderID |                       bankCard |                          ctime |                          utime |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |            2.0191130220014E+27 |                         ALIPAY |         191130-461197476510745 |                         356886 |                                |                                |
| +I |           2.01911302200141E+27 |                         ALIPAY |         191130-570038354832903 |                         404118 |               2019/11/30 21:44 |               2019/12/16 14:24 |
| +I |           2.01911302200143E+27 |                         ALIPAY |         191130-581296620431058 |                         520083 |               2019/11/30 18:17 |                2019/12/4 20:26 |
| +I |            2.0191201220014E+27 |                         ALIPAY |         191201-311567320052455 |                         622688 |                2019/12/1 10:56 |               2019/12/16 11:54 |
| +I |                    2.01912E+27 |                         ALIPAY |         191201-216073503850515 |                         456418 |               2019/12/11 22:39 |                                |
| +I |                    2.01912E+27 |                         ALIPAY |         191201-072274576332921 |                         433668 |                                |                                |
| +I |                    2.01912E+27 |                         ALIPAY |         191201-088486052970134 |                         622538 |                2019/12/2 23:12 |                                |
| +I |                    2.01912E+27 |                         ALIPAY |         191201-492457166050685 |                         622517 |                 2019/12/1 0:42 |               2019/12/14 13:27 |
| +I |                    2.01912E+27 |                         ALIPAY |         191201-037136794432586 |                         622525 |                                |                                |
| +I |                    2.01912E+27 |                         ALIPAY |         191201-389779784790672 |                         486494 |                2019/12/1 22:25 |               2019/12/16 23:32 |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
10 rows in set

3)、maven依赖

	<properties>
		<encoding>UTF-8</encoding>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<java.version>1.8</java.version>
		<scala.version>2.12</scala.version>
		<flink.version>1.17.0</flink.version>
	</properties>

	<dependencies>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-common</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-gateway -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-gateway</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner_2.12</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-uber</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-runtime</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc</artifactId>
			<version>3.1.0-1.17</version>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>5.1.38</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
 		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-hive_2.12</artifactId>
			<version>1.17.0</version>
		</dependency>
    	<dependency>
			<groupId>com.google.guava</groupId>
			<artifactId>guava</artifactId>
			<version>32.0.1-jre</version>
		</dependency> 
		<!-- flink连接器 -->
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-connector-kafka</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-compress</artifactId>
			<version>1.24.0</version>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.2</version>
			<!-- <scope>provided</scope> -->
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-orc</artifactId>
			<version>1.17.1</version>
		</dependency>

		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>3.1.4</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>3.1.4</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>3.1.4</version>
		</dependency>
		<dependency>
		  <groupId>org.apache.flink</groupId>
		  <artifactId>flink-parquet</artifactId>
		  <version>1.17.1</version>
		</dependency>
	</dependencies>

4、Format 参数

在这里插入图片描述

Orc 格式也支持来源于 Table properties 的表属性。
举个例子,你可以设置 orc.compress=SNAPPY 来允许spappy压缩。

5、数据类型映射

Orc 格式类型的映射和 Apache Hive 是兼容的。

下面的表格列出了 Flink 类型的数据和 Orc 类型的数据的映射关系。
在这里插入图片描述

二、Parquet Format

Apache Parquet 格式允许读写 Parquet 数据.

1、maven 依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-parquet</artifactId>
  <version>1.17.1</version>
</dependency>

2、Flink sql client 建表示例

以下为用 Filesystem 连接器和 Parquet 格式创建表的示例

1)、增加parquet文件解析类库

需要将flink-sql-parquet-1.17.1.jar 放在 flink的lib目录下,并重启flink服务。
该文件可以在链接中下载。

2)、生成parquet文件

该步骤需要借助于原hadoop生成的文件,可以参考文章:21、MapReduce读写SequenceFile、MapFile、ORCFile和ParquetFile文件
测试数据文件可以自己准备,不再赘述。

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetOutputFormat;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;
import org.springframework.util.StopWatch;

/**
 * @author alanchan
 *
 */
public class WriteParquetFile extends Configured implements Tool {
	static String in = "D:/workspace/bigdata-component/hadoop/test/in/parquet";
	static String out = "D:/workspace/bigdata-component/hadoop/test/out/parquet";

	public static void main(String[] args) throws Exception {
		StopWatch clock = new StopWatch();
		clock.start(WriteParquetFile.class.getSimpleName());

		Configuration conf = new Configuration();
		int status = ToolRunner.run(conf, new WriteParquetFile(), args);
		System.exit(status);

		clock.stop();
		System.out.println(clock.prettyPrint());
	}

	@Override
	public int run(String[] args) throws Exception {
		Configuration conf = getConf();
		// 此demo 输入数据为2列 city ip
		//输入文件格式:https://www.win.com/233434,8283140
		//							https://www.win.com/242288,8283139
		MessageType schema = Types.buildMessage().required(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("city").required(PrimitiveTypeName.BINARY).as(OriginalType.UTF8)
				.named("ip").named("pair");

		System.out.println("[schema]==" + schema.toString());

		GroupWriteSupport.setSchema(schema, conf);

		Job job = Job.getInstance(conf, this.getClass().getName());
		job.setJarByClass(this.getClass());

		job.setMapperClass(WriteParquetFileMapper.class);
		job.setInputFormatClass(TextInputFormat.class);
		job.setMapOutputKeyClass(NullWritable.class);
		// 设置value是parquet的Group
		job.setMapOutputValueClass(Group.class);
		
		FileInputFormat.setInputPaths(job, in);

		// parquet输出
		job.setOutputFormatClass(ParquetOutputFormat.class);
		ParquetOutputFormat.setWriteSupportClass(job, GroupWriteSupport.class);

		Path outputDir = new Path(out);
		outputDir.getFileSystem(this.getConf()).delete(outputDir, true);
		FileOutputFormat.setOutputPath(job, new Path(out));
        ParquetOutputFormat.setOutputPath(job, new Path(out));
//		ParquetOutputFormat.setCompression(job, CompressionCodecName.SNAPPY);
		job.setNumReduceTasks(0);

		return job.waitForCompletion(true) ? 0 : 1;
	}

	public static class WriteParquetFileMapper extends Mapper<LongWritable, Text, NullWritable, Group> {
		SimpleGroupFactory factory = null;

		protected void setup(Context context) throws IOException, InterruptedException {
			factory = new SimpleGroupFactory(GroupWriteSupport.getSchema(context.getConfiguration()));
		};

		public void map(LongWritable _key, Text ivalue, Context context) throws IOException, InterruptedException {
			Group pair = factory.newGroup();
			//截取输入文件的一行,且是以逗号进行分割
			String[] strs = ivalue.toString().split(",");
			pair.append("city", strs[0]);
			pair.append("ip", strs[1]);
			context.write(null, pair);
		}
	}
}

将生成的文件上传至hdfs://server1:8020/flinktest/parquettest/下。

3)、建表

需要注意的是字段的名称与类型,需要和parquet文件的schema保持一致,否则读取不到文件内容。

  • schema
MessageType schema = Types.buildMessage()
.required(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("city")
.required(PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named("ip")
.named("pair");

// 以下是schema的内容
[schema]==message pair {
  required binary city (UTF8);
  required binary ip (UTF8);
}

  • 建表
CREATE TABLE alan_parquet_cityinfo (
  city STRING,
  ip STRING
) WITH (
 'connector' = 'filesystem',
 'path' = 'hdfs://server1:8020/flinktest/parquettest/',
 'format' = 'parquet'
);

Flink SQL> CREATE TABLE alan_parquet_cityinfo (
>   city STRING,
>   ip STRING
> ) WITH (
>  'connector' = 'filesystem',
>  'path' = 'hdfs://server1:8020/flinktest/parquettest/',
>  'format' = 'parquet'
> );
[INFO] Execute statement succeed.

4)、验证

Flink SQL> select * from alan_parquet_cityinfo limit 10;
+----+--------------------------------+--------------------------------+
| op |                           city |                             ip |
+----+--------------------------------+--------------------------------+
| +I |     https://www.win.com/237516 |                        8284068 |
| +I |     https://www.win.com/242247 |                        8284067 |
| +I |     https://www.win.com/243248 |                        8284066 |
| +I |     https://www.win.com/243288 |                        8284065 |
| +I |     https://www.win.com/240213 |                        8284064 |
| +I |     https://www.win.com/239907 |                        8284063 |
| +I |     https://www.win.com/235270 |                        8284062 |
| +I |     https://www.win.com/234366 |                        8284061 |
| +I |     https://www.win.com/229297 |                        8284060 |
| +I |     https://www.win.com/237757 |                        8284059 |
+----+--------------------------------+--------------------------------+
Received a total of 10 rows

3、table api建表示例

通过table api建表,参考文章:
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
为了简单起见,本示例仅仅是通过sql建表,数据准备见上述示例。

1)、源码

下面是在本地运行的,建表的path也是用本地的。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author alanchan
 *
 */
public class TestParquetFormatDemo {

	static String sourceSql = "CREATE TABLE alan_parquet_cityinfo (\r\n" + 
			"  city STRING,\r\n" + 
			"  ip STRING\r\n" + 
			") WITH (\r\n" + 
			" 'connector' = 'filesystem',\r\n" + 
			" 'path' = 'D:/workspace/bigdata-component/hadoop/test/out/parquet',\r\n" + 
			" 'format' = 'parquet'\r\n" + 
			");";

	public static void test1() throws Exception {
		// 1、创建运行环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		// 建表
		tenv.executeSql(sourceSql);

		Table table = tenv.from("alan_parquet_cityinfo");
		table.printSchema();
		tenv.createTemporaryView("alan_parquet_cityinfo_v", table);
		tenv.executeSql("select * from alan_parquet_cityinfo_v limit 10").print();

//		table.execute().print();

		env.execute();
	}

	public static void main(String[] args) throws Exception {
		test1();
	}

}

2)、运行结果

(
  `city` STRING,
  `ip` STRING
)

+----+--------------------------------+--------------------------------+
| op |                           city |                             ip |
+----+--------------------------------+--------------------------------+
| +I |     https://www.win.com/237516 |                        8284068 |
| +I |     https://www.win.com/242247 |                        8284067 |
| +I |     https://www.win.com/243248 |                        8284066 |
| +I |     https://www.win.com/243288 |                        8284065 |
| +I |     https://www.win.com/240213 |                        8284064 |
| +I |     https://www.win.com/239907 |                        8284063 |
| +I |     https://www.win.com/235270 |                        8284062 |
| +I |     https://www.win.com/234366 |                        8284061 |
| +I |     https://www.win.com/229297 |                        8284060 |
| +I |     https://www.win.com/237757 |                        8284059 |
+----+--------------------------------+--------------------------------+
10 rows in set

3)、maven依赖

	<properties>
		<encoding>UTF-8</encoding>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<java.version>1.8</java.version>
		<scala.version>2.12</scala.version>
		<flink.version>1.17.0</flink.version>
	</properties>

	<dependencies>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-common</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-gateway -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-gateway</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner_2.12</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-api-java-uber -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-uber</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-runtime</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc</artifactId>
			<version>3.1.0-1.17</version>
		</dependency>
		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>5.1.38</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-hive -->
 		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-hive_2.12</artifactId>
			<version>1.17.0</version>
		</dependency>
    	<dependency>
			<groupId>com.google.guava</groupId>
			<artifactId>guava</artifactId>
			<version>32.0.1-jre</version>
		</dependency> 
		<!-- flink连接器 -->
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-connector-kafka</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress -->
		<dependency>
			<groupId>org.apache.commons</groupId>
			<artifactId>commons-compress</artifactId>
			<version>1.24.0</version>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.2</version>
			<!-- <scope>provided</scope> -->
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-orc</artifactId>
			<version>1.17.1</version>
		</dependency>

		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>3.1.4</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>3.1.4</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>3.1.4</version>
		</dependency>
		<dependency>
		  <groupId>org.apache.flink</groupId>
		  <artifactId>flink-parquet</artifactId>
		  <version>1.17.1</version>
		</dependency>
	</dependencies>

4、Format 参数

在这里插入图片描述
Parquet 格式也支持 ParquetOutputFormat 的配置。
例如, 可以配置 parquet.compression=GZIP 来开启 gzip 压缩。

5、数据类型映射

截至Flink 1.17 版本 ,Parquet 格式类型映射与 Apache Hive 兼容,但与 Apache Spark 有所不同:

  • Timestamp:不论精度,映射 timestamp 类型至 int96。
  • Decimal:根据精度,映射 decimal 类型至固定长度字节的数组。

下表列举了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。
在这里插入图片描述
以上,介绍了Flink 支持的数据格式中的ORC和Parquet,并分别以sql和table api作为示例进行了说明。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1217671.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

NewStarCTF2023 Reverse方向Week3 ez_chal WP

分析 题目&#xff1a;ez_chal 一个XTEA加密&#xff0c; V6是key&#xff0c;v5是输入&#xff0c;然后v7就是密文。 看了v6&#xff0c;要用动调。 ELF文件用ida的远程调试。 然后在kali上输入长度为32的flag 全部转换成dd 再提取密文。 EXP #include <stdio.h>…

详解如何使用Jenkins一键打包部署SpringBoot项目

目录 1、Jenkins简介 2、Jenkins的安装及配置 2.1、Docker环境下的安装​编辑 2.2、Jenkins的配置 3、打包部署SpringBoot应用 3.1、在Jenkins中创建执行任务 3.2、测试结果 1、Jenkins简介 任何简单操作的背后&#xff0c;都有一套相当复杂的机制。本文将以SpringBoot应…

Docker安装MinIO遇到的(汇总——持续更新中)

文章目录 Docker安装MinIO遇到的坑前言问题1&#xff1a;执行docker run报错Error response from daemon问题2&#xff1a;启动MinIO容器浏览器无法访问问题3&#xff1a;上传文件报错InvalidResponseException问题4&#xff1a;上传文件报错Connection refused最终的启动指令问…

锐捷练习-ospf虚链路及rip路由相互引入

一、相关知识补充 1、ospf基本概述 OSPF&#xff08;Open Shortest Path First&#xff09;是一种链路状态路由协议&#xff0c;用于在计算机网络中进行路由选择。它是内部网关协议&#xff08;IGP&#xff09;之一&#xff0c;常用于大规模企业网络或互联网服务提供商的网络…

修改服务器端Apache默认根目录

目标&#xff1a;修改默认Apache网站根目录 /var/www/html 一、找到 DocumentRoot “/var/www/html” 这一段 apache的根目录&#xff0c;把/var/www/html 这个目录改 #DocumentRoot "/var/www/html" DocumentRoot "/home/cloud/tuya_mini_h5/build" 二、…

Resolume Arena 7.15.0(VJ音视频软件)

Resolume Arena 7是一款专业的实时视觉效果软件&#xff0c;用于创造引人入胜的视频演出和灯光秀。它提供了丰富多样的功能和工具&#xff0c;可以将音频、视频和图像合成在一起&#xff0c;创造出令人惊叹的视觉效果。 Resolume Arena 7支持多种媒体格式&#xff0c;包括视频文…

C语言的动态内存管理

目录 一、malloc函数 二、free函数 三、calloc函数 四、realloc函数 五、realloc函数原地扩容和异地扩容测试 六、动态内存管理的注意事项 一、malloc函数 1.头文件&#xff1a;stdlib.h&#xff08;malloc.h&#xff09; 2.函数原型&#xff1a;void * malloc(size_t siz…

春秋云境靶场CVE-2021-41402漏洞复现(任意代码执行漏洞)

文章目录 前言一、CVE-2021-41402描述二、CVE-2021-41402漏洞复现1、信息收集1、方法一弱口令bp爆破2、方法二7kb扫路径&#xff0c;后弱口令爆破 2、找可能可以进行任意php代码执行的地方3、漏洞利用找flag 总结 前言 此文章只用于学习和反思巩固渗透测试知识&#xff0c;禁止…

【ROS导航Navigation】五 | 导航相关的消息 | 地图 | 里程计 | 坐标变换 | 定位 | 目标点和路径规划 | 激光雷达 | 相机

致谢&#xff1a;ROS赵虚左老师 Introduction Autolabor-ROS机器人入门课程《ROS理论与实践》零基础教程 参考赵虚左老师的实战教程 一、地图 nav_msgs/MapMetaData 地图元数据&#xff0c;包括地图的宽度、高度、分辨率等。 nav_msgs/OccupancyGrid 地图栅格数据&#…

shell脚本学习06(小滴课堂)

fi是结束循环的意思。 这里脚本1&#xff1a;代表着脚本和1.txt文件处于同一目录下。 脚本2为绝对路径的写法。 在使用./进行启动时&#xff0c;我们需要给文件赋予执行权限。 把文件名改为2.txt: 什么都没有返回&#xff0c;说明文件已经不存在。 可以使用脚本2 if else的方式…

48v变12v同步转换芯片

48v变12v同步转换芯片 以下是一篇关于48V变12V同步转换器WD5105ic的文章正文&#xff1a;48V变12V同步转换器WD5105ic是一种电源管理芯片&#xff0c;它可以将48V的直流电压转换为12V的直流电压。这款芯片具有广泛的应用范围&#xff0c;包括车载充电器件、电动车仪表器件、电…

【LabVIEW学习】2.for,while,事件

1.for实例&#xff08;随机输出数据100次&#xff09; 结果&#xff1a; 2.while实例&#xff08;i<50灯亮&#xff0c;大于之后灯灭&#xff09; 结果&#xff1a;&#xff08;先亮后灭&#xff09; 3.事件结构的实例&#xff08;点击按钮数据增加&#xff09;事件监听应该…

6.运行mysql容器-理解容器数据卷

运行mysql容器-理解容器数据卷 1.什么是容器数据卷2.如何使用容器数据卷2.1 数据卷挂载命令2.2 容器数据卷的继承2.3 数据卷的读写权限2.4 容器数据卷的小实验&#xff08;加深理解&#xff09;2.4.1 启动挂载数据卷的centos容器2.4.2 启动后&#xff0c;在宿主机的data目录下会…

reids管道

如何优化频繁命令往返造成的性能瓶颈&#xff1f; 如果同时需要执行大量的命令&#xff0c;那么就要等待上一条命令应答后再执行&#xff0c;这中间不仅仅多了RTT&#xff08;Round Time Trip&#xff09;&#xff0c;而且还频繁调用系统IO&#xff0c;发送网络请求&#xff0c…

SSM项目初始化流程与操作概念解释-SpringBoot简化版

文章目录 1.引入概念2.导入依赖3.项目配置4.依照SpringMVC框架构建项目 1.引入概念 例如某一个XX系统&#xff0c;该系统存在前台页面&#xff08;给用户直观看或使用&#xff09;&#xff0c;和后台页面&#xff08;给管理人员调整数据和权限&#xff09;。 这二个页面都通过…

QT自定义信号,信号emit,信号参数注册

qt如何自定义信号 使用signals声明返回值是void在需要发送信号的地方使用 emit 信号名字(参数)进行发送 在需要链接的地方使用connect进行链接 ct进行链接

pytorch.nn.Conv1d详解

通读了从论文中找的代码&#xff0c;终于找到这个痛点了&#xff01; 以下详解nn.Conv1d方法 1 参数说明 in_channels(int) – 输入信号的通道。 out_channels(int) – 卷积产生的通道。 kernel_size(int or tuple) - 卷积核的尺寸&#xff0c;经测试后卷积核的大小应为in_cha…

阅读记录【PMLR2023】The Aggregation–Heterogeneity Trade-off in Federated Learning

The Aggregation–Heterogeneity Trade-off in Federated Learning Abstract 机器学习的传统观点认为&#xff0c;训练模型的数据越多&#xff0c;模型的性能就越好。因此&#xff0c;人们开发了多种联邦学习方法来聚合尽可能多的本地样本。与这种观点相反&#xff0c;本文表…

字母不重复的子串-第15届蓝桥第二次STEMA测评Scratch真题精选

[导读]&#xff1a;超平老师的《Scratch蓝桥杯真题解析100讲》已经全部完成&#xff0c;后续会不定期解读蓝桥杯真题&#xff0c;这是Scratch蓝桥杯真题解析第158讲。 第15届蓝桥第2次STEMA测评已于2023年10月29日落下帷幕&#xff0c;编程题一共有6题&#xff0c;分别如下&am…

2023年【化工自动化控制仪表】找解析及化工自动化控制仪表试题及解析

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 化工自动化控制仪表找解析参考答案及化工自动化控制仪表考试试题解析是安全生产模拟考试一点通题库老师及化工自动化控制仪表操作证已考过的学员汇总&#xff0c;相对有效帮助化工自动化控制仪表试题及解析学员顺利通…