PyFlink使用教程,Flink,Python,Java

news2025/1/10 10:17:00

环境准备

环境要求

Java 11
Python 3.7, 3.8, 3.9 or 3.10

文档:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/installation/

打开 Anaconda3 Prompt

> java -version
java version "11.0.22" 2024-01-16 LTS
Java(TM) SE Runtime Environment 18.9 (build 11.0.22+9-LTS-219)
Java HotSpot(TM) 64-Bit Server VM 18.9 (build 11.0.22+9-LTS-219, mixed mode)

> python --version
Python 3.8.8

> conda env list
# conda environments:
#
base                  *  d:\ProgramData\Anaconda3
tensorflow2.4            d:\ProgramData\Anaconda3\envs\tensorflow2.4

> conda create -n PyFlink-1.17.1 python==3.8.8

> conda activate PyFlink-1.17.1

> python -m pip install apache-flink==1.17.1

> conda list
# packages in environment at d:\ProgramData\Anaconda3\envs\PyFlink-1.17.1:
#
# Name                    Version                   Build  Channel
apache-beam               2.43.0                   pypi_0    pypi
apache-flink              1.17.1                   pypi_0    pypi
apache-flink-libraries    1.17.1                   pypi_0    pypi
avro-python3              1.9.2.1                  pypi_0    pypi
ca-certificates           2023.12.12           haa95532_0    defaults
certifi                   2023.11.17               pypi_0    pypi
charset-normalizer        3.3.2                    pypi_0    pypi
cloudpickle               2.2.0                    pypi_0    pypi
crcmod                    1.7                      pypi_0    pypi
dill                      0.3.1.1                  pypi_0    pypi
docopt                    0.6.2                    pypi_0    pypi
fastavro                  1.4.7                    pypi_0    pypi
fasteners                 0.19                     pypi_0    pypi
grpcio                    1.60.0                   pypi_0    pypi
hdfs                      2.7.3                    pypi_0    pypi
httplib2                  0.20.4                   pypi_0    pypi
idna                      3.6                      pypi_0    pypi
numpy                     1.21.6                   pypi_0    pypi
objsize                   0.5.2                    pypi_0    pypi
openssl                   1.1.1w               h2bbff1b_0    defaults
orjson                    3.9.12                   pypi_0    pypi
pandas                    1.3.5                    pypi_0    pypi
pip                       23.3.1           py38haa95532_0    defaults
proto-plus                1.23.0                   pypi_0    pypi
protobuf                  3.20.3                   pypi_0    pypi
py4j                      0.10.9.7                 pypi_0    pypi
pyarrow                   8.0.0                    pypi_0    pypi
pydot                     1.4.2                    pypi_0    pypi
pymongo                   3.13.0                   pypi_0    pypi
pyparsing                 3.1.1                    pypi_0    pypi
python                    3.8.0                hff0d562_2    defaults
python-dateutil           2.8.2                    pypi_0    pypi
pytz                      2023.3.post1             pypi_0    pypi
regex                     2023.12.25               pypi_0    pypi
requests                  2.31.0                   pypi_0    pypi
setuptools                68.2.2           py38haa95532_0    defaults
six                       1.16.0                   pypi_0    pypi
sqlite                    3.41.2               h2bbff1b_0    defaults
typing-extensions         4.9.0                    pypi_0    pypi
urllib3                   2.1.0                    pypi_0    pypi
vc                        14.2                 h21ff451_1    defaults
vs2015_runtime            14.27.29016          h5e58377_2    defaults
wheel                     0.41.2           py38haa95532_0    defaults
zstandard                 0.22.0                   pypi_0    pypi

下载的包存储在Anaconda3\envs\PyFlink-1.17.1\Lib\site-packages

PyFlink 案例

从Flink 1.11版本开始, PyFlink 作业支持在 Windows 系统上运行,因此您也可以在 Windows 上开发和调试 PyFlink 作业了。

打开 VSCode 切换到 PyFlink-1.17.1 环境,按照 教程 写一个 Table API 的示例

learn_pyflink/tableAPIJob.py

import argparse
import logging
import sys

from pyflink.common import Row
from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor)
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf

word_count_data = ["To be, or not to be,--that is the question:--",
                   "Whether 'tis nobler in the mind to suffer",
                   "The slings and arrows of outrageous fortune",
                   "Or to take arms against a sea of troubles,",
                   "And by opposing end them?--To die,--to sleep,--",
                   "No more; and by a sleep to say we end",
                   "The heartache, and the thousand natural shocks",
                   "That flesh is heir to,--'tis a consummation",
                   "Devoutly to be wish'd. To die,--to sleep;--",
                   "To sleep! perchance to dream:--ay, there's the rub;",
                   "For in that sleep of death what dreams may come,",
                   "When we have shuffled off this mortal coil,",
                   "Must give us pause: there's the respect",
                   "That makes calamity of so long life;",
                   "For who would bear the whips and scorns of time,",
                   "The oppressor's wrong, the proud man's contumely,",
                   "The pangs of despis'd love, the law's delay,",
                   "The insolence of office, and the spurns",
                   "That patient merit of the unworthy takes,",
                   "When he himself might his quietus make",
                   "With a bare bodkin? who would these fardels bear,",
                   "To grunt and sweat under a weary life,",
                   "But that the dread of something after death,--",
                   "The undiscover'd country, from whose bourn",
                   "No traveller returns,--puzzles the will,",
                   "And makes us rather bear those ills we have",
                   "Than fly to others that we know not of?",
                   "Thus conscience does make cowards of us all;",
                   "And thus the native hue of resolution",
                   "Is sicklied o'er with the pale cast of thought;",
                   "And enterprises of great pith and moment,",
                   "With this regard, their currents turn awry,",
                   "And lose the name of action.--Soft you now!",
                   "The fair Ophelia!--Nymph, in thy orisons",
                   "Be all my sins remember'd."]


def word_count(input_path, output_path):
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    # write all the data to one file
    t_env.get_config().set("parallelism.default", "1")

    # define the source
    if input_path is not None:
        t_env.create_temporary_table(
            'source',
            TableDescriptor.for_connector('filesystem')
            .schema(Schema.new_builder()
                    .column('word', DataTypes.STRING())
                    .build())
            .option('path', input_path)
            .format('csv')
            .build())
        tab = t_env.from_path('source')
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
                                  DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))

    # define the sink
    if output_path is not None:
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('filesystem')
            .schema(Schema.new_builder()
                    .column('word', DataTypes.STRING())
                    .column('count', DataTypes.BIGINT())
                    .build())
            .option('path', output_path)
            .format(FormatDescriptor.for_format('canal-json')
                    .build())
            .build())
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('print')
            .schema(Schema.new_builder()
                    .column('word', DataTypes.STRING())
                    .column('count', DataTypes.BIGINT())
                    .build())
            .build())

    @udtf(result_types=[DataTypes.STRING()])
    def split(line: Row):
        for s in line[0].split():
            yield Row(s)

    # compute word count
    tab.flat_map(split).alias('word') \
        .group_by(col('word')) \
        .select(col('word'), lit(1).count) \
        .execute_insert('sink') \
        .wait()
    # remove .wait if submitting to a remote cluster, refer to
    # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
    # for more details


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout,
                        level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--input',
        dest='input',
        required=False,
        help='Input file to process.')
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    word_count(known_args.input, known_args.output)

要在PyFlink-1.17.1环境下运行

> python tableAPIJob.py


Using Any for unsupported type: typing.Sequence[~T]
No module named google.cloud.bigquery_storage_v1. As a result, the ReadFromBigQuery transform *CANNOT* be used with `method=DIRECT_READ`.
Executing word_count example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
+I[To, 1]
+I[be,, 1]
+I[or, 1]
+I[not, 1]
+I[to, 1]
+I[be,--that, 1]
+I[is, 1]
+I[the, 1]
+I[question:--, 1]
+I[Whether, 1]
+I['tis, 1]
+I[nobler, 1]
+I[in, 1]
-U[the, 1]
+U[the, 2]
+I[mind, 1]
-U[to, 1]
+U[to, 2]
.
.
.

提交 PyFlink 作业到 Flink

参考:https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/deployment/cli/#submitting-pyflink-jobs

我的 Flink 是安装在 WSL 上面的,因此也要准备环境。

下载 java-11-linux:https://download.oracle.com/otn/java/jdk/11.0.22%2B9/8662aac2120442c2a89b1ee9c67d7069/jdk-11.0.22_linux-x64_bin.tar.gz

> tar -zxf jdk-11.0.22_linux-x64_bin.tar.gz -C /usr/lib/jdk

# 生成 jre
> bin/jlink --module-path jmods --add-modules java.desktop --output jre

> vi /etc/profile

export JAVA_HOME=/usr/lib/jdk/jdk-11.0.22
export JRE_HOME=${JAVA_HOME}/jre    
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib    
export PATH=${JAVA_HOME}/bin:$PATH

> source /etc/profile 

> ls -l /usr/bin/python*

lrwxrwxrwx 1 root root       9 Mar 26  2019 /usr/bin/python3 -> python3.7
lrwxrwxrwx 1 root root      16 Mar 26  2019 /usr/bin/python3-config -> python3.7-config
-rwxr-xr-x 1 root root    1018 Mar  4  2018 /usr/bin/python3-jsondiff
-rwxr-xr-x 1 root root    3661 Mar  4  2018 /usr/bin/python3-jsonpatch
-rwxr-xr-x 1 root root    1342 May  2  2016 /usr/bin/python3-jsonpointer
-rwxr-xr-x 1 root root     398 Nov 22  2018 /usr/bin/python3-jsonschema
-rwxr-xr-x 2 root root 4877888 Apr  3  2019 /usr/bin/python3.7
lrwxrwxrwx 1 root root      33 Apr  3  2019 /usr/bin/python3.7-config -> x86_64-linux-gnu-python3.7-config
-rwxr-xr-x 2 root root 4877888 Apr  3  2019 /usr/bin/python3.7m
lrwxrwxrwx 1 root root      34 Apr  3  2019 /usr/bin/python3.7m-config -> x86_64-linux-gnu-python3.7m-config
lrwxrwxrwx 1 root root      10 Mar 26  2019 /usr/bin/python3m -> python3.7m
lrwxrwxrwx 1 root root      17 Mar 26  2019 /usr/bin/python3m-config -> python3.7m-config

> python --version

Command 'python' not found, but can be installed with:

apt install python3         # version 3.7.3-1, or
apt install python          # version 2.7.16-1
apt install python-minimal  # version 2.7.16-1

You also have python3 installed, you can run 'python3' instead.

> python3 --version
Python 3.7.3

# 已经安装了 python-3.7.3,创建一个软连接即可
> ln -s /usr/bin/python3.7 /usr/local/bin/python

> python --version
Python 3.7.3

# 设置镜像源,否则会非常慢
> python -m pip install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade pip
> pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple

# 启动 Flink-1.17.1
> bin/start-cluster.sh

除此之外,在 WSL 上还需要安装有此python脚本依赖的库,也就是 apache-flink 库。因为 Flink 需要调用 python 命令来解析 pytion 脚本,这里面涉及到 python 和 java 之间的通讯。这一块还只是在 Flink 客户端上面(bin/flink run ...),而 Flink 的 TaskManager 在运行此任务的时候还需要调用 python 解释器,因为上面代码中有UDF函数,这个函数在Java中是不存在的,关于 Flink 支持 Python 任务的内部原理后面再写一篇。

> python -m pip install apache-flink==1.17.1
> pip list

然后将代码中的.wait()调用删掉

tab.flat_map(split).alias('word') \
       .group_by(col('word')) \
       .select(col('word'), lit(1).count) \
       .execute_insert('sink')

提交任务

> ./bin/flink run --python /mnt/d/dev/php/magook/trunk/server/learn-python/learn_pyflink/tableAPIJob.py

Executing word_count example with default input data set.
Use --input to specify file input.
Printing result to stdout. Use --output to specify output path.
Job has been submitted with JobID a18e581d16785a9872336073efdf5df0

来到 webUI 查看任务

在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1420040.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

探索Pyecharts:绘制多彩日历图的艺术与技巧

Pyecharts绘制多种炫酷日历图参数说明代码实战 导言 在数据可视化领域,日历图是一种直观展示时间和数据关系的方式。Pyecharts是一个基于Echarts的Python库,可以方便地绘制各种图表,包括炫酷的日历图。本篇博客将介绍Pyecharts中绘制多种炫…

AI未来10年展望【2024-2034】

人工智能(AI)在过去十年中迅速发展,其未来有望取得更加引人注目的发展。 在本文中,我们将探讨人工智能的未来 10 年以及我们对未来十年的期望。 我们将解决一些关键问题,以全面概述人工智能的未来。 NSDT工具推荐&…

17. 使用 tslib 库

17. 使用 tslib 库 1. tslib 简介2. tslib 移植2.1 下载 tslib 源码2.2 编译 tslib 源码2.3 tslib 安装目录下的文件夹介绍2.4 在开发板上测试 tslib 3. tslib 库函数介绍3.1 打开触摸屏设备3.2 配置触摸屏设备3.3 读取触摸屏设备 4. 基于 tslib 编写触摸屏应用程序4.1 单点触摸…

CHS_03.2.3.2_2+进程互斥的硬件实现方法

CHS_03.2.3.2_2进程互斥的硬件实现方法 知识总览中断屏蔽方法TestAndSet指令Swap指令 知识回顾 进程互斥的四种软件实现方法 知识总览 这个小节我们会介绍另外的三种进程互斥的硬件实现方法 那么 这个小节的学习过程当中 大家需要注意理解各个方法的原理 并且要稍微的了解各个…

OpenHarmony RK3568 启动流程优化

目前rk3568的开机时间有21s,统计的是关机后从按下 power 按键到显示锁屏的时间,当对openharmony的系统进行了裁剪子系统,系统app,禁用部分服务后发现开机时间仅仅提高到了20.94s 优化微乎其微。在对init进程的log进行分析并解决其…

机器学习:多项式回归(Python)

多元线性回归闭式解: closed_form_sol.py import numpy as np import matplotlib.pyplot as pltclass LRClosedFormSol:def __init__(self, fit_interceptTrue, normalizeTrue):""":param fit_intercept: 是否训练bias:param normalize: 是否标准化…

重写Sylar基于协程的服务器(1、日志模块的架构)

重写Sylar基于协程的服务器(1、日志模块的架构) 重写Sylar基于协程的服务器系列: 重写Sylar基于协程的服务器(0、搭建开发环境以及项目框架 || 下载编译简化版Sylar) 重写Sylar基于协程的服务器(1、日志模…

2.室内设计学习 - CAD 2021 调整经典界面教程及基本设置

设置经典界面 1.在第二行的空白处右击,弹出对话框,并点击【关闭】,关闭掉。 2.菜单栏没有显示的情况下,在最上面的一排,点击向下的箭头展开下拉框,勾选 【显示菜单栏】 3.点击菜单【工具】-【工具栏】-【a…

AES 加解密python实现

1. 要求 编程实现AES-128的加解密算法,满足给定明文和密钥加密得到密文,给定密文和密钥解密得到明文,最终用界面化的形式呈现。 2. 算法流程 程序主要分为加密与解密两个大模块。在加密模块中包括四个小模块,分别为轮密钥加、字…

C语言KR圣经笔记 6.4结构体指针 6.5自引用结构体

6.4 结构体指针 为了说明结构体指针和数组的某些注意事项&#xff0c;我们把上一节的关键字计数程序再写一次&#xff0c;不过这回使用指针而不是数组下标。 keytab 的外部声明不需要动&#xff0c;但 main 和 binsearch 确实需要修改。 #include <stdio.h> #include …

3、css设置样式总结、节点、节点之间关系、创建元素的方式、BOM

一、css设置样式的方式总结&#xff1a; 对象.style.css属性 对象.className ‘’ 会覆盖原来的类 对象.setAttribut(‘style’,‘css样式’) 对象.setAttribute(‘class’,‘类名’) 对象.style.setProperty(css属性名,css属性值) 对象.style.cssText “css样式表” …

开发工具之GIT协同开发流程和微服务部署实践与总结

GIT协同开发流程和微服务部署的实践&#xff0c;并总结经验和教训。通过合理的GIT协同开发流程和良好的微服务部署策略&#xff0c;团队可以更高效地开发和部署软件。 ## 引言 在当今快节奏的软件开发环境中&#xff0c;采用合适的工具和流程对于实现高效协同开发和可靠部署至…

1.25时间序列分析,FB先知模型、简要傅里叶变化解决周期性变化,实例步骤

目录 FB概念 ​编辑 GEOGEBRA可视化傅里叶​编辑 先知模型步骤 财务数据要考虑到可解释性 FB模型概念 可以用傅里叶级数来描述周期性变化的因素 GEOGEBRA可视化傅里叶 先知模型步骤

vue+ElementPlus实现中国省市区三级级联动封装

安装插件获取中国省份的所有数据 npm install element-china-area-data -S 借助ElementPlus 级联选择器 Cascader实现 <template><div><el-cascadersize"large":options"options"v-model"selectedOptions"change"handleCh…

C# 一个快速读取写入操作execl的方法封装

这里封装了3个实用类ExcelDataReaderExtensions&#xff0c;ExcelDataSetConfiguration&#xff0c;ExcelDataTableConfiguration和一个实用代码参考&#xff1a; using ExcelDataReader; using System; using System.Collections.Generic; using System.Linq; using System.T…

2024.1.29 关于 Redis 缓存详解

目录 缓存基本概念 二八定律 Redis 作为缓存 缓存更新策略 定期生成 实时生成 内存淘汰策略 缓存使用的注意事项 关于缓存预热 关于缓存穿透 关于缓存雪崩 关于缓存击穿&#xff08;瘫痪&#xff09; 缓存基本概念 所谓缓存&#xff0c;其实就是将一部分常用数据放…

向日葵企业“云策略”升级 支持Android 被控策略设置

此前&#xff0c;贝锐向日葵推出了适配PC企业客户端的云策略功能&#xff0c;这一功能支持管理平台统一修改设备设置&#xff0c;上万设备实时下发实时生效&#xff0c;很好的解决了当远程控制方案部署后&#xff0c;想要灵活调整配置需要逐台手工操作的痛点&#xff0c;大幅提…

计算机网络-数据交换方式(电路交换 报文交换 分组交换及其两种方式 )

文章目录 为什么要数据交换&#xff1f;总览电路交换电路交换的各个阶段建立连接数据传输释放连接 电路交换的特点电路交换的优缺点 报文交换报文交换流程报文交换的优缺点 分组交换分组交换流程分组交换的优缺点 数据交换方式的选择分组交换的两种方式数据报方式数据报方式的特…

正则表达式(RE)

什么是正则表达式 正则表达式&#xff0c;又称规则表达式&#xff08;Regular Expression&#xff09;。正则表达式通常被用来检索、替换那些符合某个规则的文本 正则表达式的作用 验证数据的有效性替换文本内容从字符串中提取子字符串 匹配单个字符 字符功能.匹配任意1个…

(一)Spring 核心之控制反转(IoC)—— 配置及使用

目录 一. 前言 二. IoC 基础 2.1. IoC 是什么 2.2. IoC 能做什么 2.3. IoC 和 DI 是什么关系 三. IoC 配置的三种方式 3.1. XML 配置 3.2. Java 配置 3.3. 注解配置 四. 依赖注入的三种方式 4.1. 属性注入&#xff08;setter 注入&#xff09; 4.2. 构造方法注入&a…