pyflink 环境测试以及测试案例

news2025/1/22 19:11:34

1. py 的 环境以来采用Anaconda环境包

安装版本:https://www.anaconda.com/distribution/#download-section
Python3.8.8版本:Anaconda3-2021.05-Linux-x86_64.sh
下载地址
https://repo.anaconda.com/archive/

2. 安装

bash Anaconda3-2021.05-Linux-x86_64.sh

2.1 如图

在这里插入图片描述

在这里插入图片描述

3. 配置配置anaconda的环境变量:

vim /etc/profile
##增加如下配置
export ANACONDA_HOME=/root/anaconda3/bin
export PATH=$PATH:$ANACONDA_HOME/bin
重新加载环境变量: source /etc/profile

4. 修改bashrc文件

sudo vim ~/.bashrc
添加如下内容:
export PATH=~/anaconda3/bin:$PATH

说明:

profile
其实看名字就能了解大概了, profile 是某个用户唯一的用来设置环境变量的地方, 因为用户可以有多个 shell 比如 bash, sh, zsh 之类的, 但像环境变量这种其实只需要在统一的一个地方初始化就可以了, 而这就是 profile.
bashrc
bashrc 也是看名字就知道, 是专门用来给 bash 做初始化的比如用来初始化 bash 的设置, bash 的代码补全, bash 的别名, bash 的颜色. 以此类推也就还会有 shrc, zshrc 这样的文件存在了, 只是 bash 太常用了而已.

5. 启动anaconda并测试

注意: 请将当前连接node1的节点窗口关闭,然后重新打开,否则无法识别
如图:
在这里插入图片描述

如果没有可以重启服务器。

如果大家发现命令行最前面出现了 (base) 信息, 可以通过以下方式, 退出Base环境

vim ~/.bashrc
拉到文件的最后面: 输入 i 进入插入模式
将以下内容添加:
conda deactivate

6. Anaconda相关组件命令

地址:https://www.continuum.io/downloads

安装包:pip install xxx,conda install xxx
卸载包:pip uninstall xxx,conda uninstall xxx
升级包:pip install upgrade xxx,conda update xxx

6.1 功能:

Anaconda自带,无需单独安装
实时查看运行过程
基本的web编辑器(本地)
ipynb 文件分享
可交互式
记录历史运行结果
修改jupyter显示的文件路径:
通过jupyter notebook --generate-config命令创建配置文件,之后在进入用户文件夹下面查看.jupyter隐藏文件夹,修改其中文件jupyter_notebook_config.py的202行为计算机本地存在的路径。

IPython:

命令:ipython,其功能如下
1.Anaconda自带,无需单独安装
2.Python的交互式命令行 Shell
3.可交互式
4.记录历史运行结果
5.及时验证想法

7. Anaconda中的conda命令做详细介绍和配置。

** 7.1. conda命令及pip命令**

conda install  包名    pip install 包名
conda uninstall 包名   pip uninstall 包名
conda install -U 包名   pip install -U 包名

7.2 Anaconda设置为国内下载镜像

conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
conda config --set show_channel_urls yes

7.3 conda创建虚拟环境

conda env list
conda create py_env python=3.8.8 #创建python3.8.8环境
activate py_env   #激活环境
deactivate py_env #退出环境

----------------------------------------------- Pyflink 环境安装-------------------------------------

8. pyflink 环境安装

激活虚拟环境

source ~/Documents/install/miniconda/bin/activate

创建 pyflink 虚拟环境

conda create --name py310_pyflink171_venv -y -q python=3.10.8
conda activate py310_pyflink171_venv
pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple
pip install apache-flink==1.17.1 --no-cache-dir  -i https://mirrors.aliyun.com/pypi/simple --use-pep517

或者 flink 14.5

# 创建 pyflink 虚拟环境
conda create --name py314_pyflink171_venv -y -q python=3.8.8
conda activate py314_pyflink171_venv
pip install --upgrade pip -i https://mirrors.aliyun.com/pypi/simple
pip install apache-flink==1.14.5 --no-cache-dir  -i https://mirrors.aliyun.com/pypi/simple --use-pep517

9. 官网测试例子

地址:https://nightlies.apache.org/flink/flink-docs-release-1.16/api/python/examples/table/word_count.html

vi word_count2.py

################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import argparse
import logging
import sys

from pyflink.table import TableEnvironment, EnvironmentSettings, TableDescriptor, Schema,\
    DataTypes, FormatDescriptor
from pyflink.table.expressions import col, lit
from pyflink.table.udf import udf

words = ["flink", "window", "timer", "event_time", "processing_time", "state",
         "connector", "pyflink", "checkpoint", "watermark", "sideoutput", "sql",
         "datastream", "broadcast", "asyncio", "catalog", "batch", "streaming"]

max_word_id = len(words) - 1


def streaming_word_count(output_path):
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())

    # define the source
    # randomly select 5 words per second from a predefined list
    t_env.create_temporary_table(
        'source',
        TableDescriptor.for_connector('datagen')
                       .schema(Schema.new_builder()
                               .column('word_id', DataTypes.INT())
                               .build())
                       .option('fields.word_id.kind', 'random')
                       .option('fields.word_id.min', '0')
                       .option('fields.word_id.max', str(max_word_id))
                       .option('rows-per-second', '5')
                       .build())
    tab = t_env.from_path('source')

    # define the sink
    if output_path is not None:
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('filesystem')
                           .schema(Schema.new_builder()
                                   .column('word', DataTypes.STRING())
                                   .column('count', DataTypes.BIGINT())
                                   .build())
                           .option('path', output_path)
                           .format(FormatDescriptor.for_format('canal-json')
                                   .build())
                           .build())
    else:
        print("Printing result to stdout. Use --output to specify output path.")
        t_env.create_temporary_table(
            'sink',
            TableDescriptor.for_connector('print')
                           .schema(Schema.new_builder()
                                   .column('word', DataTypes.STRING())
                                   .column('count', DataTypes.BIGINT())
                                   .build())
                           .build())

    @udf(result_type=DataTypes.STRING())
    def id_to_word(word_id):
        return words[word_id]

    # compute word count
    tab.select(id_to_word(col('word_id'))).alias('word') \
       .group_by(col('word')) \
       .select(col('word'), lit(1).count) \
       .execute_insert('sink') \
       .wait()
    # remove .wait if submitting to a remote cluster, refer to
    # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
    # for more details


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--output',
        dest='output',
        required=False,
        help='Output file to write results to.')

    argv = sys.argv[1:]
    known_args, _ = parser.parse_known_args(argv)

    streaming_word_count(known_args.output)




10.执行命令

 python word_count2.py

11. 执行结果

在这里插入图片描述

12 实例2

12.1 安装mysql docker 安装
https://blog.csdn.net/wudonglianga/article/details/133927305
12.2 创建表语句

CREATE TABLE `bigdatauser` (
  `id` int(32) NOT NULL AUTO_INCREMENT,
  `name` varchar(64) DEFAULT NULL,
  `age` int(32) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4

12.3 python 脚本

from pyflink.table import EnvironmentSettings, TableEnvironment


print('step 01')
# 1. create a TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
print('step 02')

table_env.get_config().get_configuration().set_string("pipeline.jars", "file:/opt/flink/lib/flink-connector-jdbc_2.12-1.14.5.jar;file:/opt/flink/lib/mysql-connector-java-5.1.49.jar")
print('step 03')

# 2. create source Table=
table_env.execute_sql("""
    CREATE TABLE products (
        id INT,
        name STRING,
        age INT,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
	'connector' = 'jdbc',  
        'url' = 'jdbc:mysql://192.168.43.185:3306/wudl',
        'username'='root',
        'password'='123456',
        'table-name' = 'bigdatauser'
        
    )
""")

# 3. create sink Table
table_env.execute_sql("""
    CREATE TABLE dproducts (
        id int,
        name STRING,
        age int
    ) WITH (
        'connector' = 'print'
    )
""")

print('step 04')
table_env.execute_sql("INSERT INTO dproducts SELECT  p.id, p.name, p.age FROM products AS p").wait()
print('step 06')

12.4 执行结果
在这里插入图片描述

13. 任务提交服务器运行

打包运行环境

# 找到 minconda(安装路径 envs目录下) 或者对应虚拟环境安装目录
# 打包 py310_pyflink171_venv 虚拟环境
cd ~/Documents/install/miniconda/env
zip -r py310_pyflink171_venv.zip py310_pyflink171_venv

** 1.提交至 jobmanager**

./flink run \
--jobmanager localhost:8081 \
-pyarch file:///workplace/py310_pyflink171_venv.zip \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-py /workplace/src/word_count.py

2. 带目录,指定入口模块提交

./flink run \
--jobmanager localhost:8081 \
-pyarch file:///workplace/py310_pyflink171_venv.zip \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyfs /workplace/src \
-pym word_count

3. 提交至 yarn 集群管理
提交运行
3.1.本地 py虚拟环境

./flink run -m yarn-cluster \
-pyarch file:///workplace/py310_pyflink171_venv.zip \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-py word_count.py

3.2. hdfs py虚拟环境

./flink run  -m yarn-cluster \
-pyarch hdfs://dae-ns/py_env/py310_pyflink171_venv.zip \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv \
-py word_count.py

3.3.带目录 src

./bin/flink run-application -t yarn-application \
-Dyarn.application.name=wordcount \
-Dyarn.ship-files=/workplace/src \
-pyarch shipfiles/py310_pyflink171_venv.zip \
-pyclientexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyexec py310_pyflink171_venv.zip/py310_pyflink171_venv/bin/python3 \
-pyfs src \
-pym word_count

注意:

虚拟环境打包,该虚拟环境创建方式建议使用 conda,或者virtualenv --always-copy 方式创建,这样打的虚拟环境更全
提交虚拟环境地址:py310_pyflink171_venv.zip/py310_pyflink171_venv 注意这个地址是双层

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1110235.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于SSM+Vue的汽车服务商城系统

末尾获取源码 开发语言:Java Java开发工具:JDK1.8 后端框架:SSM 前端:Vue 数据库:MySQL5.7和Navicat管理工具结合 服务器:Tomcat8.5 开发软件:IDEA / Eclipse 是否Maven项目:是 目录…

SpringFrameWork之注解类管理Bean

1 Bean 注解方式的扫描 1.1 .1注解理解 和 XML 配置文件一样,注解本身并不能执行,注解本身仅仅只是做一个标记,具体的功能是框架检测到注解标记的位置,然后针对这个位置按照注解标记的功能来执行具体操作。 1.1.2 扫描理解 Sp…

怎么使用LightPicture开源搭建图片管理系统并远程访问?【搭建私人图床】

文章目录 1.前言2. Lightpicture网站搭建2.1. Lightpicture下载和安装2.2. Lightpicture网页测试2.3.cpolar的安装和注册 3.本地网页发布3.1.Cpolar云端设置3.2.Cpolar本地设置 4.公网访问测试5.结语 1.前言 现在的手机越来越先进,功能也越来越多,而手机…

【单片机毕业设计】【hj-006-1】烟雾、甲烷气体检测 | 空气质量检测 | 有害气体检测

一、基本介绍 项目名: 基于单片机的烟雾、甲烷气体检测系统设计 基于单片机的空气质量检测 系统设计 基于单片机的有害气体检测系统设计 项目编号:mcuclub-hj-006-1 单片机类型:STM32F103C8T6 具体功能: 1、通过MQ-2检测烟雾值&…

人防行业通信系统

深圳市华脉智联科技有限公司是一家拥有核心自主知识产权的高科技企业,公司致力于公网对讲、融合通信、应急通信、执法调度等领域的系统和技术的开发和探讨,为行业用户提供一整套以通信为基础,软硬件结合的实战解决方案。华脉智联始终坚持将解…

2023年淘宝天猫双十一什么时候开始?天猫双十一满减活动规则和优惠力度是多少

2023年天猫淘宝双十一活动将在10月24日20时开启,同时包含两波正式开买时间点,分别为10月31日20时和11月10日20时。 一、2023天猫淘宝双十一活动时间表 第一波 (1)预售 预售预热:2023年10月24日14:00:00-2023年10月24日19:59:59 定金*支付…

工控机通过Profinet转Modbus RTU网关与报警控制仪通讯配置案例

本文将以工控机通过Profinet转Modbus RTU网关(XD-MDPN100)与报警控制仪通讯的实际案例为例,介绍Profinet转Modbus RTU网关(XD-MDPN100)在工业通信的应用。 在某个生产车间的报警系统中,报警控制仪是起到监…

嘉立创新建元件封装进行DRC检测发现无封装

新建元件和封装 元件就是原理图中放的主要是引脚对应正确,封装就是pcb设计中对应元件实际放置的焊盘。所以元件和封装需要关联 关联的时候需要注意 有时候出现元件无封装的情况,一般是因为没有管理封装(简单失误)或者是关联封…

【NPM】vuex 数据持久化库 vuex-persistedstate

在 GitHub 上找到:vuex-persistedstate。 安装 npm install --save vuex-persistedstate使用 import { createStore } from "vuex"; import createPersistedState from "vuex-persistedstate";const store createStore({// ...plugins: [cr…

echarts设置竖向不同区间范围颜色,并且x轴自定义轴线刻度范围

需求:设置竖向范围区间,不同范围区间颜色不同并且提示信息不同,之后修改x轴的固定间距范围,让0-200-400-600改为0-340-476-754这种,在这我是markLine的方式实现的,这边我还用到x轴的翻转所以显示的是镜像的…

视频SDK开发,多平台SDK快速接入

随着科技的不断发展,视频已经成为了企业业务中不可或缺的一部分。无论是在线教育、企业培训还是产品展示,视频都发挥着至关重要的作用。为了满足企业对视频应用的需求,美摄视频SDK应运而生,为企业提供了一站式的视频解决方案。 一…

聊一聊Spring 事务的相关操作

Java全能学习面试指南&#xff1a;https://javaxiaobear.cn 1、JdbcTemplate 1.1、简介 Spring 框架对 JDBC 进行封装&#xff0c;使用 JdbcTemplate 方便实现对数据库操作 1.2、准备工作 ①搭建子模块 搭建子模块&#xff1a;spring-jdbc-tx ②加入依赖 <dependenci…

21天打卡进阶Python基础操作

python21天打卡day3-python数据类型 #int a2 print(a) print(type(a)) #float a2.2 print(a) print(type(a)) #string anihao print(a) print(type(a)) #list a[1,2,3,4] print(a) print(a[0]) print(type(a)) #元组 a(1,2) print(a) print(type(a)) #字典dict a{name:yangyal,…

Unity3D 如何制作带厚度的透明图片详解

Unity3D是一款功能强大的游戏开发引擎&#xff0c;可以实现各种复杂的游戏效果。本文将详细介绍如何使用Unity3D制作带厚度的透明图片&#xff0c;并提供代码实现。 对惹&#xff0c;这里有一个游戏开发交流小组&#xff0c;希望大家可以点击进来一起交流一下开发经验呀&#…

react antd实现upload上传文件前form校验,同时请求带data

最近的需求&#xff0c;两个下拉框是必填项&#xff0c;点击上传按钮&#xff0c;如果有下拉框没选要有提示&#xff0c;如图 如果直接使用antd的Upload组件&#xff0c;一点击文件选择的窗口就打开了&#xff0c;哪怕在Button里再加点击事件&#xff0c;也只是&#xff08;几乎…

AutoCAD 2022 for Mac/Windows升级您的设计工具,提升工作效率

Autodesk AutoCAD 2022 是设计行业最流行的计算机辅助设计 (CAD) 软件之一。这款软件由Autodesk公司开发&#xff0c;它提供了强大的功能&#xff0c;从基本的设计和修改工具&#xff0c;到复杂的3D建模和渲染&#xff0c;一切尽在掌握。通过其直观的用户界面和不断更新的功能&…

mmDetection3D避坑指南

1 运行test.py报CUDA类错误 包括API之类的错误&#xff0c;是因为KITTI数据集有损坏&#xff0c;需要重新导入完整KITTI官方数据集,再根据官方步骤用create_data.py处理

memcpy内存拷贝函数

目录 一、memcpy内存拷贝函数 注意事项 二、memcpy与strcpy对比 三、模拟实现memcpy函数 四、memcpy函数不能进行两块存在内存重叠的空间的内存拷贝 五、改进my_memcpy函数 一、memcpy内存拷贝函数 头文件&#xff1a;string.h 函数原型&#xff1a;void* memcpy(void* …

Acwing 3306.装珠饰(十一届蓝桥java/py组J题)--dp之分组背包(hard)

分析&#xff1a; 6件装备作为一个整体去看待&#xff01;&#xff01;&#xff01;加的效果是看总的装备数目 分组背包的一个特点&#xff1a;每一个组里面只能取出一个物品&#xff0c;这里是把抽象成不同的方案数(有点多重背包的二进制处理方法的感觉。) 代码实现&#xff1…

ubuntu 20.04 passwd 指令不能使用

Linux 更改用户密码报Changing password for user 用户名. passwd: Module is unknown或更改新增用户密码passwd&#xff1a;未知的用户名 报错信息如下&#xff1a; 解决方法&#xff1a; 可以排查 /etc/pam.d/passwd配置文件 注释掉包含pam_passwdqc.so模块的行&#xff0c…