2.2 如何使用FlinkSQL读取写入到文件系统(HDFS\Local\Hive)

news2025/1/22 21:48:23

目录

1、文件系统 SQL 连接器

2、如何指定文件系统类型

3、如何指定文件格式

4、读取文件系统

4.1 开启 目录监控 

4.2 可用的 Metadata

5、写出文件系统

5.1 创建分区表

5.2 滚动策略、文件合并、分区提交

5.3 指定 Sink Parallelism

6、示例_通过FlinkSQL读取kafka在写入hive表

6.1、创建 kafka source表用于读取kafka

6.2、创建 hdfs sink表用于写出到hdfs

6.3、insert into 写入到 hdfs_sink_table

6.4、查询 hdfs_sink_table

6.5、创建hive表,指定local


1、文件系统 SQL 连接器

文件系统连接器允许从本地分布式文件系统进行读写数据

官网链接:文件系统 SQL 连接器


2、如何指定文件系统类型

创建表时通过 'path' = '协议名称:///path' 来指定 文件系统类型

参考官网:文件系统类型

CREATE TABLE filesystem_table (
  id INT,
  name STRING,
  ds STRING
) partitioned by (ds) WITH (
  'connector' = 'filesystem',
  -- 本地文件系统
  'path' = 'file:///URI',
  -- HDFS文件系统
  'path' = 'hdfs://URI',
  -- 阿里云对象存储 
  'path' = 'oss://URI',
  'format' = 'json'
);

3、如何指定文件格式

FlinkSQL 文件系统连接器支持多种format,来读取和写入文件

比如当读取的source格式为 csv、json、Parquet... 可以在建表是指定相应的格式类型

来对数据进行解析后映射到表中的字段中

CREATE TABLE filesystem_table_file_format (
  id INT,
  name STRING,
  ds STRING
) partitioned by (ds) WITH (
  'connector' = 'filesystem',
  -- 指定文件格式类型
  'format' = 'json|csv|orc|raw'
);

4、读取文件系统

FlinkSQL可以将单个文件或整个目录的数据读取到单个表中

注意:

        1、当读取目录时,对目录中的文件进行 无序的读取

        2、默认情况下,读取文件时为批处理模式,只会扫描配置路径一遍后就会停止

             当开启目录监控(source.monitor-interval)时,才是流处理模式

4.1 开启 目录监控 

通过设置 source.monitor-interval 属性来开启目录监控,以便在新文件出现时继续扫描

注意:

        只会对指定目录内新增文件进行读取,不会读取更新后的旧文件

-- 目录监控
drop table filesystem_source_table;
CREATE TABLE filesystem_source_table (
  id INT,
  name STRING,
  `file.name` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1016',
  'format' = 'json',
  'source.monitor-interval' = '3' -- 开启目录监控,设置监控时间间隔
);

-- 持续读取
select * from filesystem_source_table;

4.2 可用的 Metadata

使用FLinkSQL读取文件系统中的数据时,支持对 metadata 进行读取

注意: 所有 metadata 都是只读的

-- 可用的Metadata
drop table filesystem_source_table_read_metadata;
CREATE TABLE filesystem_source_table_read_metadata (
  id INT,
  name STRING,
  `file.path` STRING NOT NULL METADATA,
  `file.name` STRING NOT NULL METADATA,
  `file.size` BIGINT NOT NULL METADATA,
  `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012',
  'format' = 'json'
);

select * from filesystem_source_table_read_metadata;

运行结果:


5、写出文件系统

5.1 创建分区表

FlinkSQL支持创建分区表,并且通过 insert into(追加) insert overwrite(覆盖) 写入数据

-- 创建分区表
drop table filesystem_source_table_partition;
CREATE TABLE filesystem_source_table_partition (
  id INT,
  name STRING,
  ds STRING
) partitioned by (ds) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012',
  'partition.default-name' = 'default_partition',
  'format' = 'json'
);

-- 动态分区写入
insert into filesystem_source_table_partition
SELECT * FROM (VALUES
  (1,'a','20231010')
, (2,'b','20231010')
, (3,'c','20231011')
, (4,'d','20231011')
, (5,'e','20231012')
, (6,'f','20231012')
) AS user1 (id,name,ds);

-- 静态分区写入
insert into filesystem_source_table_partition partition(ds = '20231010')
SELECT * FROM (VALUES
  (1,'a')
, (2,'b')
, (3,'c')
, (4,'d')
, (5,'e')
, (6,'f')
) AS user1 (id,name);

-- 查询分区表数据
select * from filesystem_source_table_partition where ds = '20231010';

5.2 滚动策略、文件合并、分区提交

可以看之前的博客:flink写入文件时分桶策略

官网链接:官网分桶策略


5.3 指定 Sink Parallelism

当使用FlinkSQL写出到文件系统时,可以通过 sink.parallelism 设置sink算子的并行度

注意:当且仅当上游的 changelog 模式为 INSERT-ONLY 时,才支持配置 sink parallelism。否则,程序将会抛出异常

CREATE TABLE hdfs_sink_table (
  `log` STRING,
  `dt` STRING,  -- 分区字段,天
  `hour` STRING  -- 分区字段,小时
) partitioned by (dt,`hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka',
  'sink.parallelism' = '2', -- 指定sink算子并行度
  'format' = 'raw'
);

6、示例_通过FlinkSQL读取kafka在写入hive表

需求:

        使用FlinkSQL将kafka数据写入到hdfs指定目录中

        根据kafka的timestamp进行分区(按小时分区)

6.1、创建 kafka source表用于读取kafka

-- TODO 创建读取kafka表时,同时读取kafka元数据字段
drop table kafka_source_table;
CREATE TABLE kafka_source_table(
  `log` STRING,
  `timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' -- 消息的时间戳
) WITH (
  'connector' = 'kafka',
  'topic' = '20231017',
  'properties.bootstrap.servers' = 'worker01:9092',
  'properties.group.id' = 'FlinkConsumer',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'raw'
);

6.2、创建 hdfs sink表用于写出到hdfs

drop table hdfs_sink_table;
CREATE TABLE hdfs_sink_table (
  `log` STRING,
  `dt` STRING,  -- 分区字段,天
  `hour` STRING  -- 分区字段,小时
) partitioned by (dt,`hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka',
  'sink.parallelism' = '2', -- 指定sink算子并行度
  'format' = 'raw'
);

6.3、insert into 写入到 hdfs_sink_table

-- 流式 sql,插入文件系统表
insert into hdfs_sink_table
select 
	log
	,DATE_FORMAT(`timestamp`,'yyyyMMdd') as dt
	,DATE_FORMAT(`timestamp`,'HH') as `hour`
from kafka_source_table;

6.4、查询 hdfs_sink_table

-- 批式 sql,使用分区修剪进行选择
select * from hdfs_sink_table;

6.5、创建hive表,指定local

create table `kafka_to_hive` (
`log` string comment '日志数据')
 comment '埋点日志数据' PARTITIONED BY (dt string,`hour` string) 
row format delimited fields terminated by '\t' lines terminated by '\n' stored as orc
LOCATION 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka';

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1102136.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

力扣:133. 克隆图(Python3)

题目: 给你无向连通图中一个节点的引用,请你返回该图的深拷贝(克隆)。 图中的每个节点都包含它的值 val(int) 和其邻居的列表(list[Node])。 class Node {public int val;public Lis…

Excel提高工作效率常用功能

定位快捷键使用 CtrlG或者F5 根据不同类别插入空行 例:以下表,以部门为单位,每个部门后插入空白行 部门姓名出勤基本工资岗位津贴公体加班绩效基数工龄应发合计财务部姓名73115002101710财务部姓名11116006003401502363财务部姓名5271000…

求解八皇后问题

一、实验目的 利用回溯法搜索或爬山法找到八皇后问题的一个可行解。 二、实验内容 有一个 8 8 的棋盘,现在要将8个皇后放到棋盘上,满足:对于每一个皇后,在 自己所在的行、列、两个对角线都没有其他皇后。求所有满足的摆放方式…

大模型技术实践(五)|支持千亿参数模型训练的分布式并行框架

在上一期的大模型技术实践中,我们介绍了增加式方法、选择式方法和重新参数化式方法三种主流的参数高效微调技术(PEFT)。微调模型可以让模型更适合于我们当前的下游任务,但当模型过大或数据集规模很大时,单个加速器&…

基于springboot实现酒店管理系统平台项目【项目源码+论文说明】

摘要 21世纪的今天,随着社会的不断发展与进步,人们对于信息科学化的认识,已由低层次向高层次发展,由原来的感性认识向理性认识提高,管理工作的重要性已逐渐被人们所认识,科学化的管理,使信息存…

Flow深入浅出系列之在ViewModels中使用Kotlin Flows

Flow深入浅出系列之在ViewModels中使用Kotlin FlowsFlow深入浅出系列之更聪明的分享 Kotlin FlowsFlow深入浅出系列之使用Kotlin Flow自动刷新Android数据的策略 Flow深入浅出系列之在ViewModels中使用Kotlin Flows Flow出现后,LiveData仍然可以用,并且…

基于springboot实现家具网站设计与实现平台项目【项目源码+论文说明】

摘要 随着移动互联网技术的深入发展,电子商务也不断的完善,线上销售额不断提高,网络消费成为人民日常生活的一部分。并且随着电子商务的发展,也呈现出多元化方向,各种农村电商、生鲜电商、家具电商等,带动…

AI 悄然变天:这家平台为何能俘获众多明星大模型「芳心」?

整个AI领域,GPT-4 发布无疑成为载入 AI 史册的大事件。但其还留下来一些发展空间,其不可能把所有的事情都做完。比如,涉及小数、分数的运算,GPT-4 可能给不出正确答案(其多位乘法运算准确率仅为 4.3%)。 可…

变电站监控无人值守:电力数字化、智能化趋势、技术与应用

随着电力行业的快速发展,变电站监控系统的升级和改造已成为行业的重要议题。其中,实现无人值守的监控模式成为现代变电站运行的关键。 一、变电站监控无人值守的趋势 随着科技的不断进步,电力行业正在逐步实现智能化、自动化的转型。变电…

centos 7.9 源码安装htop

1.下载源码 wget http://sourceforge.net/projects/htop/files/latest/download 2.上传到tmp目录,并解压 tar xvzf htop-1.0.2.tar.gz mv htop-1.0.2 /opt/ 进入到 cd /opt/htop-1.0.2/ 3.编译并安装 ./configure && make && make install 4.…

ESDA in PySal (6):评估空间异方差的局部模式:LOSH

ESDA in PySal (6):评估空间异方差的局部模式:LOSH 在下面的笔记本中,我们回顾了 Ord 和 Getis (2012) 提出的局部空间异方差 (LOSH) 统计量 ( H i H_i Hi​) -y)。 LOSH 旨在作为分析空间过程平均水平的本地统计数据的补充。 LO…

el-checkbox-group变成竖着的样式

加 style"display: block; padding-top: 10px; margin-left: 27px" <el-checkbox:indeterminate"isIndeterminate"v-model"checkAll"change"handleCheckAllChange">全选&#xff08;{{ memberList.length }}&#xff09;</el…

从Github中下载部分文件

我们经常回去Github中下载代码&#xff0c;但仓库中存在很多project代码。但我们如果只需要某一个或几个项目的代码&#xff0c;此时应该如何操作呢&#xff1f; 这里介绍两款工具&#xff0c;可以从仓库中下载部分文件的小工具: DownGit 和 GitZip 1. DownGit downGit 国内镜…

基于springboot实现大学生社团活动平台项目【项目源码+论文说明】

摘要 21世纪的今天&#xff0c;随着社会的不断发展与进步&#xff0c;人们对于信息科学化的认识&#xff0c;已由低层次向高层次发展&#xff0c;由原来的感性认识向理性认识提高&#xff0c;网络管理工作的重要性已逐渐被人们所认识&#xff0c;科学化的管理&#xff0c;使信…

线程池执行流程

源码分析 execute&#xff08;提交&#xff09;方法源码&#xff1a; public void execute(Runnable command) {if (command null)throw new NullPointerException();int c ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c ct…

python打开.npy文件的常见报错及解决

import numpy as npdata np.load("texture_data_256.npy") print(data) 解决办法&#xff1a; import numpy as npdata np.load("texture_data_256.npy",allow_pickleTrue) print(data) 再次运行后出现乱码&#xff01;&#xff01;&#xff01; 由于…

山西电力市场日前价格预测【2023-10-18】

日前价格预测 预测说明&#xff1a; 如上图所示&#xff0c;预测明日&#xff08;2023-10-18&#xff09;山西电力市场全天平均日前电价为348.72元/MWh。其中&#xff0c;最高日前电价为505.50元/MWh&#xff0c;预计出现在18: 00。最低日前电价为288.10元/MWh&#xff0c;预计…

安科瑞能耗监测系统在新集卫生院综合楼、急诊楼的设计与应用

安科瑞 崔丽洁 摘要&#xff1a;针对医院建筑能耗高且能源管理不合理的问题&#xff0c;利用计算机网络技术、通讯技术、计量控制技术等信息化技术&#xff0c;实现能源资源分类分项计量和能源资源运行监管功能&#xff0c;清晰描述建筑内总的用能现状&#xff1b;实时监测各供…

什么是著作权?对此你了解多少?

在当今信息爆炸的时代&#xff0c;著作权成为一个备受关注的话题。创作是人类文明的重要组成部分&#xff0c;而著作权是创作者对自己作品的劳动和智慧的一种保护。很多人还不太了解著作权&#xff0c;那么希望看完此文&#xff0c;你会对它有一个新的认识。 一、著作权的概念 …

入职后快速配置mac方便快速上手业务for研测向

文章目录 下载基本工具配置 mac 基本修改 mac 密码apple id 登录mac 手势操作 配置开发环境homebrewgitjava/goland/pythonmavenrpcadbmysqlredis前端环境 软件配置配置软件通知chrome 配置配置 jetbrains 插件chrome 插件配置 iterm2配置邮箱视频软件配置软件登录 公司相关配置…