Flink架构底层原理详解:案例解析(43天)

news2024/11/15 10:51:50

系列文章目录

一、Flink架构(掌握)
二、Flink代码案例(掌握)
三、UDF(熟悉)
四、Flink常见面试题整理

文章目录

  • 系列文章目录
  • 前言
    • 一、Flink架构(掌握)
      • 1、系统架构
        • 1.1 通信(了解)
        • 1.2 JobManager
        • 1.3 TaskManager
        • 1.4 Scheduler
        • 1.5 Checkpoint Coordinator
        • 1.6 Memory & IO Manager
        • 1.7 Network Manager
        • 1.8 Client
      • 2、任务提交流程
        • 2.1 抽象提交流程
        • 2.2 Standalone模式提交流程
        • 2.3 Yarn-session模式提交流程
          • 2.3.1 初始化Session集群
          • 2.3.2 提交任务
        • 2.4 Yarn-per-job模式提交流程
        • 2.5 Yarn-application模式提交流程
      • 3、一些重要的概念
        • 3.1 程序流程图
        • 3.2 一些概念
    • 二、Flink代码案例(掌握)
      • 1、需求
      • 2、Flink流式程序开发流程
      • 3、创建项目
      • 4、实现
        • 4.1 批案例
        • 4.2 流案例
        • 4.3 SQL案例
      • 5、提交运行
        • 5.1 开源提交
        • 5.2 阿里云提交
    • 三、UDF(熟悉)
      • 1、概述
      • 2、Scalar Function
        • 2.1 需求
        • 2.2 实现
      • 3、Table Function
        • 3.1 需求
        • 3.2 实现
      • 4、Aggregate Function
        • 4.1 需求
        • 4.2 实现
      • 5、阿里云UDF
        • 5.1 注册UDF函数
        • 5.2 使用UDF函数
    • 四、Flink常见面试题整理
      • 1、Flink中的部署模式?你是如何部署Flink?你的Flink的项目是用什么方式部署?
      • 2、说一下对Flink中时间的理解?你在你的项目中是如何使用Flink进行数据统计的?
      • 3、你在项目中如何解决延迟到来的数据?如何彻底解决数据延迟到的情况?
      • 4、介绍一下Flink的底层原理?介绍一下Flink的架构?
      • 5、用户自定义函数的分类


前言

本文主要详解了Flink架构,通过案例详解Flink流式开发。


提示:以下是本篇文章正文内容,下面案例可供参考

一、Flink架构(掌握)

1、系统架构

官网的架构图如下:

在这里插入图片描述

讲义的架构如下:

在这里插入图片描述

1.1 通信(了解)

Spark的通信:在1.6版本及之前,用的是akka通信框架,在1.6之后,用的是netty。

在这里插入图片描述

Flink的通信:akka通信框架。

在这里插入图片描述

1.2 JobManager

作用:管理众多的TaskManager从节点。负责任务分配和资源管理

JobManager中包括如下3个组件:

  • ResourceManager:这是Flink自己的资源管理器。要和Yarn的ResourceManager区分开来。

  • JobMaster:**作业调度器。**负责向资源管理器申请资源;分配任务给到TaskManager进行执行

  • Dispatcher:分发器。用来接收Client进程提交的Flink任务,然后去启动JobMaster,将Flink任务转发给JobMaster

1.3 TaskManager

作用:接收JobManager分配过来的任务;同时向JobManager汇报Task执行状态、心跳等信息

1.4 Scheduler

Spark中的调度器:DAGScheduler和TaskScheduler

  • DAGScheduler:将Job任务形成DAG有向无环图和划分Stage阶段,确定每个Stage阶段有多少个Task线程
  • TaskScheduler:将DAGScheduler发送过来的TaskSet中的Task线程任务分配给到Executor进程进行执行

Flink:JobMaster作业调度器。负责向资源管理器申请资源;分配任务给到TaskManager进行执行

1.5 Checkpoint Coordinator

检查点协调器。主要负责Checkpoint的操作,对Flink程序进行容错。

1.6 Memory & IO Manager

内存和IO管理器。负责TaskManager的内存和IO管理

1.7 Network Manager

网络管理器,负责不同节点间的Slot进行数据的交换。分为如下3种场景:

# 场景1: 同一个节点,同一个TaskManager的不同Slot间
举例: 你和你的同学都在广州黑马的218教室学习
数据交换效率最高,而且不需要经过网络管理器

# 场景2: 同一个节点,不同的TaskManager的Slot间
举例: 你和你的同学都在广州黑马,但是在不同教室
数据交换效率中等,而且不需要经过网络管理器

# 场景3: 不同节点Slot间
举例: 你在广州黑马,你的同学在深圳黑马
数据交换效率最低,而且需要经过网络管理器
1.8 Client

只是负责任务的提交。提交成功后,其实可以断开了。在命令提交任务时,可以指定-d参数来配置。

如果配置了-d,则说明客户端和集群断开了。

2、任务提交流程

2.1 抽象提交流程

在这里插入图片描述

在这里插入图片描述

1- Flink任务(App)通过Client客户端提交给到分发器
2- 分发器接收到Flink任务以后,接着去启动JobManager中的JobMaster,并且将Flink任务提交给到JobMaster
3- JobMaster接收到Flink任务以后,向ResourceManager资源管理器申请Slot资源
4- 资源管理器接收到资源申请之后,首先启动新的TaskManager
5- 新的TaskManager启动以后,会反向注册回资源管理器,并且告诉它我目前有多少Slot的资源
6- 资源管理器命令TaskManager将空闲的Slot资源提供出来
7- TaskManager接收到资源提供的命令以后,将资源给到JobMaster
8- JobMaster申请到资源以后,将任务分配给到具体的TaskManager进行执行
2.2 Standalone模式提交流程

在这里插入图片描述

(1)客户端提交任务到Dispacher(分发器)

(2)Dispacher分发器启动JobMaster

(3)JobMaster启动后,它会向JobManager的ResourceManager(资源管理器)请求资源(slot)

(4)JobManager的ResourceManager(资源管理器)向TaskManager请求资源(slot)

(5)TaskManager会向JobMaster提供资源(slot)

(6)JobMaster收到资源后,会向TaskManager提交(分发)任务

(7)TaskManager收到任务后,就在Slot上执行

(8)任务执行完后,释放资源

注意:Standalone模式下,Slot资源使用完了以后,那么无法继续提交Flink程序,会报错。

/export/server/flink/bin/flink run -py /export/software/checkpoint_demo.py

在这里插入图片描述

2.3 Yarn-session模式提交流程

如果需要把任务提交在Yarn-Session下运行,则分为2步:

  • 初始化Yarn-session集群
  • 提交任务

首先看第一步。

2.3.1 初始化Session集群

(1)请求Yarn的ResourceManager(资源管理器)

(2)Yarn的ResourceManager收到请求后,会启动一个Container(容器),当然这个容器就是ApplicationMaster(AppMaster)

(3)这个AppMaster就是Flink的JobManager,这个JobManager会初始化Dispacher和ResourceManager(资源管理器)

这里还没有初始化TaskManager,因此集群没有slot资源

在这里插入图片描述

2.3.2 提交任务

在这里插入图片描述

(1)客户端提交任务给JobManager(AppMaster)的分发器(Dispacher)

(2)分发器收到任务后,会启动JobMaster

(3)JobMaster启动后,会向JobManager(AppMaster)请求资源(slot)

(4)JobManager会向Yarn的ResourceManager请求资源

(5)Yarn的ResourceManager收到请求后,会在闲置的节点动态启动Container(TaskManager)

(6)Container启动成功后,会注册给AppMaster(JobManager)的ResourceManager

(7)Container会向AppMaster(JobManager)的JobMaster提供资源(slot)

(8)JobMaster会把任务分发给Container(TaskManager)去执行

(9)待任务执行完后,Container(TaskManager)会被AppMaster(JobManager),最终留下JobManager,这个不会被销毁

2.4 Yarn-per-job模式提交流程

在这里插入图片描述

(1)客户端提交任务给Yarn的ResourceManager

(2)Yarn的ResourceManager收到请求后,会启动一个Container(AppMaster),这个AppMaster就是Flink的JobManager

(3)JobManager里有任务调度器和资源管理器,任务调度器就会开始调度任务,向JobManager的资源管理器申请资源

(4)JobManager的资源管理器它会向Yarn的ResourceManager申请资源

(5)Yarn的ResourceManager会动态启动Container(TaskManager),这些Container就是资源

(6)这些Container启动后,会反向注册给AppMaster(JobManager)

(7)这些Container向JobMaster提供资源

(8)JobMaster收到资源后,把任务分发给Container(TaskManager)去执行

(9)任务执行完后,AppMaster(JobManager)会把Container(TaskManager)注销

(10)AppMaster(JobManager)会向Yarn的ResourceManager注销自己

2.5 Yarn-application模式提交流程

与Yarn-per-job的区别是Client进程运行的地方不一样。application模式是在集群中随机找一个从节点启动和运行Client进程。Flink程序的提交流程与Yarn-per-job完全一样。

3、一些重要的概念

3.1 程序流程图

在这里插入图片描述
在这里插入图片描述

3.2 一些概念
  • 层级关系

Spark层级关系:Spark的应用 > Job任务 > DAG有向无环图 > Stage阶段 > Task线程任务

Flink层级关系:Flink的应用 > Job任务 > DAG有向无环图 > 算子链 > Task线程任务 > SubTasks子任务

  • 并行度

运行同时运行的任务数。Flink的并行度的设置如下:

#1.默认,在配置文件中,优先级最低。不推荐使用
在flink-conf.yaml中可配置

#2.任务提交时指定(推荐)
bin/flink run -p 3 xxxx.jar

#3.在全局代码中配置
env.setParallelism(1)

#4.在算子中,优先级最高
...reduce().setParllelism(1)
  • 算子&算子链

算子:每一个对数据处理的方法/API都称之为算子。

算子链:把窄依赖的算子合并在一起。算子链能够提升数据处理效率

在这里插入图片描述

  • 宽依赖&窄依赖

Spark

宽依赖:Shuffle Dependency

窄依赖:Narrow Dependency

Flink

宽依赖(重分区):redistributing dependency

窄依赖(一对一):one-to-one dependency

  • 概念

Job:Flink的程序

Task:Flink的并行度

SubTask:每个任务中的子任务数

  • Slot槽&槽共享

在这里插入图片描述

槽:slot,是集群的静态资源,在Standalone模式下,槽是预先配置的,不能更改。如果要改,改完后需要重启集群。

Yarn模式,可以通过启动多个TaskManager来动态初始化多个slot槽。

slot是运行Flink的单位。Flink任务必须运行在slot里。

slot和并行度是有关联的。并行度的数量不能超过可用slot的数量。

槽共享:一个槽可以运行不同Task下的多个SubTask。

不同的Task下的相同SubTask,尽量在同一个slot上执行,这是为了提升程序的执行效率。这就是槽共享

相同的Task下的SubTask,一定不会在同一个slot上执行,这是为了充分利用集群资源,达到并行效果。

二、Flink代码案例(掌握)

1、需求

使用代码来实现Flink的wordcount案例。
SparkCore版的WordCount实现过程:
1- 创建顶级对象SparkContext
2- 数据输入
3- 数据处理
	3.1- 文本内容切分: flatMap
	3.2- 数据格式转换: map
	3.3- 分组聚合: reduceByKey
4- 数据输出
5- 释放资源

2、Flink流式程序开发流程

1- 创建流式执行环境
2- 数据输入
3- 数据处理
4- 数据输出
5- 启动流式任务
Flink中算子的分类:
1- source算子: 数据读取
2- transformation算子: 数据处理
3- sink算子: 数据输出

3、创建项目

前提条件:无论是在远程Linux环境还是本地Windows环境。要想成功开发Python版Flink,都需要有Python环境。

推荐如下的操作,在虚拟机集群的所有节点上都执行一次:

#1.保证有Python3.6、3.7或者3.8
python -V

#2.安装flink依赖
python -m pip install apache-flink==1.15.4 -i https://pypi.tuna.tsinghua.edu.cn/simple

在这里插入图片描述

4、实现

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/datastream_tutorial/

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/table_api_tutorial/

在这里插入图片描述

4.1 批案例
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
import os

os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1 - 创建流式执行环境
    # 1.1- 得到顶级对象
    env = StreamExecutionEnvironment.get_execution_environment()

    # 1.2- 设置运行模式:批处理
    env.set_runtime_mode(RuntimeExecutionMode.BATCH)

    # 1.3- 设置并行度:全局并行度
    env.set_parallelism(1)

    # 2 - 数据输入
    init_ds = env.read_text_file(file_path="file:///export/data/flink_base/content.txt",charset_name="UTF-8")

    # 3 - 数据处理
    # 3.1- 将文本内容切分得到一个个单词
    """
        lambda 形参1,形参2... : 单行代码
    """
    flatmap_ds = init_ds.flat_map(lambda line: line.split(" "))

    # 3.2- 将单词转成元组
    map_ds = flatmap_ds.map(lambda word: (word,1))

    # 3.3- 按照单词分组
    keyby_ds = map_ds.key_by(lambda tup: tup[0])

    # 3.4- 对单词的次数进行聚合
    """
        rdd.reduceByKey(lambda agg,curr: agg+curr)
    """
    # 错误代码
    # result = keyby_ds.reduce(lambda agg,curr: agg+curr)

    result = keyby_ds.reduce(lambda tup1,tup2: (tup1[0],tup1[1]+tup2[1]))

    # 4 - 数据输出
    result.print()

    # 5 - 启动流式任务
    env.execute()

运行结果截图:

在这里插入图片描述

可能遇到的错误:

在这里插入图片描述

原因: 1- 服务器上没有安装JDK;2- 安装了JDK,但是在代码中没有明确告诉程序JDK在什么地方
解决办法: 在flink代码文件上面添加如下内容
import os
os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'
4.2 流案例
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, DataStream
import os

os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1 - 创建流式执行环境
    # 1.1- 得到顶级对象
    env = StreamExecutionEnvironment.get_execution_environment()

    # 1.2- 设置运行模式:流处理
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)

    # 1.3- 设置并行度:全局并行度
    env.set_parallelism(1)

    # 2 - 数据输入
    init_ds = DataStream(env._j_stream_execution_environment.socketTextStream("192.168.88.161",9999))

    # 3 - 数据处理
    # 3.1- 将文本内容切分得到一个个单词
    """
        lambda 形参1,形参2... : 单行代码
    """
    flatmap_ds = init_ds.flat_map(lambda line: line.split(" "))

    # 3.2- 将单词转成元组
    map_ds = flatmap_ds.map(lambda word: (word,1))

    # 3.3- 按照单词分组
    keyby_ds = map_ds.key_by(lambda tup: tup[0])

    # 3.4- 对单词的次数进行聚合
    """
        rdd.reduceByKey(lambda agg,curr: agg+curr)
    """
    # 错误代码
    # result = keyby_ds.reduce(lambda agg,curr: agg+curr)

    result = keyby_ds.reduce(lambda tup1,tup2: (tup1[0],tup1[1]+tup2[1]))

    # 4 - 数据输出
    result.print()

    # 5 - 启动流式任务
    env.execute()

运行结果截图:

在这里插入图片描述

可能遇到的错误:

在这里插入图片描述

原因: 9999端口号没有启动
解决办法: 提前在程序运行前,在node1上执行nc -lk 9999
4.3 SQL案例
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

os.environ['FLINK_HOME'] = '/export/server/flink'
os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

if __name__ == '__main__':
    # 1- 创建流式执行任务
    # 1.1- 创建顶级对象
    env = StreamExecutionEnvironment.get_execution_environment()

    # 1.2- 得到FlinkSQL层级的顶级对象
    table = StreamTableEnvironment.create(stream_execution_environment=env)

    # 1.3- 设置并行度:全局并行度
    # env.set_parallelism(1)

    # 2- 数据输入
    table.execute_sql("""
        create table source(
            word varchar
        ) with (
            'connector'='socket',
            'hostname'='192.168.88.161',
            'port'='9999',
            'format'='csv'
        )
    """)

    # 3- 数据输出
    table.execute_sql("""
        create table sink(
            word varchar,
            cnt bigint
        ) with (
            'connector'='print'
        )
    """)

    # 4- 数据处理
    table.execute_sql("""
        insert into sink
        select 
            word,
            count(1) as cnt
        from source
        group by word
    """).wait()

    # 5- 启动流式任务
    env.execute()

运行结果截图:

在这里插入图片描述

可能遇到的错误一:

在这里插入图片描述

原因: 代码没有找到Flink的安装目录在什么地方
解决办法: 在代码的上面添加如下内容
os.environ['FLINK_HOME'] = '/export/server/flink'

可能遇到的错误二:

在这里插入图片描述

原因: 需要在增删改查的语句代码后面增加wait()的方法调用

在这里插入图片描述

5、提交运行

5.1 开源提交

环境准备:在node1上执行即可

需要确保Flink的Standalone集群是启动的状态,如果没有启动,需要执行如下命令:
cd /export/server/flink/bin
./start-cluster.sh

启动nc
nc -lk 9999

在这里插入图片描述

提交命令:在node1上执行即可

/export/server/flink/bin/flink run -py /export/data/flink_base/flink_sql_wordcount.py

注意: 代码所在的路径要改成你自己的

运行成功截图如下:

在这里插入图片描述

在这里插入图片描述

5.2 阿里云提交

S

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述
)

运行结果截图:

在这里插入图片描述

三、UDF(熟悉)

1、概述

UDF,user defined function,用户自定义函数。

官网如下:

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/functions/udfs/

https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/python/table/udfs/python_udfs/

Flink的UDF函数可以分为如下几种类型:

  • Scalar Function:标量函数,UDF,一对一。举例:split、substr、concat
  • Table Function:表数据生成函数,UDTF,一对多。举例:explode、json_tuple
  • Aggregate Function:聚合函数,UDAF,多对一。举例:sum、avg、max、min、count等
  • Table Aggregate Function:表数据生成聚合函数,UDTAF,多对多。

2、Scalar Function

Scalar Function,UDF。就是一进一出的函数。比如map方法。

2.1 需求
实现一个类似于两数之和的sum函数,函数名:mySum
优先采用SQL来实现。

输入数据:
| num1 | num2 |
|  1   |   2  |
|  3   |   4  |

输出结果:
| num1 | num2 | result |
|  1   |   2  |   3    |
|  3   |   4  |   7    |
2.2 实现
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

os.environ['FLINK_HOME'] = '/export/server/flink'
os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

"""
    实现一个类似于两数之和的sum函数,函数名:mySum
"""
if __name__ == '__main__':
    # 1- 创建流式执行环境
    # 1.1- 创建顶级对象
    env = StreamExecutionEnvironment.get_execution_environment()

    # 1.2- 得到SQL API层的顶级对象
    table = StreamTableEnvironment.create(stream_execution_environment=env)

    # 1.3- 设置全局并行度
    env.set_parallelism(1)

    # 2- 数据输入
    table.execute_sql("""
        create table source(
            num1 bigint,
            num2 bigint
        ) with (
            'connector'='socket',
            'hostname'='192.168.88.161',
            'port'='9999',
            'format'='csv'
        )
    """)

    # 3- 数据输出
    table.execute_sql("""
        create table sink(
            num1 bigint,
            num2 bigint,
            `result` bigint
        ) with (
            'connector'='print'
        )
    """)

    # 4- 数据处理
    # 4.1- 创建自定义Python函数
    @udf(result_type=DataTypes.BIGINT())
    def mySum_func(num_arg1, num_arg2):
        return num_arg1 + num_arg2

    # 4.2- 注册
    table.create_temporary_function('mySum',mySum_func)

    # 4.3- 调用
    table.execute_sql("""
        insert into sink
        select
            num1,num2,mySum(num1,num2) as  `result`
        from source
    """).wait()


    # 5- 启动流式任务
    env.execute()

运行结果截图:

在这里插入图片描述

3、Table Function

Table Function,表值函数,一进多出的函数。类似于Hive中的UDTF。

3.1 需求
实现一个类似于flatMap的功能(explode)的功能。数据源来自于socket。函数名:myExplode。

输入数据:
| mynum |
|  3    |
|  4    |

输出结果: 返回<mynum,并且大于等于0的数字
| result |
|   0    |
|   1    |
|   2    |

| result |
|   0    |
|   1    |
|   2    |
|   3    |
3.2 实现
udtf在SQL语句中进行调用,语法比较特殊
格式: lateral table(UDTF函数名称(字段名称)) as 视图名称(视图中新的字段名称1,视图中新的字段名称2..视图中新的字段名称n) on true
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf,udtf

os.environ['FLINK_HOME'] = '/export/server/flink'
os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

"""
    实现一个类似于flatMap的功能(explode)的功能。数据源来自于socket。函数名:myExplode
    UDTF
"""
if __name__ == '__main__':
    # 1- 创建流式执行环境
    # 1.1- 创建顶级对象
    env = StreamExecutionEnvironment.get_execution_environment()

    # 1.2- 得到SQL API层的顶级对象
    table = StreamTableEnvironment.create(stream_execution_environment=env)

    # 1.3- 设置全局并行度
    env.set_parallelism(1)

    # 2- 数据输入
    table.execute_sql("""
        create table source(
            mynum bigint
        ) with (
            'connector'='socket',
            'hostname'='192.168.88.161',
            'port'='9999',
            'format'='csv'
        )
    """)

    # 3- 数据输出
    table.execute_sql("""
        create table sink(
            `result` bigint
        ) with (
            'connector'='print'
        )
    """)

    # 4- 数据处理
    # 4.1- 创建自定义的Python函数
    @udtf(result_types=DataTypes.BIGINT())
    def myExplode_func(num_arg):
        return range(num_arg)

    # 4.2- 注册进Flink中
    # 下面2种任意使用其中一个都行
    table.create_temporary_system_function('myExplode', myExplode_func)
    # table.create_temporary_function('myExplode', myExplode_func)

    # 4.3- 调用
    # 错误调用
    # table.execute_sql("""
    #     insert into sink
    #     select
    #         myExplode(mynum) as `result`
    #     from source
    # """).wait()

    # 正确调用
    """
        udtf在SQL语句中进行调用,语法比较特殊
        格式:lateral table(UDTF函数名称(字段名称)) as 视图名称(视图中新的字段名称1,视图中新的字段名称2..视图中新的字段名称n) on true
    """
    table.execute_sql("""
        insert into sink
        select
            new_field
        from source
        left join lateral table(myExplode(mynum)) as tt(new_field) on true
        -- Hive中UDTF函数调用:lateral view explode(split(line,' ')) t as new_field
    """).wait()

    # 5- 启动流式任务
    env.execute()

运行结果截图:

在这里插入图片描述

可能遇到的错误一:

在这里插入图片描述

原因: result单词在Flink的SQL中是一个关键字
解决办法:
	1- 修改result字段的名称,变成不是关键字的
	2- 在result上面加上``

可能遇到的错误二:

在这里插入图片描述

原因: UDTF的调用在FlinkSQL中有特殊的语法要求
解决办法: 改成如下的SQL语句
insert into sink
select
	new_field
from source
left join lateral table(myExplode(mynum)) as tt(new_field) on true

可能遇到的错误三:

在这里插入图片描述

原因: 对UDTF进行注册需要使用@udtf装饰器

可能遇到的错误四:

在这里插入图片描述

原因: @udtf装饰器中的参数名叫做  result_types

4、Aggregate Function

Aggregate Function,聚合函数,是多进一出的函数,类似于Hive的UDAF函数。

4.1 需求
实现一个类似于count的函数,统计词频。数据源为socket,函数名:myCount

输入数据:
|  word  |
| hello  |
| spark  |
| hello  |

输出数据:
| word  |   cnt  |
| hello |    2   |
| spark |    1   |
4.2 实现
import os

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.types import DataType
from pyflink.table.udf import udf, AggregateFunction, ACC, T

os.environ['FLINK_HOME'] = '/export/server/flink'
os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241'
os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3'

"""
    实现一个类似于count的函数,统计词频。数据源为socket,函数名:myCount
"""

# 4.1- 创建udaf函数的相关代码
class MyUDAF(AggregateFunction):

    """
        创建累加器,也就是用来对数据进行初始化
        累加器的作用:用来临时保存中间的聚合结果
    """
    def create_accumulator(self) -> ACC:
        # 返回列表,里面的0表示的是单词次数的初始值
        return [0]

    # 从累加器中获取数据
    def get_value(self, accumulator: ACC) -> T:
        # 这里的0是索引/下标
        return accumulator[0]

    # 对数据进行累加。+U
    def accumulate(self, accumulator: ACC, *args):
        # 相同的单词,每来一个就对单词次数+1
        # accumulator[0] = accumulator[0] + 1
        accumulator[0] += 1

    # 撤回累加器数据的变化。-U
    def retract(self, accumulator: ACC, *args):
        accumulator[0] = accumulator[0] - 1

    # 合并多个累加器中的值,因为Flink是多线程分布式运行
    def merge(self, accumulator: ACC, accumulators):
        for i in accumulators:
            accumulator[0] = accumulator[0] + i[0]

    # 获得最终结果的数据类型
    def get_result_type(self) -> DataType:
        return DataTypes.BIGINT()

    # 获得累加器中存放的元素的数据类型
    def get_accumulator_type(self) -> DataType:
        return DataTypes.BIGINT()

if __name__ == '__main__':
    # 1- 创建流式执行环境
    # 1.1- 创建顶级对象
    env = StreamExecutionEnvironment.get_execution_environment()

    # 1.2- 得到SQL API层的顶级对象
    table = StreamTableEnvironment.create(stream_execution_environment=env)

    # 1.3- 设置全局并行度
    env.set_parallelism(1)

    # 2- 数据输入
    table.execute_sql("""
        create table source(
            word varchar
        ) with (
            'connector'='socket',
            'hostname'='192.168.88.161',
            'port'='9999',
            'format'='csv'
        )
    """)

    # 3- 数据输出
    table.execute_sql("""
        create table sink(
            word varchar,
            cnt bigint
        ) with (
            'connector'='print'
        )
    """)

    # 4- 数据处理
    # 4.2- 注册进Flink
    """
        UDAF注册的时候,传递的是类的实例对象,也就是类名(参数)
    """
    table.create_temporary_function('myCount',MyUDAF())

    # 4.3- 调用
    table.execute_sql("""
        insert into sink
        select
            word,
            myCount(1) as cnt
        from source
        group by word
    """).wait()


    # 5- 启动流式任务
    env.execute()

运行结果截图:

在这里插入图片描述

5、阿里云UDF

在进行函数注册时,先把函数开发好。

开发阿里云的UDF参考手册:https://help.aliyun.com/zh/flink/developer-reference/python/?spm=a2c4g.11186623.0.0.2ac522158vB92w

5.1 注册UDF函数

选择SQL开发 -> 函数选项,上传压缩包,如下图:

在这里插入图片描述

点击确定,如下图:

在这里插入图片描述

点击创建函数,提示创建成功,如下图:

在这里插入图片描述

到此,则函数创建成功。

5.2 使用UDF函数

在这里插入图片描述

在这里插入图片描述

  • sub_string函数
#1.创建表
CREATE TEMPORARY TABLE function_udf(
  a VARCHAR,
  b INT,
  c INT
) WITH (
  'connector' = 'socket',
  'hostname' = '172.24.24.49',
  'port' = '9999',
  'format' = 'csv'
);


#2.查询SQL
SELECT sub_string(a,2,5) FROM function_udf;

注意: hostname一定要改成自己的ECS服务器内网IP

在这里插入图片描述

  • split函数
#1.创建表
同上,略

#2.查询SQL
SELECT a,b,c,d,e
FROM function_udf,lateral table(split(a)) as T(d,e);

在这里插入图片描述

  • weight_avg函数
#1.创建表
同上,略。

#2.查询SQL
SELECT weighted_avg(b,c) FROM function_udf;

在这里插入图片描述

四、Flink常见面试题整理

1、Flink中的部署模式?你是如何部署Flink?你的Flink的项目是用什么方式部署?

  • 开源Flink:我们使用的是开源版的Flink,部署项目的时候使用的Application应用模式,给你具体说下为什么我们使用application模式进行部署,xxxx。另外我给您介绍下其他一些部署模式。Session、per-job
  • 阿里云Flink:我们使用的是阿里云Flink,部署项目的时候使用的per-job,也就是job分离模式,给你具体说下为什么我们使用job分离模式进行部署,xxxx。另外我给您介绍下其他一些部署模式。Session、application

2、说一下对Flink中时间的理解?你在你的项目中是如何使用Flink进行数据统计的?

步骤一:介绍Flink中的时间分类,并且要说出每个时间代表的含义。

步骤二:结合业务举例说明如何使用3类时间。

​ 举例:在项目中,我们使用Flink进行数据的实时ETL。但是在做数据检查和核对的时候,发现有些数据出现事件时间的缺失,排错问题发现是业务方上报数据,导致了部分事件时间的空缺。然后与业务方进行异常数据处理方式的沟通,最终确定是使用处理时间来填补事件时间空缺的情况。

3、你在项目中如何解决延迟到来的数据?如何彻底解决数据延迟到的情况?

在Flink的SQL开发中,无法解决数据超过watermark允许延迟时间后到来的数据被丢失的情况。在DataStream的开发中可以通过侧输出流解决延迟来的数据,也就是在DataStream的编程中,可以做到彻底的解决数据延迟到来的情况。

如何彻底解决数据延迟到的情况?watermark水印+侧输出流

在这里插入图片描述

4、介绍一下Flink的底层原理?介绍一下Flink的架构?

步骤一:从Flink的作业提交运行时的架构说起:我们开发完Flink程序以后,通过命令行或者界面提交Flink程序到集群中运行,首先第一步会启动Client客户端进程。接着Client进程将我们的Flink job通过Actor通信系统提交给JobManager,JobManager拿到任务后会分配给到具体的TaskManager来执行,并且任务运行的具体场所是TaskManager中的Slot

步骤二:我们的项目中,使用的是阿里云Flink。因此我们通过per-job模式部署Flink程序。接下来给您具体介绍下该模式下任务的底层提交流式是什么样的?

步骤三:回答Per-job作业的提交流程

5、用户自定义函数的分类

UDF:用户自定义函数,输入一行数据,得到一行数据。一对一

UDAF:用户自定义聚合函数,输入多行数据,得到一行数据。多对一

UDTF:用户自定义表数据生成函数,输入一行数据,得到多行数据。一对多

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1942590.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

海康威视工业相机SDK+Python+PyQt开发数据采集系统(支持软件触发、编码器触发)

海康威视工业相机SDKPythonPyQt开发数据采集系统&#xff08;支持软件触发、编码器触发&#xff09; pythonpyqt开发海康相机数据采集系统 1 开发软件功能&#xff1a; 支持搜索相机&#xff1a;Gige相机设备和USB相机设备支持两种触发模式&#xff1a;软件触发和编码器触发支…

Docker、containerd、CRI-O 和 runc 之间的区别

容器与 Docker 这个名称并不紧密相关。你可以使用其他工具来运行容器 您可以使用 Docker 或一堆非Docker 的其他工具来运行容器。docker只是众多选项之一&#xff0c;Docker&#xff08;公司&#xff09;在生态系统中创建了一些很棒的工具&#xff0c;但不是全部。 容器方面有…

【北京迅为】《i.MX8MM嵌入式Linux开发指南》-第三篇 嵌入式Linux驱动开发篇-第四十三章 驱动模块传参

i.MX8MM处理器采用了先进的14LPCFinFET工艺&#xff0c;提供更快的速度和更高的电源效率;四核Cortex-A53&#xff0c;单核Cortex-M4&#xff0c;多达五个内核 &#xff0c;主频高达1.8GHz&#xff0c;2G DDR4内存、8G EMMC存储。千兆工业级以太网、MIPI-DSI、USB HOST、WIFI/BT…

【PPT把当前页输出为图片】及【PPT导出图片模糊】的解决方法(sci论文图片清晰度)

【PPT把当前页输出为图片】及【PPT导出图片模糊】的解决方法 内容一&#xff1a;ppt把当前页输出为图片&#xff1a;内容二&#xff1a;ppt导出图片模糊的解决方法&#xff1a;方法&#xff1a;步骤1&#xff1a;打开注册表编辑器步骤2&#xff1a;修改注册表&#xff1a; 该文…

使用jacob文字生成语音文件时遇到的问题及解决方案

使用jacob文字生成语音文件时 出现如下错误 java.lang.UnsatisfiedLinkError: no jacob-1.18-x64 in java.library.path错误表明Java虚拟机无法在其指定的java.library.path路径中找到名为jacob-1.18-x64的本地库文件。这个错误通常发生在尝试通过JNI或者JNA调用本地库时&…

算法学习笔记(Hello算法)—— 初识算法

1、相关链接 Hello算法&#xff1a;Hello 算法 (hello-algo.com) 2、算法是什么 2.1 算法定义 算法是一系列明确、有限且有效的步骤或指令的集合&#xff0c;用于解决特定问题或执行特定任务。 算法具有以下基本特征&#xff1a; 输入&#xff1a;算法至少有一个输入&…

【python】PyQt5中QAbstractButton基类的特性详细分析与实战应用

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…

聚水潭·奇门对接打通金蝶云星空订单查询接口与销售退货新增接口

聚水潭奇门对接打通金蝶云星空订单查询接口与销售退货新增接口 对接源平台:聚水潭奇门 聚水潭SaaSERP于2014年4月上线&#xff0c;目前累计超过2.5万商家注册使用&#xff0c;成为淘宝应用服务市场ERP类目商家数和商家月订单增速最快的ERP。2014年及2015年“双十一”当天&#…

TQSDRPI开发板教程:实现PL端的UDP回环与GPSDO

本教程将完成一个全面的UDP运行流程与GPSDO测试&#xff0c;从下载项目的源代码开始&#xff0c;通过编译过程&#xff0c;最终将项目部署到目标板卡上运行演示。此外&#xff0c;我们还介绍如何修改板卡的IP地址&#xff0c;以便更好地适应您的网络环境或项目需求。 首先从Gi…

【Java】:洗牌功能和杨辉三角的实现

洗牌 此操作包含的基本功能有&#xff1a; 组牌&#xff1a;组建 52 张扑克牌 四种花色&#xff1a;“♥️”&#xff0c;“♠️”&#xff0c;“⬛️”&#xff0c;“♣️”每种花色 13 张牌&#xff1a;1~13 洗牌&#xff1a;将 52 张扑克牌打乱顺序发牌&#xff1a;给三个人…

MybatisPlus设置动态表名

对于一些数据量比较大的表&#xff0c;为了提高查询性能&#xff0c;我们一般将表拆分成多张表&#xff0c;常见的是根据数据量&#xff0c;按年分表或者按月分表&#xff1b;分表虽然太高了查询性能&#xff0c;但是在查询的时候&#xff0c;如何才能查询执行分表数据呢&#…

谷粒商城实战笔记-45-商品服务-API-三级分类-查询-递归树形结构数据获取

文章目录 一&#xff0c;准备工作1&#xff0c;启动虚拟机2&#xff0c;启动mysql3&#xff0c;执行MySQL脚本插入分类数据4&#xff0c;关于三级分类 二&#xff0c;Controller层新增接口三&#xff0c;Service层新增接口1&#xff0c;代码实现2&#xff0c;测试 从这一节开始…

海康威视综合安防管理平台 detection 前台RCE漏洞复现

0x01 产品简介 海康威视综合安防管理平台是一套“集成化”、“智能化”的平台,通过接入视频监控、一卡通、停车场、报警检测等系统的设备。海康威视集成化综合管理软件平台,可以对接入的视频监控点集中管理,实现统一部署、统一配置、统一管理和统一调度。 0x02 漏洞概述 海康…

Stateflow中的状态转换表

状态转换表是表达顺序模态逻辑的另一种方式。不要在Stateflow图表中以图形方式绘制状态和转换&#xff0c;而是使用状态转换表以表格格式表示模态逻辑。 使用状态转换表的好处包括&#xff1a; 易于对类列车状态机进行建模&#xff0c;其中模态逻辑涉及从一个状态到其邻居的转换…

【Axure高保真原型】批量增加标签——中继器版

今天和大家分享批量增加标签——中继器版的原型模板&#xff0c;效果包括&#xff1a; 添加标签&#xff1a;在输入框了输入需要添加的标签信息&#xff0c;点击添加标签按钮或者按键盘回车键可以动态添加该标签 批量添加&#xff1a;可以一次性添加多个标签&#xff0c;在输入…

15现代循环神经网络—GRU与LSTM

目录 1.门控循环单元 GRU关注一个序列门候选隐状态(candidate hidden state)隐状态总结从零开始代码实现代码简洁实现2.长短期记忆网络 LSTM门候选记忆单元(candidate memory cell)记忆单元隐状态代码1.门控循环单元 GRU GRU 是最近几年提出来的,在 LSTM 之后,是一个稍微简…

FastAPI(六十六)实战开发《在线课程学习系统》接口开发--用户注册接口开发

源码见"fastapi_study_road-learning_system_online_courses: fastapi框架实战之--在线课程学习系统" 在前面我们分析了接口的设计&#xff0c;那么我们接下来做接口的开发。 首先&#xff0c;我们先设计下pydantic用户参数的校验&#xff1a; """…

2024-07-20 Unity插件 Odin Serializer2 —— 序列化教程

文章目录 1 由根对象决定序列化2 实现 Odin 序列化器2.1 继承已有序列化类2.2 自定义序列化类 3 避免 Unity 无限深度警告4 指定序列化秘钥4.1 External String Reference Resolver4.2 External GUID Reference Resolver4.3 External Index Reference Resolver 4 功能与限制4.1…

物联网在电力行业的应用

作者主页: 知孤云出岫 这里写目录标题 作者主页:物联网在电力行业的应用简介主要应用领域代码案例分析1. 智能电表数据采集和分析2. 设备监控和预测性维护3. 能耗管理和优化4. 电力负载预测5. 分布式能源管理6. 电动汽车充电管理7. 电网安全与故障检测 物联网在电力行业的应用…

vue3-video-play 导入 以及解决报错

npm install vue3-video-play --save # 或者 yarn add vue3-video-play import Vue3VideoPlay from vue3-video-play; import vue3-video-play/dist/style.css; app.use(Vue3VideoPlay) <template><div id"main-container-part"><div class"al…