使用 Apache Flink 开发实时 ETL

news2024/12/24 11:42:08

Apache Flink 是大数据领域又一新兴框架。它与 Spark 的不同之处在于,它是使用流式处理来模拟批量处理的,因此能够提供亚秒级的、符合 Exactly-once 语义的实时处理能力。Flink 的使用场景之一是构建实时的数据通道,在不同的存储之间搬运和转换数据。本文将介绍如何使用 Flink 开发实时 ETL 程序,并介绍 Flink 是如何保证其 Exactly-once 语义的。

示例程序

让我们来编写一个从 Kafka 抽取数据到 HDFS 的程序。数据源是一组事件日志,其中包含了事件发生的时间,以时间戳的方式存储。我们需要将这些日志按事件时间分别存放到不同的目录中,即按日分桶。时间日志示例如下:

{"timestamp":1545184226.432,"event":"page_view","uuid":"ac0e50bf-944c-4e2f-bbf5-a34b22718e0c"}
{"timestamp":1545184602.640,"event":"adv_click","uuid":"9b220808-2193-44d1-a0e9-09b9743dec55"}
{"timestamp":1545184608.969,"event":"thumbs_up","uuid":"b44c3137-4c91-4f36-96fb-80f56561c914"}

产生的目录结构为:

/user/flink/event_log/dt=20181219/part-0-1
/user/flink/event_log/dt=20181220/part-1-9

创建 Flink 项目

官方提供了快速生成 Flink 项目的模板,可以直接运行下面命令,这里我使用的是 flink 1.9 版本

 $ mvn archetype:generate                              \
      -DarchetypeGroupId=org.apache.flink              \
      -DarchetypeArtifactId=flink-quickstart-scala     \
      -DarchetypeVersion=1.9.0

将生成好的代码导入到 IDE 中,可以看到名为 StreamingJob 的文件,我们由此开始编写程序。

Kafka 数据源

Flink 对 Kafka 数据源提供了 原生支持,我们需要选择正确的 Kafka 依赖版本,将其添加到 POM 文件中:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
FlinkKafkaConsumer010 consumer = new FlinkKafkaConsumer010 < > (
    "flink_test", new SimpleStringSchema(), props);
DataStream stream = env.addSource(consumer);

Flink 会连接本地的 Kafka 服务,读取 flink_test 主题中的数据,转换成字符串后返回。除了 SimpleStringSchema,Flink 还提供了其他内置的反序列化方式,如 JSON、Avro 等,我们也可以编写自定义逻辑。

流式文件存储

StreamingFileSink 替代了先前的 BucketingSink,用来将上游数据存储到 HDFS 的不同目录中。它的核心逻辑是分桶,默认的分桶方式是 DateTimeBucketAssigner,即按照处理时间分桶。处理时间指的是消息到达 Flink 程序的时间,这点并不符合我们的需求。因此,我们需要自己编写代码将事件时间从消息体中解析出来,按规则生成分桶的名称:

public class EventTimeBucketAssigner implements BucketAssigner < String, String > {@
    Override
    public String getBucketId(String element, Context context) {
        JsonNode node = mapper.readTree(element);
        long date = (long)(node.path("timestamp").floatValue() * 1000);
        String partitionValue = new SimpleDateFormat("yyyyMMdd").format(new Date(date));
        return "dt=" + partitionValue;
    }
}

上述代码会使用 Jackson 库对消息体进行解析,将时间戳转换成日期字符串,添加前缀后返回。如此一来,StreamingFileSink 就能知道应该将当前记录放置到哪个目录中了。完整代码可以参考 GitHub(链接)。

StreamingFileSink sink = StreamingFileSink
    .forRowFormat(new Path("/tmp/kafka-loader"), new SimpleStringEncoder())
    .withBucketAssigner(new EventTimeBucketAssigner())
    .build();
stream.addSink(sink);

forRowFormat 表示输出的文件是按行存储的,对应的有 forBulkFormat,可以将输出结果用 Parquet 等格式进行压缩存储。

关于 StreamingFileSink 还有一点要注意,它只支持 Hadoop 2.7 以上的版本,因为需要用到高版本文件系统提供的 truncate 方法来实现故障恢复,这点下文会详述。

开启检查点

代码编写到这里,其实已经可以通过 env.execute() 来运行了。但是,它只能保证 At-least-once 语义,即消息有可能会被重复处理。要做到 Exactly-once,我们还需要开启 Flink 的检查点功能:

env.enableCheckpointing(60 _000);
env.setStateBackend((StateBackend) new FsStateBackend("/tmp/flink/checkpoints"));
env.getCheckpointConfig().enableExternalizedCheckpoints(
    ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);

检查点(Checkpoint)是 Flink 的故障恢复机制,同样会在下文详述。代码中,我们将状态存储方式由 MemoryStateBackend 修改为了 FsStateBackend,即使用外部文件系统,如 HDFS,来保存应用程序的中间状态,这样当 Flink JobManager 宕机时,也可以恢复过来。Flink 还支持 RocksDBStateBackend,用来存放较大的中间状态,并能支持增量的状态更新。

提交与管理脚本

Flink 程序可以直接在 IDE 中调试。我们也可以搭建一个本地的 Flink 集群,并通过 Flink CLI 命令行工具来提交脚本:

bin/flink run -c com.shzhangji.flinksandbox.kafka.KafkaLoader target/flink-sandbox-0.1.0.jar

脚本的运行状态可以在 Flink 仪表盘中查看:

使用暂存点来停止和恢复脚本

当需要暂停脚本、或对程序逻辑进行修改时,我们需要用到 Flink 的暂存点机制(Savepoint)。暂存点和检查点类似,同样保存的是 Flink 各个算子的状态数据(Operator State)。不同的是,暂存点主要用于人为的脚本更替,而检查点则主要由 Flink 控制,用来实现故障恢复。flink cancel -s 命令可以在停止脚本的同时创建一个暂存点:

$ bin/flink cancel -s /tmp/flink/savepoints 1253cc85e5c702dbe963dd7d8d279038
Cancelled job 1253cc85e5c702dbe963dd7d8d279038. Savepoint stored in file:/tmp/flink/savepoints/savepoint-1253cc-0df030f4f2ee.

具体到我们的 ETL 示例程序,暂存点中保存了当前 Kafka 队列的消费位置、正在写入的文件名等。当需要从暂存点恢复执行时,可以使用 flink run -s 传入目录位置。Flink 会从指定偏移量读取消息队列,并处理好中间结果文件,确保没有缺失或重复的数据。

flink run -s /tmp/flink/savepoints/savepoint-1253cc-0df030f4f2ee -c com.shzhangji.flinksandbox.kafka.KafkaLoader target/flink-sandbox-0.1.0.jar

在 YARN 上运行

要将脚本提交到 YARN 集群上运行,同样是使用 flink run 命令。首先将代码中指定文件目录的部分添加上 HDFS 前缀,如 hdfs://localhost:9000/,重新打包后执行下列命令:

$ export HADOOP_CONF_DIR=/path/to/hadoop/conf
$ bin/flink run -m yarn-cluster -c com.shzhangji.flinksandbox.kafka.KafkaLoader target/flink-sandbox-0.1.0.jar
Submitted application application_1545534487726_0001

Flink 仪表盘会在 YARN Application Master 中运行,我们可以通过 ResourceManager 界面进入。返回的应用 ID 可以用来管理脚本,添加 -yid 参数即可:

bin/flink cancel -s hdfs://localhost:9000/tmp/flink/savepoints -yid application_1545534487726_0001 84de00a5e193f26c937f72a9dc97f386

Flink 如何保证 Exactly-once 语义

Flink 实时处理程序可以分为三个部分,数据源、处理流程、以及输出。不同的数据源和输出提供了不同的语义保证,Flink 统称为 连接器。处理流程则能提供 Exactly-once 或 At-least-once 语义,需要看检查点是否开启。

实时处理与检查点

Flink 的检查点机制是基于 Chandy-Lamport 算法的:Flink 会定时在数据流中安插轻量的标记信息(Barrier),将消息流切割成一组组记录;当某个算子处理完一组记录后,就将当前状态保存为一个检查点,提交给 JobManager,该组的标记信息也会传递给下游;当末端的算子(通常是 Sink)处理完这组记录并提交检查点后,这个检查点将被标记为“已完成”;当脚本出现问题时,就会从最后一个“已完成”的检查点开始重放记录。

如果算子有多个上游,Flink 会使用一种称为“消息对齐”的机制:如果某个上游出现延迟,当前算子会停止从其它上游消费消息,直到延迟的上游赶上进度,这样就保证了算子中的状态不会包含下一批次的记录。显然,这种方式会引入额外的延迟,因此除了这种 EXACTLY_ONCE 模式,我们也可将检查点配置为 AT_LEAST_ONCE,以获得更高的吞吐量。具体方式请参考 官方文档。

可重放的数据源

当出错的脚本需要从上一个检查点恢复时,Flink 必须对数据进行重放,这就要求数据源支持这一功能。Kafka 是目前使用得较多的消息队列,且支持从特定位点进行消费。具体来说,FlinkKafkaConsumer 类实现了 CheckpointedFunction 接口,会在检查点中存放主题名、分区名、以及偏移量:

abstract class FlinkKafkaConsumerBase implements CheckpointedFunction {
    public void initializeState(FunctionInitializationContext context) {
        OperatorStateStore stateStore = context.getOperatorStateStore();
        this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor < > (
            OFFSETS_STATE_NAME,
            TypeInformation.of(new TypeHint < Tuple2 < KafkaTopicPartition, Long >> () {})));
        if (context.isRestored()) {
            for (Tuple2 < KafkaTopicPartition, Long > kafkaOffset: unionOffsetStates.get()) {
                restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
            }
        }
    }
    public void snapshotState(FunctionSnapshotContext context) {
        unionOffsetStates.clear();
        for (Map.Entry < KafkaTopicPartition, Long > kafkaTopicPartitionLongEntry: currentOffsets.entrySet()) {
            unionOffsetStates.add(Tuple2.of(kafkaTopicPartitionLongEntry.getKey(),
                kafkaTopicPartitionLongEntry.getValue()));
        }
    }
}

当数据源算子从检查点或暂存点恢复时,我们可以在 TaskManager 的日志中看到以下信息,表明当前消费的偏移量是从算子状态中恢复出来的:

2018-12-23 10:56:47,380 INFO FlinkKafkaConsumerBase
Consumer subtask 0 will start reading 2 partitions with offsets in restored state:
{KafkaTopicPartition{topic='flink_test', partition=1}=725,
KafkaTopicPartition{topic='flink_test', partition=0}=721}

恢复写入中的文件

程序运行过程中,StreamingFileSink 首先会将结果写入中间文件,以 . 开头、in-progress 结尾。这些中间文件会在符合一定条件后更名为正式文件,取决于用户配置的 RollingPolicy,默认策略是基于时间(60 秒)和基于大小(128 MB)。当脚本出错或重启时,中间文件会被直接关闭;在恢复时,由于检查点中保存了中间文件名和成功写入的长度,程序会重新打开这些文件,切割到指定长度(Truncate),然后继续写入。这样一来,文件中就不会包含检查点之后的记录了,从而实现 Exactly-once。

以 Hadoop 文件系统举例,恢复的过程是在 HadoopRecoverableFsDataOutputStream 类的构造函数中进行的。它会接收一个 HadoopFsRecoverable 类型的结构,里面包含了中间文件的路径和长度。这个对象是 BucketState 的成员,会被保存在检查点中。

HadoopRecoverableFsDataOutputStream(FileSystem fs, HadoopFsRecoverable recoverable) {
    this.tempFile = checkNotNull(recoverable.tempFile());
    truncate(fs, tempFile, recoverable.offset());
    out = fs.append(tempFile);
}

结论

Apache Flink 构建在实时处理之上,从设计之初就充分考虑了中间状态的保存,而且能够很好地与现有 Hadoop 生态环境结合,因而在大数据领域非常有竞争力。它还在高速发展之中,近期也引入了 Table API、流式 SQL、机器学习等功能,像阿里巴巴这样的公司也在大量使用和贡献代码。Flink 的应用场景众多,有很大的发展潜力,值得一试。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/544950.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

<组件封装:Vue + elementUi 通过excel文件实现 “ 批量导入 ” 表单数据,生成对应新增信息 >

Vue elementUi 通过excel文件实现 “ 批量导入 ” 表单数据&#xff0c;生成对应新增信息 &#x1f449; 前言&#x1f449; 一、封装组件对应API及绑定事件> Attributes> Event &#x1f449; 二、实现案例> HTML父组件模板> 子组件模板 &#x1f449; 三、效果演…

线程相关基础知识

一、相关概念 1.1 cpu 中央处理器&#xff08;central processing unit, 简称cpu &#xff09;&#xff0c;计算机系统的 运算 和 控制 核心 1.2 cpu核心数和线程数 cpu核心数指cpu 内核数量&#xff0c;如双核、四核、八核。 cpu线程数是一种逻辑的概念&#xff0c;就是模…

基于 SpringBoot + Redis 实现分布式锁

大家好&#xff0c;我是余数&#xff0c;这两天温习了下分布式锁&#xff0c;然后就顺便整理了这篇文章出来。文末附有源码链接&#xff0c;需要的朋友可以自取。 至于什么是分布式锁&#xff0c;这里不做赘述&#xff0c;不了解的可以自行去查阅资料。 文章目录 实现要点项目…

android13 FLAG_BLUR_BEHIND 壁纸高斯模糊,毛玻璃背景方案设计-千里马framework实战

hi,粉丝朋友们! 今天有个学员朋友&#xff0c;问到了一个高斯模糊相关问题&#xff0c;这个高斯模糊相关的需求我相对还是比较熟悉&#xff0c;下面来重点讲解一下新版本高斯模糊相关的实现。 更多framework干货知识手把手教学 Log.i("qq群"&#xff0c;“422901085…

[230528] 托福阅读真题|TPO66 13/30|整卷得分22/30|9:45~10:45|15:40~16:40

The Actor and the Audience P1 rehearsev 排练&#xff1b;排演anticipate v 预期&#xff1b;预料&#xff1b;预见 audiencen 观众brilliantadj 灿烂的&#xff1b;绝妙的rehearsaln 排练&#xff1b;预演&#xff1b;排演crumblev 崩塌stage frightn 怯场&#xff08;演员…

自动化测试框架?这应该是全网最全自动化框架总结了,你要的都有...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 Python自动化测试&…

学术加油站|基于LSM-tree存储系统的内存管理,最大限度降低I/O成本

本文系北京理工大学科研助理牛颂登所著&#xff0c;本篇也是 OceanBase 学术系列稿件第 10 篇。欢迎访问 OceanBase 官网获取更多信息&#xff1a;https://www.oceanbase.com/ 「牛颂登&#xff1a;北京理工大学科研助理&#xff0c;硕士期间在电子科技大学网络空间安全研究院从…

资深老鸟总结,Selenium自动化测试实战小技巧,不要再走弯路了...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 Selenium4自动化测…

数据库小技能:数据报表

文章目录 I 需求1.1 补贴II 实现思路2.1 生成资金调节报表数据III Dto3.1 报表基本查询IV 接口I 需求 代理商调节活动汇总商户调节活动汇总激励金日月汇总数据源:活动流水表(上游回调) 1.1 补贴 调节活动补贴= D0补贴+T1补贴。(比如交易金额满足1000,转T1) 补贴金额 =…

图扑数字孪生智慧灯杆,“多杆合一”降本增效

前言 随着智慧城市建设的不断深入&#xff0c;智慧灯杆作为城市基础设施的重要组成部分&#xff0c;正在成为城市智能化和绿色化的重要手段之一。 效果展示 图扑智慧灯杆系统在城市道路照明领域引入信息化手段&#xff0c;通过构建路灯物联网&#xff0c;实现了现代化的路灯按…

线性代数 --- Gram-Schmidt, 格拉姆-施密特正交化(下)

Gram-Schmidt正交化过程 到目前为止&#xff0c;我们都是在反复强调“对于无解的方程组Axb而言&#xff0c;如果矩阵A是标准正交矩阵的话&#xff0c;就怎么怎么好了。。。。”。因为&#xff0c;不论是求投影还是计算最小二乘的正规方程&#xff0c;他们都包含了。当A为标准正…

yolov4论文解读

数据层面上的数据增强 四张照片拼接成一张进行训练 相当于增大了batch-size&#xff0c;更适合于单GPU。 Mosaic data augmentation 马赛克数据增强 self-adversarial training(SAT) 自我对抗训练 DropBlock Label Smoothing 损失函数 由IOU改进到CIOU 网络结构 CSPNet&…

Win10 WLAN驱动正常但仍然不显示无线网络解决办法

Win10 WLAN驱动正常但仍然不显示无线网络解决办法 写作背景过程解决方案结尾 写作背景 本菜鸡重置了电脑的网络&#xff0c;然后重新启动后 WLAN 不见了&#xff0c;连不了 WIFI 了&#xff0c;很疑惑&#xff0c;后来经过一番搜索找到了问题所在&#xff0c;写下本篇文章以记…

Spark/Flink广播实现作业配置动态更新

前言 在实时计算作业中&#xff0c;往往需要动态改变一些配置&#xff0c;举几个栗子&#xff1a; 实时日志ETL服务&#xff0c;需要在日志的格式、字段发生变化时保证正常解析&#xff1b;实时NLP服务&#xff0c;需要及时识别新添加的领域词与停用词&#xff1b;实时风控服…

访问学者J1签证面签的七个问题

作为访问学者&#xff0c;申请J1签证面签时可能会遇到一些常见问题。下面知识人网小编将介绍七个访问学者面签可能遇到的问题&#xff0c;并提供相应的答案。 问题一&#xff1a;您将在美国进行何种类型的学术研究&#xff1f; 答案&#xff1a;我将在美国从事学术研究&#x…

普冉PY32L020单片机简介,主频最高48MHZ

PY32L020单片机是一颗32 位 ARM Cortex-M0内核&#xff0c;宽电压工作范围的 MCU。这颗MCU的价格跟八位单片机相差不大&#xff0c;性价比可以说是非常的高了。来看看PY32L020的配置吧。 PY32L020单片机产品特性&#xff1a; 内核&#xff1a; — 32 位 ARM Cortex - M0 — 最…

飞浆AI studio人工智能课程学习(2)-Prompt优化思路|十个技巧高效优化Prompt|迭代法|Trick法|通用法|工具辅助

文章目录 优化思路上节课的例子问题分析思路解析 Prompt优化技巧Prompt优化原理 十个技巧高效优化Prompt迭代法Trick法工具法通用技巧│定基础通用技巧│做强调需求强调怎么做&#xff1f; 通用技巧│提预设Trick法│戴高帽原理 Trick法│说好话以基础计算为例: Trick法│给提示…

小红书数据分析:如何用ChatGPT输出爆文笔记

ChatGPT的热度依旧不减&#xff0c;随着技术升级&#xff0c;越来越多更高级的玩法被发掘。今天我们就来聊聊&#xff0c;如何用ChatGPT写出小红书风格的文章。 首先&#xff0c;小红书笔记制作分为两个步骤&#xff1a; 1、找选题 2、写小红书风格的笔记 我们用例子说话&a…

全国自考本科通过率仅7%,为什么还有这么多人报考?

根据教育部官网公布的《2021年全国教育事业发展统计公报》得知&#xff1a;2021年&#xff0c;全国高等教育自学考试学历教育报考625.78万人次&#xff0c;取得毕业证书48.94万人。也就是说2021年我国自考平均通过率大概在7%左右。 自考通过率为什么这么低&#xff1f; ①自考…

Android平台外部编码数据(H264/H265/AAC/PCMA/PCMU)实时预览播放技术实现

开发背景 好多开发者可能疑惑&#xff0c;外部数据实时预览播放&#xff0c;到底有什么用&#xff1f; 是的&#xff0c;一般场景是用不到的&#xff0c;我们在开发这块前几年已经开发了非常稳定的RTMP、RTSP直播播放模块&#xff0c;不过也遇到这样的场景&#xff0c;部分设…