Flink流批一体计算(18):PyFlink DataStream API之计算和Sink

news2025/1/23 13:05:38

目录

1. 在上节数据流上执行转换操作,或者使用 sink 将数据写入外部系统。

2. File Sink

File Sink

Format Types 

Row-encoded Formats 

Bulk-encoded Formats 

桶分配

滚动策略

3. 如何输出结果

集合数据到客户端,execute_and_collect方法将收集数据到客户端内存

将结果发送到DataStream sink connector

将结果发送到Table & SQL sink connector

4. 执行 PyFlink DataStream API 作业。


1. 在上节数据流上执行转换操作,或者使用 sink 将数据写入外部系统。

本教程使用 FileSink 将结果数据写入文件中。

def split(line):
    yield from line.split()

# compute word count
ds = ds.flat_map(split) \
    .map(lambda i: (i, 1), output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
    .key_by(lambda i: i[0]) \
    .reduce(lambda i, j: (i[0], i[1] + j[1]))

ds.sink_to(
    sink=FileSink.for_row_format(
        base_path=output_path,
        encoder=Encoder.simple_string_encoder())
    .with_output_file_config(
        OutputFileConfig.builder()
        .with_part_prefix("prefix")
        .with_part_suffix(".ext")
        .build())
    .with_rolling_policy(RollingPolicy.default_rolling_policy())
    .build()
)

sink_to函数,将DataStream数据发送到自定义sink connector,仅支持FileSink,可用于batch和streaming执行模式。

2. File Sink

Streaming File Sink是Flink1.7中推出的新特性,是为了解决如下的问题:

大数据业务场景中,经常有一种场景:外部数据发送到kafka中,flink作为中间件消费kafka数据并进行业务处理;处理完成之后的数据可能还需要写入到数据库或者文件系统中,比如写入hdfs中。

Streaming File Sink就可以用来将分区文件写入到支持 Flink FileSystem 接口的文件系统中,支持Exactly-Once语义。这种sink实现的Exactly-Once都是基于Flink checkpoint来实现的两阶段提交模式来保证的,主要应用在实时数仓、topic拆分、基于小时分析处理等场景下。

Streaming File Sink 是社区优化后添加的connector,推荐使用。

Streaming File Sink更灵活,功能更强大,可以自己实现序列化方法

Streaming File Sink有两个方法可以输出到文件:行编码格式forRowFormat 和  块编码格式forBulkFormat。

forRowFormat 比较简单,只提供了SimpleStringEncoder写文本文件,可以指定编码。

由于流数据本身是无界的,所以,流数据将数据写入到分桶(bucket)中。默认使用基于系统时间(yyyy-MM-dd--HH)的分桶策略。在分桶中,又根据滚动策略,将输出拆分为 part 文件。

Flink 提供了两个分桶策略,分桶策略实现了

org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner 接口:

BasePathBucketAssigner,不分桶,所有文件写到根目录;

DateTimeBucketAssigner,基于系统时间(yyyy-MM-dd--HH)分桶。

除此之外,还可以实现BucketAssigner接口,自定义分桶策略。

Flink 提供了两个滚动策略,滚动策略实现了

org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy 接口:

DefaultRollingPolicy 当超过最大桶大小(默认为 128 MB),或超过了滚动周期(默认为 60 秒),或未写入数据处于不活跃状态超时(默认为 60 秒)的时候,滚动文件;

OnCheckpointRollingPolicy 当 checkpoint 的时候,滚动文件。

File Sink

File Sink 将传入的数据写入存储桶中。考虑到输入流可以是无界的,每个桶中的数据被组织成有限大小的 Part 文件。 完全可以配置为基于时间的方式往桶中写入数据,比如可以设置每个小时的数据写入一个新桶中。这意味着桶中将包含一个小时间隔内接收到的记录。

桶目录中的数据被拆分成多个 Part 文件。对于相应的接收数据的桶的 Sink 的每个 Subtask,每个桶将至少包含一个 Part 文件。将根据配置的滚动策略来创建其他 Part 文件。 对于 Row-encoded Formats默认的策略是根据 Part 文件大小进行滚动,需要指定文件打开状态最长时间的超时以及文件关闭后的非活动状态的超时时间。 对于 Bulk-encoded Formats 在每次创建 Checkpoint 时进行滚动,并且用户也可以添加基于大小或者时间等的其他条件。

重要:  STREAMING 模式下使用 FileSink 需要开启 Checkpoint 功能。 文件只在 Checkpoint 成功时生成。如果没有开启 Checkpoint 功能,文件将永远停留在 in-progress 或者 pending 的状态,并且下游系统将不能安全读取该文件数据。

Format Types 

FileSink 不仅支持 Row-encoded 也支持 Bulk-encoded,例如 Apache Parquet 这两种格式可以通过如下的静态方法进行构造:

  • Row-encoded sink: FileSink.forRowFormat(basePath, rowEncoder)
  • Bulk-encoded sink: FileSink.forBulkFormat(basePath, bulkWriterFactory)

不论创建 Row-encoded Format 或者 Bulk-encoded Format Sink 时,都必须指定桶的路径以及对数据进行编码的逻辑。

Row-encoded Formats 

Row-encoded Format 需要指定一个 Encoder,在输出数据到文件过程中被用来将单个行数据序列化为 Outputstream

除了 bucket assignerRowFormatBuilder 还允许用户指定以下属性:

  • Custom RollingPolicy :自定义滚动策略覆盖 DefaultRollingPolicy
  • bucketCheckInterval (默认值 = 1 min) :基于滚动策略设置的检查时间间隔
data_stream = ...
sink = FileSink \
    .for_row_format(OUTPUT_PATH, Encoder.simple_string_encoder("UTF-8")) \
    .with_rolling_policy(RollingPolicy.default_rolling_policy(
        part_size=1024 ** 3, rollover_interval=15 * 60 * 1000, inactivity_interval=5 * 60 * 1000)) \
    .build()
data_stream.sink_to(sink)

这个例子中创建了一个简单的 Sink,默认的将记录分配给小时桶。 例子中还指定了滚动策略,当满足以下三个条件的任何一个时都会将 In-progress 状态文件进行滚动:

  • 包含了至少15分钟的数据量
  • 从没接收延时5分钟之外的新纪录
  • 文件大小已经达到 1GB(写入最后一条记录之后)

Bulk-encoded Formats 

Bulk-encoded Sink 的创建和 Row-encoded 的相似,但不需要指定 Encoder,而是需要指定 BulkWriter.Factory BulkWriter 定义了如何添加和刷新新数据以及如何最终确定一批记录使用哪种编码字符集的逻辑。

Flink 内置了5 BulkWriter 工厂类:

  • ParquetWriterFactory
  • AvroWriterFactory
  • SequenceFileWriterFactory
  • CompressWriterFactory
  • OrcBulkWriterFactory

重要 Bulk-encoded Format 仅支持一种继承了 CheckpointRollingPolicy 类的滚动策略。 在每个 Checkpoint 都会滚动。另外也可以根据大小或处理时间进行滚动。

桶分配

桶的逻辑定义了如何将数据分配到基本输出目录内的子目录中。

Row-encoded Format Bulk-encoded Format使用了 DateTimeBucketAssigner 作为默认的分配器。 默认的分配器 DateTimeBucketAssigner 会基于使用了格式为 yyyy-MM-dd--HH 的系统默认时区来创建小时桶。日期格式(  桶大小)和时区都可以手动配置。

还可以在格式化构造器中通过调用 .withBucketAssigner(assigner) 方法指定自定义的 BucketAssigner

Flink 内置了两种 BucketAssigners

  • DateTimeBucketAssigner :默认的基于时间的分配器
  • BasePathBucketAssigner :分配所有文件存储在基础路径上(单个全局桶)

PyFlink 只支持 DateTimeBucketAssigner  BasePathBucketAssigner 

滚动策略

RollingPolicy 定义了何时关闭给定的 In-progress Part 文件,并将其转换为 Pending 状态,然后在转换为 Finished 状态。 Finished 状态的文件,可供查看并且可以保证数据的有效性,在出现故障时不会恢复。  STREAMING 模式下,滚动策略结合 Checkpoint 间隔(到下一个 Checkpoint 成功时,文件的 Pending 状态才转换为 Finished 状态)共同控制 Part 文件对下游 readers 是否可见以及这些文件的大小和数量。在 BATCH 模式下,Part 文件在 Job 最后对下游才变得可见,滚动策略只控制最大的 Part 文件大小。

Flink 内置了两种 RollingPolicies

  • DefaultRollingPolicy
  • OnCheckpointRollingPolicy

PyFlink 只支持 DefaultRollingPolicy  OnCheckpointRollingPolicy 

3. 如何输出结果

Print

ds.print()

Collect results to client

集合数据到客户端,execute_and_collect方法将收集数据到客户端内存

with ds.execute_and_collect() as results:

    for result in results:

        print(result)

将结果发送到DataStream sink connector

add_sink函数,将DataStream数据发送到sink connector,此函数仅支持FlinkKafkaProducer, JdbcSink和StreamingFileSink,仅在streaming执行模式下使用

from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors import FlinkKafkaProducer
from pyflink.common.serialization import JsonRowSerializationSchema

serialization_schema = JsonRowSerializationSchema.builder().with_type_info(
    type_info=Types.ROW([Types.INT(), Types.STRING()])).build()

kafka_producer = FlinkKafkaProducer(
    topic='test_sink_topic',
    serialization_schema=serialization_schema,
    producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})

ds.add_sink(kafka_producer)

sink_to函数,将DataStream数据发送到自定义sink connector,仅支持FileSink,可用于batch和streaming执行模式

from pyflink.datastream.connectors import FileSink, OutputFileConfig
from pyflink.common.serialization import Encoder

output_path = '/opt/output/'
file_sink = FileSink \
    .for_row_format(output_path, Encoder.simple_string_encoder()) \  .with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \
    .build()
ds.sink_to(file_sink)

将结果发送到Table & SQL sink connector

Table & SQL connectors也被用于写入DataStream. 首先将DataStream转为Table,然后写入到 Table & SQL sink connector.

from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
# option 1:the result type of ds is Types.ROW
def split(s):
    splits = s[1].split("|")
    for sp in splits:
        yield Row(s[0], sp)

ds = ds.map(lambda i: (i[0] + 1, i[1])) \
       .flat_map(split, Types.ROW([Types.INT(), Types.STRING()])) \
       .key_by(lambda i: i[1]) \
       .reduce(lambda i, j: Row(i[0] + j[0], i[1]))

# option 1:the result type of ds is Types.TUPLE
def split(s):
    splits = s[1].split("|")
    for sp in splits:
        yield s[0], sp

ds = ds.map(lambda i: (i[0] + 1, i[1])) \
       .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
       .key_by(lambda i: i[1]) \
       .reduce(lambda i, j: (i[0] + j[0], i[1]))

# emit ds to print sink
t_env.execute_sql("""
        CREATE TABLE my_sink (
          a INT,
          b VARCHAR
        ) WITH (
          'connector' = 'print'
        )
    """)

table = t_env.from_data_stream(ds)
table_result = table.execute_insert("my_sink")

4. 执行 PyFlink DataStream API 作业。

PyFlink applications 是懒加载的,并且只有在完全构建之后才会提交给集群上执行。

要执行一个应用程序,你只需简单地调用 env.execute()。

env.execute()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/929764.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python爬虫分布式架构问题汇总

在使用Python爬虫分布式架构中可能出现以下的问题,我们针对这些问题,列出相应解决方案: 1、任务重复执行 在分布式环境下,多个爬虫节点同时从消息队列中获取任务,可能导致任务重复执行的问题。 解决方案:…

十三、pikachu之暴力破解

文章目录 1、暴力破解概述2、基于表单的暴力破解3、验证码的绕过3.1 验证码的认证流程3.2 验证码绕过(on client)3.3 验证码绕过(on server)3.4 token防爆破? 1、暴力破解概述 “暴力破解”是一攻击具手段,…

L1-035 情人节(Python实现) 测试点全过

题目 以上是朋友圈中一奇葩贴:“2月14情人节了,我决定造福大家。第2个赞和第14个赞的,我介绍你俩认识…………咱三吃饭…你俩请…”。现给出此贴下点赞的朋友名单,请你找出那两位要请客的倒霉蛋。 输入格式 输入按照点赞的先后顺…

Python数据分析 | 各种图表对比总结

本期将带领大家一起对在数据可视化的过程中常用的一些图表进行下总结: 条形图 【适用场景】 适用场合是二维数据集(每个数据点包括两个值x和y),但只有一个维度需要比较,用于显示一段时间内的数据变化或显示各项之间的…

thinkphp6 入门(1)--安装、路由规则、多应用模式

一、安装thinkphp6 具体参考官方文档 安装 ThinkPHP6.0完全开发手册 看云 下面仅列举重要步骤 ThinkPHP6.0的环境要求如下: PHP > 7.2.5 1. 安装Composer 2. 安装稳定版thinkphp 如果你是第一次安装的话,在命令行下面,切换到你的WE…

“R语言+遥感“水环境综合评价方法

详情点击链接:"R语言遥感"水环境综合评价方法 一:R语言 1.1 R语言特点(R语言) 1.2 安装R(R语言) 1.3 安装RStudio(R语言) (1)下载地址 &…

语言模型(language model)

文章目录 引言1. 什么是语言模型2. 语言模型的主要用途2.1 言模型-语音识别2.2 语言模型-手写识别2.3 语言模型-输入法 3. 语言模型的分类4. N-gram语言模型4.1 N-gram语言模型-平滑方法4.2 ngram代码4.3 语言模型的评价指标4.4 两类语言模型的对比 5. 神经网络语言模型6. 语言…

开发一款AR导览导航小程序多少钱?ar地图微信小程序 ar导航 源码

随着科技的不断发展,增强现实(AR)技术在不同领域展现出了巨大的潜力。AR导览小程序作为其中的一种应用形式,为用户提供了全新的观赏和学习体验。然而,开发一款高质量的AR导览小程序需要投入大量的时间、人力和技术资源…

C语言 数字在升序数组中出现的次数

目录 1.题目描述 2.题目分析 2.1遍历数组方法 2.2二分查找方法 2.3代码示例 数字在升序数组中出现的次数 这道题可以用遍历数组和二分查找来处理 1.题目描述 2.题目分析 题目中有一个关键信息,非降序数组,我们可以使用if语句来处理这个问题 if(…

记录一次Modbus通信的置位错误

老套路,一图胜千言,框图可能有点随意,后面我会解释 先描述下背景,在Modbus线程内有一个死循环,一直在读8个线圈的状态,该线程内读到的消息会直接发送给UI线程,UI线程会解析Modbus数据帧&#xf…

Hightopo 使用心得(6)- 3D场景环境配置(天空球,雾化,辉光,景深)

在前一篇文章《Hightopo 使用心得(5)- 动画的实现》中,我们将一个直升机模型放到了3D场景中。同时,还利用动画实现了让该直升机围绕山体巡逻。在这篇文章中,我们将对上一篇的场景进行一些环境上的丰富与美化。让场景更…

【排序】快速排序——为什么这个排序最快?

以从小到大的顺序进行说明。 定义 快排是Hoare在1962年(彼时的中国,是三年困难时期,好好学习建设祖国!)提出的基于二叉树结构的排序。 为什么说是基于二叉树? 因为这种排序每次选出一个基准值,…

35、下载、安装 jdk11 记录,Idea中把项目从 jdk8 换 jdk 11

之前一直用jdk8,现在改成 11的试试看 登录官网下载这个11 https://www.oracle.com/cn/java/technologies/downloads/#java11-windows 下载jdk的oracle官网 需要自己注册oracle账户 修改环境变量的 JAVA_HOME Path 路径这里原本添加8的时候有了,不…

构建高性能云原生大数据处理平台:融合人工智能优化数据分析流程

文章目录 架构要点优势与应用案例研究:基于云原生大数据平台的智能营销分析未来展望:大数据与人工智能的融合结论 🎈个人主页:程序员 小侯 🎐CSDN新晋作者 🎉欢迎 👍点赞✍评论⭐收藏 ✨收录专栏…

【MySQL系列】MySQL复合查询的学习 _ 多表查询 | 自连接 | 子查询 | 合并查询

「前言」文章内容大致是对MySQL复合查询的学习。 「归属专栏」MySQL 「主页链接」个人主页 「笔者」枫叶先生(fy) 目录 一、基本查询回顾二、多表查询三、自连接四、子查询4.1 单行子查询4.2 多行子查询4.3 多列子查询4.4 在from子句中使用子查询 五、合并查询 一、基本查询回顾…

RabbitMQ的镜像队列

镜像队列 如果 RabbitMQ 集群中只有一个 Broker 节点,那么该节点的失效将导致整体服务的临时性不可用,并且也可能会导致消息的丢失。可以将所有消息都设置为持久化,并且对应队列的durable 属性也设置为 true ,但是这样仍然无法…

ubuntu学习(五)----读取文件以及光标的移动

1、读取文件函数原型介绍 ssize_t read(int fd,void*buf,size_t count) 参数说明: fd: 是文件描述符 buf:为读出数据的缓冲区; count: 为每次读取的字节数(是请求读取的字节数,读上来的数据保存在缓冲区buf中,同时文…

jmeter递增压测线程组配置

jmeter递增压测线程组配置 新建线程组线程组参数详解及填写其他指标设置 新建线程组 操作位置如图: 线程组参数详解及填写 其他指标设置 其他指标设置可参考另一篇文章: 链接: jmeter 在linux服务器中执行性能测试、监听服务器资源指标

单例模式的相关知识

饿汉模式 package Thread; class Singleton{private static Singleton instance new Singleton();public static Singleton getInstance(){return instance;}private Singleton(){} }public class demo1 {public static void main(String[] args) {Singleton S1 Singleton.ge…

Sentinel dashboard无法查询到应用的限流配置问题以及解决

一。问题引入 使用sentinle-dashboard控制台 项目整体升级后,发现控制台上无法看到流控规则了 之前的问题是无法注册上来 现在是注册上来了。结果看不到流控规则配置了。 关于注册不上来的问题,可以看另一篇文章 https://blog.csdn.net/a15835774652/…