流批一体计算引擎-6-[Flink]的Python DataStream API程序

news2025/1/12 16:11:13

参考官方Python API文档

1 IDEA中运行Flink

从Flink 1.11版本开始, PyFlink 作业支持在 Windows 系统上运行,因此您也可以在 Windows 上开发和调试 PyFlink 作业了。

1.1 环境配置

pip3 install apache-flink==1.15.3
CMD>set PATH查看环境变量
CMD>set JAVA_HOME查看环境变量
JAVA_HOME=D:\Java\jdk

1.2 Python API

官方Python API文档

根据需要的抽象级别的不同,有两种不同的API可以在PyFlink中使用:
(1)PyFlink Table API允许你使用类似于SQL或者在Python中处理表格数据的方式编写强大的关系查询。
(2)PyFlink DataStream API允许你对Flink的核心组件state和time进行细粒度的控制,以便构建更复杂的流处理应用。

从现有的 StreamExecutionEnvironment 创建 StreamTableEnvironment,以与 DataStream API 进行互操作。

1.3 配置Flink Kafka连接

(1)在https://mvnrepository.com/里输入flink sql kafka寻找对应版本的连接器
在这里插入图片描述
(2)选择Flink对应的版本1.15.3,点击jar
在这里插入图片描述
在这里插入图片描述
(3)将该jar包放置在python的lib目录下
External Libraries->site-packages->pyflink->lib

2 PyFlink DataStream API

Flink中的数据流程序是对数据流执行转换的常规程序(例如,过滤、更新状态、定义窗口、聚合)。数据流最初是从各种源(例如消息队列、套接字流、文件)创建的。结果通过接收器返回,接收器可以将数据写入文件或标准输出(例如命令行终端)。

2.1 Python DataStream API程序的基本结构

2.2 第一步:创建StreamExecutionEnvironment

StreamExecutionEnvironment是DataStream API程序的核心概念。以下代码示例显示了如何创建StreamExecutionEnvironment:

from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()

2.3 第二步:创建DataStream

DataStream API从专用的DataStream类获取其名称,该类用于表示Flink程序中的数据集合。您可以将它们视为可以包含重复数据的不可变数据集合。这些数据可以是有限的,也可以是无界的,用于处理它们的API是相同的。

DataStream在使用方面与常规Python集合相似,但在某些关键方面有很大不同。它们是不可变的,这意味着一旦创建了它们,就不能添加或删除元素。您还可以不仅仅简单地检查内部的元素,还可以使用DataStream API操作(也称为转换)处理它们。

您可以通过在Flink程序中添加source来创建初始DataStream。然后,您可以从中派生出新的streams,并通过使用诸如map、filter等API方法来组合它们。

2.3.1 通过列表类型的对象创建

# -*- coding:utf-8 -*-
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

ds = env.from_collection(
    collection=[(1, 'aaa|bb'), (2, 'bb|a'), (3, 'aaa|a')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))
ds.print()

env.execute("tutorial_job")

在这里插入图片描述
参数type_info是可选的,如果未指定,则返回的DataStream的输出类型将为Types.PICKLED_BYTE_ARRAY()。

2.3.2 通过DataStream连接器创建

方式一:使用DataStream连接器的add_source方法创建数据流,如下所示:

# -*- coding:utf-8 -*-
from pyflink.common import JsonRowDeserializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer

env = StreamExecutionEnvironment.get_execution_environment()
# 这里使用kafka的sql连接器,因为它是一个fat jar,可以避免依赖性问题
env.add_jars("flink-sql-connector-kafka-1.15.3.jar")

deserialization_schema = JsonRowDeserializationSchema.builder() \
    .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build()

kafka_consumer = FlinkKafkaConsumer(
    topics='test_source_topic',
    deserialization_schema=deserialization_schema,
    properties={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})

ds = env.add_source(kafka_consumer)
ds.print()
env.execute("tutorial_job")

Note: It currently only supports FlinkKafkaConsumer to be used as DataStream source connectors with method add_source.

Note: The DataStream created using add_source could only be executed in streaming executing mode.

方式二:You could also call the from_source method to create a DataStream using unified DataStream source connectors:

# -*- coding:utf-8 -*-
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import NumberSequenceSource

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(2)
seq_num_source = NumberSequenceSource(1, 10)
ds = env.from_source(
    source=seq_num_source,
    watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
    source_name='seq_num_source',
    type_info=Types.LONG())
ds.print()
env.execute("tutorial_job")

在这里插入图片描述

Note: Currently, it only supports NumberSequenceSource and FileSource as unified DataStream source connectors.

Note: The DataStream created using from_source could be executed in both batch and streaming executing mode.

2.3.3 通过Table & SQL连接器创建

Table & SQL连接器也能用来创建DataStream。首先使用Table & SQL连接器创建Table,然后将Table转化为DataStream。

# -*- coding:utf-8 -*-
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(stream_execution_environment=env)

t_env.execute_sql("""
        CREATE TABLE my_source (
          a INT,
          b VARCHAR
        ) WITH (
          'connector' = 'datagen',
          'number-of-rows' = '10'
        )
    """)

ds = t_env.to_append_stream(
    t_env.from_path('my_source'),
    Types.ROW([Types.INT(), Types.STRING()]))

ds.print()
env.execute("tutorial_job")

Note: The StreamExecutionEnvironment env should be specified when creating the StreamTableEnvironment t_env.

Note: As all the Java Table & SQL connectors could be used in PyFlink Table API, this means that all of them could also be used in PyFlink DataStream API.

2.4 第三步:DataStream转换

运算符将一个或多个DataStream转换为新的DataStream。程序可以将多种转换组合成复杂的DataStream拓扑。
下面的示例显示了一个简单的示例,说明如何使用映射转换将一个DataStream转换为另一个DataStream:

ds = ds.map(lambda a: a + 1)

2.5 第四步:DataStream and Table之间转换

# convert a DataStream to a Table
table = t_env.from_data_stream(ds, 'a, b, c')

# convert a Table to a DataStream
ds = t_env.to_append_stream(table, Types.ROW([Types.INT(), Types.STRING()]))
# or
ds = t_env.to_retract_stream(table, Types.ROW([Types.INT(), Types.STRING()]))

2.6 第五步:输出结果

2.6.1 打印结果

您可以调用print方法将DataStream的数据打印到标准输出。

ds.print()

2.6.2 将结果数据收集到客户端

您可以调用execute_and_collect方法将数据流的数据收集到客户端:

with ds.execute_and_collect() as results:
    for result in results:
        print(result)

注意:execute_and_collect方法会将数据流的数据收集到客户端的内存中,因此限制收集的行数是一个很好的做法。

2.6.3 将结果写入到DataStream sink连接器

方式一:您可以调用add_sink方法将数据流的数据发送到数据流接收器连接器:

from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer
from pyflink.datastream.formats.json import JsonRowSerializationSchema

serialization_schema = JsonRowSerializationSchema.builder().with_type_info(
    type_info=Types.ROW([Types.INT(), Types.STRING()])).build()

kafka_producer = FlinkKafkaProducer(
    topic='test_sink_topic',
    serialization_schema=serialization_schema,
    producer_config={'bootstrap.servers': 'localhost:9092', 'group.id': 'test_group'})

ds.add_sink(kafka_producer)

注意:它目前只支持FlinkKafkaProducer和JdbcSink用作方法add_sink的数据流接收器连接器。

注意:add_sink方法只能在流执行模式下使用。

方式二:您还可以调用sink_to方法将数据流的数据发送到统一的数据流接收器连接器:

from pyflink.datastream.connectors.file_system import FileSink, OutputFileConfig
from pyflink.common.serialization import Encoder

output_path = '/opt/output/'
file_sink = FileSink \
    .for_row_format(output_path, Encoder.simple_string_encoder()) \
    .with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \
    .build()
ds.sink_to(file_sink)

注意:它目前只支持FileSink作为统一的数据流接收器连接器。
注意sink_to方法可用于批处理和流式执行模式。

2.6.4 将结果写入到Table & SQL sink连接器

Table & SQL也可用于写入数据流。您需要首先将数据流转换为表,然后将其写入Table & SQL sink连接器。

from pyflink.common import Row
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)

# option 1:the result type of ds is Types.ROW
def split(s):
    splits = s[1].split("|")
    for sp in splits:
        yield Row(s[0], sp)

ds = ds.map(lambda i: (i[0] + 1, i[1])) \
       .flat_map(split, Types.ROW([Types.INT(), Types.STRING()])) \
       .key_by(lambda i: i[1]) \
       .reduce(lambda i, j: Row(i[0] + j[0], i[1]))

# option 1:the result type of ds is Types.TUPLE
def split(s):
    splits = s[1].split("|")
    for sp in splits:
        yield s[0], sp

ds = ds.map(lambda i: (i[0] + 1, i[1])) \
       .flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
       .key_by(lambda i: i[1]) \
       .reduce(lambda i, j: (i[0] + j[0], i[1]))

# emit ds to print sink
t_env.execute_sql("""
        CREATE TABLE my_sink (
          a INT,
          b VARCHAR
        ) WITH (
          'connector' = 'print'
        )
    """)

table = t_env.from_data_stream(ds)
table_result = table.execute_insert("my_sink")

2.7 第六步:提交作业

Finally, you should call the StreamExecutionEnvironment.execute method to submit the DataStream API job for execution:

env.execute()
If you convert the DataStream to a Table and then write it to a Table API & SQL sink connector, it may happen that you need to submit the job using TableEnvironment.execute method.

t_env.execute()

3 DataStream API 示例代码

从非空集合中读取数据,并将结果写入本地文件系统。

from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink

def tutorial():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    ds = env.from_collection(
        collection=[(1, 'aaa'), (2, 'bbb')],
        type_info=Types.ROW([Types.INT(), Types.STRING()]))
    ds.add_sink(StreamingFileSink
                .for_row_format('output', Encoder.simple_string_encoder())
                .build())
    env.execute("tutorial_job")

if __name__ == '__main__':
    tutorial()

(1)DataStream API应用程序首先需要声明一个执行环境
StreamExecutionEnvironment,这是流式程序执行的上下文。
后续将通过它来设置作业的属性(例如默认并发度、重启策略等)、创建源、并最终触发作业的执行。

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

(2)声明数据源
一旦创建了 StreamExecutionEnvironment 之后,可以使用它来声明数据源。
数据源从外部系统(如Apache Kafka、Rabbit MQ 或 Apache Pulsar)拉取数据到Flink作业里。
为了简单起见,本次使用元素集合作为数据源。
这里从相同类型数据集合中创建数据流(一个带有 INT 和 STRING 类型字段的ROW类型)。

ds = env.from_collection(
    collection=[(1, 'aaa'), (2, 'bbb')],
    type_info=Types.ROW([Types.INT(), Types.STRING()]))

(3)转换操作或写入外部系统
现在可以在这个数据流上执行转换操作,或者使用 sink 将数据写入外部系统。
本次使用StreamingFileSink将数据写入output文件目录中。

ds.add_sink(StreamingFileSink
   .for_row_format('output', Encoder.simple_string_encoder())
   .build())

(4)执行作业
最后一步是执行真实的 PyFlink DataStream API作业。
PyFlink applications是懒加载的,并且只有在完全构建之后才会提交给集群上执行。
要执行一个应用程序,只需简单地调用env.execute(job_name)。

env.execute("tutorial_job")

在这里插入图片描述

4 DataStream转换

三种方式支持用户自定义函数。

4.1 Lambda函数

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)

data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())

mapped_stream.print()
env.execute("tutorial_job")

4.2 python函数

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment

def my_map_func(value):
   return value + 1

def main():
   env = StreamExecutionEnvironment.get_execution_environment()
   env.set_parallelism(1)

   data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
   mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())

   mapped_stream.print()
   env.execute("tutorial_job")

if __name__ == '__main__':
   main()

4.3 接口函数

from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment, MapFunction


class MyMapFunction(MapFunction):

   def map(self, value):
      return value + 1


def main():
   env = StreamExecutionEnvironment.get_execution_environment()
   env.set_parallelism(1)

   data_stream = env.from_collection([1, 2, 3, 41, 5], type_info=Types.INT())
   mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())

   mapped_stream.print()
   env.execute("tutorial_job")

if __name__ == '__main__':
   main()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/179176.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

对JDBC驱动注册--DriverManager.registerDriver和Class.forName(driverClass)的理解

对JDBC驱动注册–DriverManager.registerDriver和Class.forName(driverClass)的理解 JDBC提供了独立于数据库的统一API,MySQL、Oracle等数据库公司都可以基于这个标准接口来进行开发。包括java.sql包下的Driver,Connection,Statement&#x…

注解方式管理Bean

1.注解方式创建对象IOC 导入依赖 aop Component(父注解) 放在类上,用于标记,告诉spring当前类需要由容器实例化bean并放入容器中 该注解有三个子注解 Controller 用于实例化controller层bean Service 用于实例化service层bean Repository 用于实例化持久层bean 当不确定是哪一…

【刷题大本营】二叉树进阶oj题(动图讲解,附代码及题目链接)

🔥🔥 欢迎来到小林的博客!!       🛰️博客主页:✈️小林爱敲代码       🛰️欢迎关注:👍点赞🙌收藏✍️留言       这篇文章给大家带来一…

RK3399平台开发系列讲解(文件系统篇)文件回写过程介绍

🚀返回专栏总目录 文章目录 一、编程接口二、回写过程2.1、周期回写2.2、强制回写2.3、系统调用sync沉淀、分享、成长,让自己和他人都能有所收获!😄 📢进程写文件时,内核的文件系统模块把数据写到文件的页缓存,没有立即写回到存储设备。文件系统模块会定期把脏页(即…

[JavaEE]线程池

专栏简介: JavaEE从入门到进阶 题目来源: leetcode,牛客,剑指offer. 创作目标: 记录学习JavaEE学习历程 希望在提升自己的同时,帮助他人,,与大家一起共同进步,互相成长. 学历代表过去,能力代表现在,学习能力代表未来! 目录: 1. 线程池是什么? 2. 线程池的实现原理 3. 标准…

Eureka集群构建步骤

目录 一、Eureka集群原理说明 二、EurekaServer集群环境构建步骤 三、将支付服务8001微服务发布到上面2台Eureka集群配置中 四、将订单服务80微服务发布到上面2台Eureka集群配置中 五、测试01 六、支付服务提供者8001集群环境构建 七、负载均衡 八、测试02 一、Eureka集…

论文投稿指南——中文核心期刊推荐(建筑科学)

【前言】 🚀 想发论文怎么办?手把手教你论文如何投稿!那么,首先要搞懂投稿目标——论文期刊 🎄 在期刊论文的分布中,存在一种普遍现象:即对于某一特定的学科或专业来说,少数期刊所含…

前同事居然因为 Pycharm 的这个功能,即使离职三年也依然经常被请去喝茶~

大家好,我是 哈士奇 ,一位工作了十年的"技术混子", 致力于为开发者赋能的UP主, 目前正在运营着 TFS_CLUB社区。 💬 人生格言:优于别人,并不高贵,真正的高贵应该是优于过去的自己。💬 &#x1f4e…

教你一键生成形如Springboot的高大上banner打印效果

背景 今天闲来无聊,想搞一个类似于Springboot项目启动时打印效果,如下图: 问题解决方案 那这个打印效果怎么实现的呢? 其实,对于这个中效果实现起来也是很简单的。毕竟依托于Springboot强大的框架,任何问…

网狐大联盟非联盟成员无法创建房间解决-暂时不可创建当前游戏,请选择其他游戏!

"暂时不可创建当前游戏,请选择其他游戏!" 问题所有lua文件定位:

恶意代码分析实战 16 Shellcode分析

16.1 Lab19-01 将程序载入IDA。 一堆ecx自增的操作。到200是正常的代码段。 shellcode的解码器也是从这里开始的,一开始的xor用于清空ecx,之后将18dh赋给cx,jmp来到loc_21f,而在下图可以看到loc_21调用sub_208,在call指令执行后&#xff0…

40.Isaac教程--3D 物体姿态优化

3D 物体姿态优化 ISAAC教程合集地址: https://blog.csdn.net/kunhe0512/category_12163211.html 3D 物体姿态优化在操作等应用中起着至关重要的作用,在这些应用中,检测到的物体的位置会影响机器人的整体性能。 Isaac SDK 中的 3D 对象姿势优化应用程序提…

7. 好客租房-项目日常推进ing

7. 好客租房-项目日常推进ing 本章节不涉及大量内容,主要是为了推荐项目代码日常进度而设置, 包括添加mock接口, 添加更新房源接口, 为系统添加缓存. 7.1 为前端系统提供mock服务 往往在项目开发中, 为实现前后端并行开发,后端需要对前端所有的请求都都进行支持。…

2022年度总结——2022我在CSDN的那些事暨2023我的目标展望:Pursue freedom Realize self-worth

📋前言 关于年度征文: 活动地址:2022年度征文活动页-CSDN 📚文章目录 📋前言 🎯再见2022,2023新年快乐 🎯回顾2022——“我”与我在CSDN的那些事 🧩伊始——CSDN&…

Allegro如何做镂空丝印操作指导

Allegro如何做镂空丝印操作指导 在PCB设计丝印的时候,会需要画镂空的丝印,Allegro升级到了172版本的时候,可以画镂空的丝印,如下图 具体操作如下 选择Shape Add Rect命令Options选择需要画到的层面,比如Silkscreen TOP层

Lesson1:走进C++的殿堂

首先在此声明一下,C的学习需要C语言的基础,不先学习C语言而直接学C是不现实的。市面上任何一本C的书籍,前几章的内容一定涉及到C语言的学习。 一、什么是C 照片上的这位老人便是C语言之父——本贾尼斯特劳斯特卢普(Bjarne Stroust…

JavaScript学习

JavaScript 是一门跨平台、面向对象的脚本语言,而Java语言也是跨平台的、面向对象的语言,只不过Java是编译语言,是需要编译成字节码文件才能运行的;JavaScript是脚本语言,不需要编译,由浏览器直接解析并执行…

Spring核心模块解析—BeanDifinition。

BeanDifinition前言什么是BeanDefinition?为什么要有BeanDefinition?BeanDifinition重点源码总结前言 Spring中的BeanDifinition在Bean的实例化流程中占有着非常重要的角色,如果你不了解BeanDifinition的话,面试或者学习Bean的生…

【Leetcode每日一题】69. x 的平方根/Sqrt(x)|二分查找---day3

博主简介:努力学习的预备程序媛一枚~博主主页: 是瑶瑶子啦所属专栏: LeetCode每日一题–进击大厂 目录题目描述题目分析:代码分析:题目描述 链接: 69. x 的平方根/Sqrt(x) 给你一个非负整数 x ,计算并返回 x 的 算术…

10+种编程语言做个计算器

用十种编程语言开发计算器应用 C语言C#(windows桌面软件)Swift (ios应用)pythonDart(Flutter应用,跨平台,适用安卓、ios、mac、windows、web)Java(安卓App)K…