尚硅谷大数据Flink1.17实战教程-笔记03【Flink运行时架构】

news2024/12/25 9:35:17
  • 尚硅谷大数据技术-教程-学习路线-笔记汇总表【课程资料下载】
  • 视频地址:尚硅谷大数据Flink1.17实战教程从入门到精通_哔哩哔哩_bilibili
  1. 尚硅谷大数据Flink1.17实战教程-笔记01【Flink概述、Flink快速上手】
  2. 尚硅谷大数据Flink1.17实战教程-笔记02【Flink部署】
  3. 尚硅谷大数据Flink1.17实战教程-笔记03【Flink运行时架构】
  4. 尚硅谷大数据Flink1.17实战教程-笔记04【】
  5. 尚硅谷大数据Flink1.17实战教程-笔记05【】
  6. 尚硅谷大数据Flink1.17实战教程-笔记06【】
  7. 尚硅谷大数据Flink1.17实战教程-笔记07【】
  8. 尚硅谷大数据Flink1.17实战教程-笔记08【】

目录

基础篇

第04章-Flink部署

P023【023_Flink运行时架构_系统架构】07:13

P024【024_Flink运行时架构_核心概念_并行度】06:45

P025【025_Flink运行时架构_核心概念_并行度设置&优先级】18:40

P026【026_Flink运行时架构_核心概念_算子链】08:34

P027【027_Flink运行时架构_核心概念_算子链演示】17:11

P028【028_Flink运行时架构_核心概念_任务槽】09:52

P029【029_Flink运行时架构_核心概念_任务槽的共享组】07:59

P030【030_Flink运行时架构_核心概念_slot与并行度的关系&演示】21:27

P031【031_Flink运行时架构_提交流程_Standalone会话模式&四张图】09:49

P032【032_Flink运行时架构_提交流程_Yarn应用模式】05:18


基础篇

第04章-Flink部署

P023【023_Flink运行时架构_系统架构】07:13

Flink运行时架构——Standalone会话模式为例

P024【024_Flink运行时架构_核心概念_并行度】06:45

  • 一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。这样,包含并行子任务的数据流,就是并行数据流,它需要多个分区(stream partition)来分配并行任务。一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。
  • 例如:如上图所示,当前数据流中有source、map、window、sink四个算子,其中sink算子的并行度为1,其他算子的并行度都为2。所以这段流处理程序的并行度就是2。

P025【025_Flink运行时架构_核心概念_并行度设置&优先级】18:40

4.2.1 并行度(Parallelism)

2)并行度的设置

在Flink中,可以用不同的方法来设置并行度,它们的有效范围和优先级别也是不同的。

1)代码中设置

我们在代码中,可以很简单地在算子后跟着调用setParallelism()方法,来设置当前算子的并行度:

stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);

这种方式设置的并行度,只针对当前算子有效。

另外,我们也可以直接调用执行环境的setParallelism()方法,全局设定并行度:

env.setParallelism(2);

这样代码中所有算子,默认的并行度就都为2了。我们一般不会在程序中设置全局并行度,因为如果在程序中对全局并行度进行硬编码,会导致无法动态扩容。

这里要注意的是,由于keyBy不是算子,所以无法对keyBy设置并行度。

2)提交应用时设置

在使用flink run命令提交应用时,可以增加-p参数来指定当前应用程序执行的并行度,它的作用类似于执行环境的全局设置:

bin/flink run p 2 c com.atguigu.wc.SocketStreamWordCount

./FlinkTutorial-1.0-SNAPSHOT.jar

如果我们直接在Web UI上提交作业,也可以在对应输入框中直接添加并行度。

package com.atguigu.wc;

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * TODO DataStream实现Wordcount:读socket(无界流)
 *
 * @author
 * @version 1.0
 */
public class WordCountStreamUnboundedDemo {
    public static void main(String[] args) throws Exception {
        // TODO 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // IDEA运行时,也可以看到webui,一般用于本地测试
        // 需要引入一个依赖 flink-runtime-web
        // 在idea运行,不指定并行度,默认就是 电脑的 线程数
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        env.setParallelism(3);

        // TODO 2.读取数据: socket
        DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);

        // TODO 3.处理数据: 切换、转换、分组、聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = socketDS
                .flatMap(
                        (String value, Collector<Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                )
                .setParallelism(2)
                .returns(Types.TUPLE(Types.STRING,Types.INT))
                // .returns(new TypeHint<Tuple2<String, Integer>>() {})
                .keyBy(value -> value.f0)
                .sum(1);

        // TODO 4.输出
        sum.print();

        // TODO 5.执行
        env.execute();
    }
}

/**
 并行度的优先级:
    代码:算子 > 代码:env > 提交时指定 > 配置文件
 */

并行度优先级:代码:算子 > 代码:全局env > 提交时指定命令 > 配置文件。

P026【026_Flink运行时架构_核心概念_算子链】08:34

4.2.2 算子链(Operator Chain)

2)合并算子链

在Flink中,并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task,这样原来的算子就成为了真正任务里的一部分,如下图所示。每个task会被一个线程执行。这样的技术被称为“算子链”(Operator Chain)。

P027【027_Flink运行时架构_核心概念_算子链演示】17:11

package com.atguigu.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * TODO DataStream实现Wordcount:读socket(无界流)
 *
 * @author
 * @version 1.0
 */
public class OperatorChainDemo {
    public static void main(String[] args) throws Exception {
        // TODO 1.创建执行环境
		// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // IDEA运行时,也可以看到webui,一般用于本地测试
        // 需要引入一个依赖 flink-runtime-web
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // 在idea运行,不指定并行度,默认就是 电脑的 线程数
        env.setParallelism(1);

        // 全局禁用 算子链
		//env.disableOperatorChaining();

        // TODO 2.读取数据:socket
        DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);

        // TODO 3.处理数据: 切换、转换、分组、聚合
        SingleOutputStreamOperator<Tuple2<String,Integer>> sum = socketDS
				//.disableChaining()
                .flatMap(
                        (String value, Collector<String> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(word);
                            }
                        }
                )
                .startNewChain()
				//.disableChaining()
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1))
                .returns(Types.TUPLE(Types.STRING,Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);

        // TODO 4.输出
        sum.print();

        // TODO 5.执行
        env.execute();
    }
}

/**
 1、算子之间的传输关系:
     一对一
     重分区

 2、算子 串在一起的条件:
    1) 一对一
    2) 并行度相同

 3、关于算子链的api:
    1)全局禁用算子链:env.disableOperatorChaining();
    2)某个算子不参与链化:  算子A.disableChaining(),  算子A不会与 前面 和 后面的算子 串在一起
    3)从某个算子开启新链条:  算子A.startNewChain(), 算子A不与 前面串在一起,从A开始正常链化
 */

P028【028_Flink运行时架构_核心概念_任务槽】09:52

4.2.3 任务槽(Task Slots)

P029【029_Flink运行时架构_核心概念_任务槽的共享组】07:59

3)任务对任务槽的共享

默认情况下,Flink是允许子任务共享slot的。如果我们保持sink任务并行度为1不变,而作业提交时设置全局并行度为6,那么前两个任务节点就会各自有6个并行子任务,整个流处理程序则有13个子任务。如上图所示,只要属于同一个作业,那么对于不同任务节点(算子)的并行子任务,就可以放到同一个slot上执行。所以对于第一个任务节点source→map,它的6个并行子任务必须分到不同的slot上,而第二个任务节点keyBy/window/apply的并行子任务却可以和第一个任务节点共享slot。

package com.atguigu.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * TODO DataStream实现Wordcount:读socket(无界流)
 *
 * @author
 * @version 1.0
 */
public class SlotSharingGroupDemo {
    public static void main(String[] args) throws Exception {
        // TODO 1.创建执行环境
		// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // IDEA运行时,也可以看到webui,一般用于本地测试
        // 需要引入一个依赖 flink-runtime-web
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // 在idea运行,不指定并行度,默认就是 电脑的 线程数
        env.setParallelism(1);

        // TODO 2.读取数据:socket
        DataStreamSource<String> socketDS = env.socketTextStream("hadoop102", 7777);

        // TODO 3.处理数据: 切换、转换、分组、聚合
        SingleOutputStreamOperator<Tuple2<String,Integer>> sum = socketDS
                .flatMap(
                        (String value, Collector<String> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(word);
                            }
                        }
                )
                .returns(Types.STRING)
                .map(word -> Tuple2.of(word, 1)).slotSharingGroup("aaa")
                .returns(Types.TUPLE(Types.STRING,Types.INT))
                .keyBy(value -> value.f0)
                .sum(1);


        // TODO 4.输出
        sum.print();

        // TODO 5.执行
        env.execute();
    }
}

/**
 1、slot特点:
    1)均分隔离内存,不隔离cpu
    2)可以共享:
          同一个job中,不同算子的子任务 才可以共享 同一个slot,同时在运行的
          前提是,属于同一个 slot共享组,默认都是“default”

 2、slot数量 与 并行度 的关系
    1)slot是一种静态的概念,表示最大的并发上限
       并行度是一种动态的概念,表示 实际运行 占用了 几个

    2)要求: slot数量 >= job并行度(算子最大并行度),job才能运行
       TODO 注意:如果是yarn模式,动态申请
         --> TODO 申请的TM数量 = job并行度 / 每个TM的slot数,向上取整
       比如session: 一开始 0个TaskManager,0个slot
         --> 提交一个job,并行度10
            --> 10/3,向上取整,申请4个tm,
            --> 使用10个slot,剩余2个slot
 */

P030【030_Flink运行时架构_核心概念_slot与并行度的关系&演示】21:27

4.2.4 任务槽和并行度的关系

任务槽和并行度都跟程序的并行执行有关,但两者是完全不同的概念。简单来说任务槽是静态的概念,是指TaskManager具有的并发执行能力,可以通过参数taskmanager.numberOfTaskSlots进行配置;而并行度是动态概念,也就是TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default进行配置。

slot数量 与 并行度 的关系
    1)slot是一种静态的概念,表示最大的并发上限
       并行度是一种动态的概念,表示 实际运行 占用了 几个

    2)要求: slot数量 >= job并行度(算子最大并行度),job才能运行
       TODO 注意:如果是yarn模式,动态申请
         --> TODO 申请的TM数量 = job并行度 / 每个TM的slot数,向上取整
       比如session: 一开始 0个TaskManager,0个slot
         --> 提交一个job,并行度10
            --> 10/3,向上取整,申请4个tm
            --> 使用10个slot,剩余2个slot

P031【031_Flink运行时架构_提交流程_Standalone会话模式&四张图】09:49

4.3 作业提交流程

4.3.1 Standalone会话模式作业提交流程

4.3.2 逻辑流图/作业图/执行图/物理流图

逻辑流图(StreamGraph)→ 作业图(JobGraph)→ 执行图(ExecutionGraph)→ 物理图(Physical Graph)。

P032【032_Flink运行时架构_提交流程_Yarn应用模式】05:18

4.3.3 Yarn应用模式作业提交流程

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/767169.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

好用的Net反编译工具,界面和VS差不多,供大家学习研究参考

1、无需设置 由于它是一个便携式工具&#xff0c;它不需要您将其安装在目标计算机上&#xff0c;因为只需拆包其存档并启动可执行文件即可完全访问其功能。 您还可以从可移动存储介质(如USB闪存驱动器或外部HDD)运行它。此外&#xff0c;它不会修改系统中的注册表项&#xff0c…

学习记录——SpectFormer、DilateFormer、ShadowFormer

SpectFormer: Frequency and Attention is what you need in a Vision Transformer, arXiv2023 频域混合注意力SpectFormer 2023 论文&#xff1a;https://arxiv.org/abs/2304.06446 代码&#xff1a;https://badripatro.github.io/SpectFormers/ 摘要视觉变压器已经成功地应用…

【Spring——Spring的基础与创建】

目录 &#x1f367;1. 什么是 Spring &#xff1f; &#x1fad6;1.1 容器 &#x1f359;1.2 IoC &#x1f97d;1.3 汽车类——传统写法 &#x1f358;1.4 汽车类——IoC 写法 &#x1f32d;2. 配置 maven 国内源 &#x1f32e;2.1 在设置中勾选文件 &#x1f364;2.2 在…

SpringCloud学习路线(4)—— Nacos注册中心

一、认识和安装Nacos &#xff08;一&#xff09;概念&#xff1a; Nacos是Alibaba的产品&#xff0c;现在是SpringCloud中的一个组件&#xff0c;相较于Eureka功能更加丰富。 &#xff08;二&#xff09;下载地址&#xff1a; https://github.com/alibaba/nacos/releases &am…

一文看懂《关于网络安全和信息化工作重要指示》

7月14日至15日&#xff0c;全国网络安全和信息化工作会议在京召开。《关于网络安全和信息化工作重要指示》也在会上得到解读与传达。 从近年来党的二十大等重大会议上网络安全和数据安全等相关话题多次被提及、我国陆续发布多部网络安全&数据安全相关政策法规等等&#xf…

个人微信号二次开发的实现,api接口

各位兄弟姐妹们大家好&#xff01;&#xff01;&#xff01;&#xff01; 我又瞧到了好玩的微信机器人分享给大家&#xff01; 社群运营这几年风头正盛&#xff0c;不能落伍啊&#xff0c;我们也赶紧组建了社群。 但是微信群还是社交属性为主的&#xff0c;当我们运营多个社群的…

(栈队列堆) 剑指 Offer 09. 用两个栈实现队列 ——【Leetcode每日一题】

❓ 剑指 Offer 09. 用两个栈实现队列 难度&#xff1a;简单 用两个栈实现一个队列。队列的声明如下&#xff0c;请实现它的两个函数 appendTail 和 deleteHead &#xff0c;分别完成在队列尾部插入整数和在队列头部删除整数的功能。(若队列中没有元素&#xff0c;deleteHead …

如何通过振动传感器实现设备的故障诊断和预测性维护?

在现代工业生产中&#xff0c;设备的故障和停机时间对于企业的生产效率和经济效益有着巨大的影响。为了提高设备的可靠性和降低维护成本&#xff0c;越来越多的企业开始采用振动传感器作为关键的监测工具。振动传感器能够实时监测设备的振动情况&#xff0c;并通过数据分析和算…

自制编译器代码4.6含义

规则一&#xff0c;识别一个" 规则二&#xff0c;识别除了",,\n.\r的其他字符 规则三&#xff0c;这里第一个\意思是一个转义字符\&#xff0c;意思是ASCII码 比如\077就是八进制ASCII码的? 规则四&#xff0c;识别其他所有字符 规则五&#xff0c;回到defailt状态 …

不懂代码也不用怕!10款无代码网站搭建平台

作为设计师&#xff0c;对网站满脑子的构思&#xff0c;却受限于时间和技能&#xff08;比如写代码&#xff09;&#xff0c;这是何其无奈&#xff01;那个在你脑中盘桓许久的网站&#xff0c;或许是一个博客&#xff0c;可能是作品展示网站&#xff0c;但无论是哪种&#xff0…

华为OD机试真题 Java 实现【等差数列】【2023 B卷 100分】,附详细解题思路

目录 专栏导读一、题目描述二、输入描述三、输出描述四、解题思路五、Java算法源码六、效果展示 专栏导读 本专栏收录于《华为OD机试&#xff08;JAVA&#xff09;真题&#xff08;A卷B卷&#xff09;》。 刷的越多&#xff0c;抽中的概率越大&#xff0c;每一题都有详细的答…

GUI实例

运行代码&#xff1a; //GUI实例 #include"std_lib_facilities.h" #include"GUI/Simple_window.h" #include"GUI/GUI.h" #include"GUI/Graph.h" #include"GUI/Point.h"struct Lines_window :Window {Lines_window(Point xy…

《5.linux驱动开发-第2部分-5.2.字符设备驱动基础》最简单的模块源码分析 lsmod insmod modinfo rmmod dmesg

5.1.6.驱动应该这么学 5.1.6.1、先学好C语言 5.1.6.2、掌握相关预备知识 (1)硬件操作方面 (2)应用层API 5.1.6.3、驱动学习阶段 (1)注重实践&#xff0c;一步一步写驱动 (2)框架思维&#xff0c;多考虑整体和上下层 (3)先通过简单设备学linux驱动框架 (4)学会总结、记录&#x…

自动化测试框架性能测试报告模板

目录 一、项目概述 二、测试环境说明 三、测试方案 四、测试结果 五、结果分析 总结&#xff1a; 一、项目概述 1.1 编写目的 本次测试报告&#xff0c;为自动化测试框架性能测试总结报告。目的在于总结我们课程所压测的目标系统的性能点、优化历史和可优化方向。 1.2 …

高效太阳能太阳光模拟器

高效太阳能太阳光模拟器是一种能够高效模拟太阳光的设备。它能够产生与太阳光相近的光谱分布、光强和光照角度等特性的光线&#xff0c;用于太阳能相关的研究和应用中。 为了实现高效太阳能太阳光模拟器&#xff0c;以下几个方面是需要考虑的&#xff1a; 1. 光源&#xff1a;…

C++进阶—C++11新特性(移动语义右值引用可变参数模板lambda表达式function包装器bind函数)

目录 0. C11简介 1. 统一的列表初始化 1.1 {}初始化 1.2 std::initializer_list 2. 声明 2.1 auto 2.2 decltype 2.3 nullptr 3. 范围for循环 4. 智能指针 5. STL中一些变化 6. 右值引用和移动语义 6.1 左值引用和右值引用 6.2 左值引用与右值引用比较 6.3 右值…

三菱FX以太网模块设置ModbusTCP通讯

大家好&#xff0c;今天我们要来聊一聊转以太网捷米特JM-ETH-FX&#xff0c;这款设备内部集成了ModbusTCP通讯服务器&#xff0c;这意味着什么&#xff1f;ModbusTCP客户机&#xff0c;比如支持ModbusTCP的组态软件、OPC服务器、PLC以及使用高级语言开发的实现ModbusTCP客户机软…

改善压降过大的六种方法

改善压降过大的六种方法 当进行完压降仿真完之后,如果结果都是PASS的话是我们最希望看到的,但是时常会因为某些原因,导致压降不通过,下面介绍几种弥补压降的几种措施 方法一 靠近用电端 如下图,电源放的离用电端太远将电源模块尽量靠近用电端放置,尤其是小电压大电流的电…

flink1.16读取hive数据存到es 本地和服务器上遇到的问题和解决思路

话不多说 直接上官网 Overview | Apache Flink hive版本 3.1.3000 ​ hadoop 版本 3.1.1.7.1.7 ​ flink 1.16.2 ​ 代码 很简单我还是贴下 import com.fasterxml.jackson.databind.ObjectMapper import com.typesafe.config.{Config, ConfigFactory} import org.apache…