Python 编写 Flink 应用程序经验记录(Flink1.17.1)

news2025/1/23 10:39:19

目录

官方API文档

提交作业到集群运行

官方示例

环境

实例处理Kafka后入库到Mysql

下载依赖

读取kafka数据

写入mysql数据


官方API文档

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/overview/

https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/datastream/kafka/

提交作业到集群运行

#! /usr/bin/env python
# -*- coding: utf-8 -*-

# /opt/test_flink.py
if __name__ == "__main__":
    print("这是一个简单的测试用例")

flink 安装目录下的 examples 目录里面已经提供了一些测试案例,我们也可以直接拿它来做实验。

提交至集群

./bin/flink run -py 代码文件

通过 flink run 即可运行应用程序,由于 flink 既可运行 Java 程序、也可以运行 Python 程序,所以这里我们需要指定 -py 参数,表示运行的是 py 文件。但默认情况下解释器使用的 python2,当然如果你终端输入 python 进入的就是 python3 的话则当我没说,要是我们想指定 flink 使用 python3 解释器的话,则需要配置一个环境变量。

export PYFLINK_CLIENT_EXECUTABLE=/usr/bin/python3

下面来测试一下:

./bin/flink run -py /opt/test_flink.py

很明显结果是成功的,当然这里面没有涉及到任何与 Flink 有关的内容,只是演示如何提交一个 Python 应用程序。当然 flink run 是同时支持 Java、Python 等语言的。

不管使用哪种 API 进行编程,最终客户端都会生成 JobGraph 提交到 JM 上。但毕竟 Flink 的内核是采用 Java 语言编写的,如果 Python 应用程序变成 JobGraph 对象被提交到 Flink 集群上运行的话,那么 Python 虚拟机和 Java 虚拟机之间一定有某种方式,使得 Python 可以直接动态访问 Java 中的对象、Java 也可以回调 Python 中的对象。没错,实现这一点的便是 py4j。

提交单个 py 文件知道怎么做了,但如果该文件还导入了其它文件该怎么办呢?一个项目中还会涉及到包的存在。其实不管项目里的文件有多少,启动文件只有一个,只需要把这个启动文件提交上去即可。举例说明,当然这里仍不涉及具体和 Flink 相关的内容,先把如何提交程序这一步给走通。因为不管编写的程序多复杂,提交这一步骤是不会变的。

先来看看编写的程序:

flink_test 就是主目录,里面有一个 apps 子目录和一个 main.py 文件,apps 目录里面有三个 py 文件,对应的内容分别如图所示。然后将其提交到 Flink Standalone 集群上运行,命令和提交单个文件是一样的

即使是多文件,提交方式也是相似的,输出结果表明提交成功了。

官方示例

环境

  • Java 11
  • Python 3.7, 3.8, 3.9 or 3.10
python -m pip install apache-flink==1.17.1

编写 Flink Python Table API 程序的第一步是创建 TableEnvironment。这是 Python Table API 作业的入口类。

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("parallelism.default", "1")

接下来,我们将介绍如何创建源表和结果表。

t_env.create_temporary_table(
    'source',
    TableDescriptor.for_connector('filesystem')
        .schema(Schema.new_builder()
                .column('word', DataTypes.STRING())
                .build())
        .option('path', input_path)
        .format('csv')
        .build())
tab = t_env.from_path('source')

t_env.create_temporary_table(
    'sink',
    TableDescriptor.for_connector('filesystem')
        .schema(Schema.new_builder()
                .column('word', DataTypes.STRING())
                .column('count', DataTypes.BIGINT())
                .build())
        .option('path', output_path)
        .format(FormatDescriptor.for_format('canal-json')
                .build())
        .build())

你也可以使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表:

my_source_ddl = """
    create table source (
        word STRING
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '{}'
    )
""".format(input_path)

my_sink_ddl = """
    create table sink (
        word STRING,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'canal-json',
        'path' = '{}'
    )
""".format(output_path)

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)

上面的程序展示了如何创建及注册表名分别为 source 和 sink 的表。 其中,源表 source 有一列: word,该表代表了从 input_path 所指定的输入文件中读取的单词; 结果表 sink 有两列: word 和 count,该表的结果会输出到 output_path 所指定的输出文件中。

接下来,我们介绍如何创建一个作业:该作业读取表 source 中的数据,进行一些变换,然后将结果写入表 sink

最后,需要做的就是启动 Flink Python Table API 作业。上面所有的操作,比如创建源表 进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当 execute_insert(sink_name) 被调用的时候, 作业才会被真正提交到集群或者本地进行执行。

@udtf(result_types=[DataTypes.STRING()])
def split(line: Row):
    for s in line[0].split():
        yield Row(s)

# 计算 word count
tab.flat_map(split).alias('word') \
    .group_by(col('word')) \
    .select(col('word'), lit(1).count) \
    .execute_insert('sink') \
    .wait()

该教程的完整代码如下:

import argparse
import logging
import sys

from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf

word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    # write all the data to one file
    t_env.get_config().set("parallelism.default", "1")

    # define the source
    if input_path is not None:
        t_env.create_temporary_table(
            'source',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('word', DataTypes.STRING())
                        .build())
                .option('path', input_path)
                .format('csv')
                .build())
        tab = t_env.from_path('source')
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
                                  DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))

    # define the sink
    if output_path is not None:
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('filesystem')
                .schema(Schema.new_builder()
                        .column('word', DataTypes.STRING())
                        .column('count', DataTypes.BIGINT())
                        .build())
                .option('path', output_path)
                .format(FormatDescriptor.for_format('canal-json')
                        .build())
                .build())
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('print')
                .schema(Schema.new_builder()
                        .column('word', DataTypes.STRING())
                        .column('count', DataTypes.BIGINT())
                        .build())
                .build())

    @udtf(result_types=[DataTypes.STRING()])
    def split(line: Row):
        for s in line[0].split():
            yield Row(s)

    # compute word count
    tab.flat_map(split).alias('word') \
        .group_by(col('word')) \
        .select(col('word'), lit(1).count) \
        .execute_insert('sink') \
        .wait()
    # remove .wait if submitting to a remote cluster, refer to
    # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
    # for more details


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input, known_args.output)

接下来,可以在命令行中运行作业(假设作业名为 word_count.py):

python word_count.py

上述命令会构建 Python Table API 程序,并在本地 mini cluster 中运行。如果想将作业提交到远端集群执行, 可以参考作业提交示例。

最后,你可以得到如下运行结果:

实例处理Kafka后入库到Mysql

下载依赖

wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar

读取kafka数据

#! /usr/bin/env python
# -*- coding: utf-8 -*-

import sys
import logging

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import WatermarkStrategy, Types
from pyflink.common.serialization import SimpleStringSchema
from pyflink.datastream.connectors.kafka import KafkaSource, KafkaOffsetsInitializer

from pyflink.common import Row
from pyflink.datastream import FlatMapFunction

def read_kafka():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///D:/安技汇/运营平台/DataManage/flink-sql-connector-kafka-1.17.1.jar")

    source = KafkaSource.builder() \
        .set_bootstrap_servers("172.16.12.128:9092") \
        .set_topics("test") \
        .set_group_id("my-group") \
        .set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        .set_value_only_deserializer(SimpleStringSchema()) \
        .build()
        # 从消费组提交的位点开始消费,不指定位点重置策略
        #.set_starting_offsets(KafkaOffsetsInitializer.committed_offsets()) \
        # 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点
        #.set_starting_offsets(KafkaOffsetsInitializer.committed_offsets(KafkaOffsetResetStrategy.EARLIEST)) \
        # 从时间戳大于等于指定时间戳(毫秒)的数据开始消费
        #.set_starting_offsets(KafkaOffsetsInitializer.timestamp(1657256176000)) \
        # 从最早位点开始消费
        #.set_starting_offsets(KafkaOffsetsInitializer.earliest()) \
        # 从最末尾位点开始消费
        #.set_starting_offsets(KafkaOffsetsInitializer.latest()) \
        #.set_property("partition.discovery.interval.ms", "10000")  # 每 10 秒检查一次新分区
        #.set_property("security.protocol", "SASL_PLAINTEXT") \
        #.set_property("sasl.mechanism", "PLAIN") \
        #.set_property("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";")

    kafka_stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "Kafka Source")
    kafka_stream.print()

    env.execute("Source")


if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    read_kafka()

写入mysql数据

没通,待补充。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1138776.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

计网小题题库整理第一轮(面向期末基础)(2)

该系列第二期,第一期链接在这~ 计网小题题库整理第一轮(面向期末基础)(1)https://blog.csdn.net/jsl123x/article/details/134030486?spm1001.2014.3001.5501 一.选择题 1、Internet的前身是 (C &#x…

Visual Studio远程连接Linux编译代码时,头文件在/usr/include中找不到,文件存在于/usr/include的子目录中

文章目录 1 问题的提出2 问题分析3 问题的解决 1 问题的提出 VS2022在编译数据安全传输平台时,远程连接到Centos上进行编译,但是提示找不到json头文件。 2 问题分析 在Linux系统下编译代码时,系统会主动到/usr/include目录主动搜索头文件。…

解释器模式——化繁为简的翻译机

● 解释器模式介绍 解释器模式(Interpreter Pattern)是一种用的比较少的行为型模式,其提供了一种解释语言的语法或表达的方式,该模式定义了一个表达式接口,通过该接口解释一个特定的上下文。在这么多的设计模式中&…

【神印王座】改编遇瓶颈,伊莱克斯无建模,皓晨加戏被绞杀,喜提五挂

【侵权联系删除】【文/郑尔巴金】 神印王座动画第78集已经更新了,官方实锤不会断更了,这可真的太爽了。龙皓晨在永恒之塔开始接受伊莱克斯的传承,不过剧情方面有点小瑕疵。伊莱克斯如此重要角色,竟然没有建模,龙皓晨更…

FreeRTOS 事件标志组 详解

目录 什么是事件标志组? 事件标志位 事件标志组 事件标志组相关 API 函数 1. 创建事件标志组 2. 设置事件标志位 3. 清除事件标志位 4. 等待事件标志位 事件标志组实操 什么是事件标志组? 事件标志位 表明某个事件是否发生,联想&am…

【JAVA学习笔记】47 - 异常,try-catch处理,throw处理

项目代码 https://github.com/yinhai1114/Java_Learning_Code/tree/main/IDEA_Chapter12/scr/com/yinhai/exception_ 〇、异常处理的引入 程序出现一个小问题如int num1 10;int num2 0;num1 / num2 > 10 / 0 会抛出错误,但这样不算致命的小问题就…

android studio启动Task配置

Android studio 高版本默认不开启Task配置,需要自己手动开启 1.低版本配置路径:(复制他人图片) 2.高版本路径:添加下图勾选配置即可 3.gradle task 3.1 初识task gradle中所有的构建工作都是由task完成的,它帮我们处…

案例精选|聚铭网络多产品联合部署为北京迎祥酒店建立信息安全屏障

北京迎祥酒店位于龙脉之上的北京后花园昌平区,总面积约18666平米,主营餐饮、住宿、汤泉、婚礼四大业务,酒店每一个细节都散发着国潮气息,充满艺术气质,祥瑞的照壁、精工的雕花、厚重的石刻、颇具京韵京味,是…

Python 自定义模块和包实现GUI(图形界面)登录界面

上一篇:Python 自定义模块和包设计英语生词本(文件版)-CSDN博客 紧接上一篇博文,当我们熟练掌握自定义模块和包、掌握文件的的读取与写入、掌握正则表达式内置模块"re"、掌握GUI(图形界面)的部分…

HarmonyOS原生分析能力,即开即用助力精细化运营

数据分析产品对开发者的价值呈现在两个层面,第一个是产品的层面,可以通过数据去洞察用户的行为,从而找到产品的优化点。另外一个就是运营层面,可以基于数据去驱动,来实现私域和公域的精细化运营。 在鸿蒙生态上&#…

Mac用NTFS文件夹读写NTFS硬盘 NTFS能复制多大的文件

Mac作为一款备受欢迎的计算机操作系统,具备了许多令人惊叹的功能和特性。然而,对于一些Mac用户来说,使用NTFS格式的硬盘可能存在一些疑问。他们可能想知道Mac是否能够读写NTFS格式的硬盘,以及NTFS格式的硬盘是否有文件大小的限制。…

067:mapboxGL上传CSV文件,显示图形,导出为Geojson文件

第067个 点击查看专栏目录 本示例的目的是演示如何在vue+mapbox中上传CSV文件,显示图形,导出为Geojson文件。 直接复制下面的 vue+mapbox源代码,操作2分钟即可运行实现效果 文章目录 示例效果使用的csv文件配置方式示例源代码(共140行)安装依赖相关API参考:专栏目标示例…

CentOS 搭建本地 yum 源方式 安装 httpd 服务

CentOS 搭建本地 yum 源方式 安装 httpd 服务 修改 yum 源 挂载光驱 mkdir -p /mnt/cdrom mount /dev/cdrom /mnt/cdromvi /etc/fstab追加以下内容: /dev/cdrom /mnt/cdrom iso9660 defaults 0 0手动修改CentOS-Base.repo 备份 yum 源配置文件 mv /etc/yum.re…

如何用FLStudio水果21中文版创作音乐?(官方基础教程中文版)

本文讲的是FL Studio的界面和基础工作流程。简言之,FL Studio能让你载入乐器和音频采样,通过手动输入音符数据或实时弹奏来演奏它们;录制外部声音(比如用麦克风录音)然后通过调音台来回放整个混音(还能添加…

私有云:架构图

私有云:架构图 1、架构图2、服务器分配及配置3、本地物理机hosts文件配置4、相关软件包5、本地物理机电脑配置参考【内存最好20G往上】 机缘巧合之下突然想玩玩虚拟化,然后就查资料本地自己搭建一套私有云 使用【VMware Workstation】这个虚拟化软件来进…

Linux系统下DHCP服务安装部署和使用实例详解(蜜罐)

目录 一、概述 二、具体配置如下: 一、概述 DHCP :动态主机设置协议(英语:Dynamic Host Configuration Protocol,DHCP)是一个局域网的网络协议,使用UDP协议工作,主要有两个用途&…

状态模式-对象状态及其转换

某信用卡业务系统,银行账户存在3种状态,且在不同状态下存在不同的行为: 1)正常状态(余额大等于0),用户可以存款也可以取款; 2)透支状态(余额小于0且大于-20…

旅游业热潮中的数字化转型,拓世AI数字人直播一体机重新定义酒店服务的未来

国内经济的快速发展使得居民的生活条件逐渐改善,我国居民人均可支配收入持续增多,居民消费能力和消费水平均同步提高。物质生活条件的持续改善使得人们精神层面的需求加速释放,旅游需求迅速增多。人们出游意愿强烈,旅游行业复苏加…

02 # 手写 instanceof 的原理

instanceof 干什么的&#xff1f; instanceof 运算符用于检测构造函数的 prototype 属性是否出现在某个实例对象的原型链上。 instanceof 可以判断一个对象是否属于某个类 <script>function Person(name, age) {this.name name;this.age age;}Person.prototype.sayH…

基于springboot,vue学生宿舍管理系统

开发工具&#xff1a;IDEA 服务器&#xff1a;Tomcat9.0&#xff0c; jdk1.8 项目构建&#xff1a;maven 数据库&#xff1a;mysql5.7 系统分前后台&#xff0c;项目采用前后端分离 前端技术&#xff1a;vuevue-element-admin 服务端技术&#xff1a;springboot,mybatis…