207.Flink(二):架构及核心概念,flink从各种数据源读取数据,各种算子转化数据,将数据推送到各数据源

news2024/10/6 12:30:43

一、Flink架构及核心概念

1.系统架构

  • JobMaster是JobManager中最核心的组件,负责处理单独的作业(Job)。
  • 一个job对应一个jobManager

 2.并行度

(1)并行度(Parallelism)概念

一个特定算子的子任务(subtask)的个数被称之为其并行度(parallelism)。这样,包含并行子任务的数据流,就是并行数据流,它需要多个分区(stream partition)来分配并行任务。

流程序的并行度 = 其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。

(2)设置并行度

对某个具体算子设置并行度:

stream.map(word -> Tuple2.of(word, 1L)).setParallelism(2);

全局设置并行度:

env.setParallelism(2);

提交任务时指定:

  • 通过页面上传jar的时候可以指定
  • 可以在命令行启动的时候通过 -p 3指定

flink-conf.yaml中配置:

parallelism.default: 2

优先级:

代码中具体算子 > 代码中全局 > 提交任务指定 > 配置文件中指定

3.算子链

(1)算子间的数据传输

*1)一对一(One-to-one,forwarding)

这种模式下,数据流维护着分区以及元素的顺序。它们之间不需要重新分区,也不需要调整数据的顺序。map、filter、flatMap等算子都是这种one-to-one的对应关系。这种关系类似于Spark中的窄依赖。

*2)重分区(Redistributing)

在这种模式下,数据流的分区会发生改变。每一个算子的子任务,会根据数据传输的策略,把数据发送到不同的下游目标任务。这些传输方式都会引起重分区的过程,这一过程类似于Spark中的shuffle。

(2)合并算子链

在Flink中,并行度相同的一对一(one to one)算子操作,可以直接链接在一起形成一个“大”的任务(task),这样原来的算子就成为了真正任务里的一部分 

// 禁用算子链,该算子不会和前面和后面串在一起
.map(word -> Tuple2.of(word, 1L)).disableChaining();

// 全局禁用算子链
env.disableChaining();

// 从当前算子开始新链
.map(word -> Tuple2.of(word, 1L)).startNewChain()

  • 当一对一的时候,每个运算量都很大,这个时候不适合串在一起。
  • 当需要定位具体问题的时候,不串在一起更容易排查问题

4.任务槽

(1)任务槽(Task Slots)概念

Flink中每一个TaskManager都是一个JVM进程,它可以启动多个独立的线程,来并行执行多个子任务(subtask)。

TaskManager的计算资源是有限的,为了控制并发量,TaskManager对每个任务运行所占用的内存资源做出明确的划分,这就是所谓的任务槽(task slots)。

每个任务槽的大小是均等的,且任务槽之间的资源不可以互相借用。

如图,每个TaskManager有三个任务槽,每个槽运行自己的任务。槽的大小均等。

(2)任务槽数量的设置

在Flink的/opt/module/flink-1.17.0/conf/flink-conf.yaml配置文件中,可以设置TaskManager的slot数量,默认是1个slot。

taskmanager.numberOfTaskSlots: 8

slot目前仅仅用来隔离内存,不会涉及CPU的隔离。在具体应用时,建议将slot数量配置为机器的CPU核心数。

(3)任务对任务槽的共享

在同一个作业中,不同任务节点的并行子任务可以放在同一个slot上执行

 可以共享:

  • 同一个job中,不同算子的子任务才可以共享同一个slot。这些子任务是同时运行
  • 前提是:属于同一个slot共享组,默认都是“default”

手动指定共享组:

.map(word -> Tuple2.of(word, 1L)).slotSharingGroup("1");

共享的好处:允许我们保存完整的作业管道。这样一来,即使某个TaskManager出现故障宕机,其他节点也可以完全不受影响,作业的任务可以继续执行

(4)任务槽和并行度的关系

  • 任务槽是静态的概念,是指TaskManager具有的并发执行能力,可以通过参数taskmanager.numberOfTaskSlots进行配置
  • 并行度是动态概念,也就是TaskManager运行程序时实际使用的并发能力,可以通过参数parallelism.default进行配置

如果是yarn模式,申请的TaskManager的数量 = job并行度 / 每个TM的slot数量,向上取整

即:假设10个并行度的job,每个TM的slot是3个,那么需要10/3,向上取整,即需要最少4个TaskManager

二、作业提交流程

1.Standalone会话模式作业提交流程

逻辑流图(StreamGraph)→ 作业图(JobGraph)→ 执行图(ExecutionGraph)→ 物理图(Physical Graph)。

  • 逻辑流图:列出并行度,算子,各算子之间关系(一对一还是需要重分区)
  • 作业图:将一对一的算子做算子链的优化,作业中间会有中间结果集
  • 执行图:将并行度展开,并标注每个并行处理的算子
  • 物理图:基本同执行图,是执行图的落地

2.Yarn应用模式作业提交流程

三、 DataStream API

DataStream API是Flink的核心层API。一个Flink程序,其实就是对DataStream的各种转换。

1.执行环境(Execution Environment)

(1)创建执行环境

*1)StreamExecutionEnvironment.getExecutionEnvironment();

它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境

*2)StreamExecutionEnvironment.createLocalEnvironment();

这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的CPU核心数

*3)StreamExecutionEnvironment
          .createRemoteEnvironment(
            "host",                   // JobManager主机名
            1234,                     // JobManager进程端口号
               "path/to/jarFile.jar"  // 提交给JobManager的JAR包
        );

这个方法返回集群执行环境。需要在调用时指定JobManager的主机名和端口号,并指定要在集群中运行的Jar包。

 (2)执行模式(Execution Mode)

流批一体:代码api是同一套,可以指定为 批,也可以指定为 流。

通话代码配置:

env.setRuntimeMode(RuntimeExecutionMode.BATCH);

通过命令行配置:

bin/flink run -Dexecution.runtime-mode=BATCH

(3)触发程序执行

当main()方法被调用时,并没有真正处理数据。只有等到数据到来,才会触发真正的计算,这也被称为“延迟执行”或“懒执行”。

所以我们需要显式地调用执行环境的execute()方法,来触发程序执行。execute()方法将一直等待作业完成,然后返回一个执行结果(JobExecutionResult)。

如果在一段代码里面执行多个任务,可以使用env.executeAsync();

package com.atguigu.env;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * TODO
 *
 * @author cjp
 * @version 1.0
 */
public class EnvDemo {
    public static void main(String[] args) throws Exception {

        Configuration conf = new Configuration();
        conf.set(RestOptions.BIND_PORT, "8082");

        StreamExecutionEnvironment env = StreamExecutionEnvironment
//                .getExecutionEnvironment();  // 自动识别是 远程集群 ,还是idea本地环境
                .getExecutionEnvironment(conf); // conf对象可以去修改一些参数

//                .createLocalEnvironment()
//        .createRemoteEnvironment("hadoop102", 8081,"/xxx")

        // 流批一体:代码api是同一套,可以指定为 批,也可以指定为 流
        // 默认 STREAMING
        // 一般不在代码写死,提交时 参数指定:-Dexecution.runtime-mode=BATCH
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);


        env
//                .socketTextStream("hadoop102", 7777)
                .readTextFile("input/word.txt")
                .flatMap(
                        (String value, Collector<Tuple2<String, Integer>> out) -> {
                            String[] words = value.split(" ");
                            for (String word : words) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        }
                )
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(value -> value.f0)
                .sum(1)
                .print();

        env.execute();
        /** TODO 关于execute总结(了解)
         *     1、默认 env.execute()触发一个flink job:
         *          一个main方法可以调用多个execute,但是没意义,指定到第一个就会阻塞住
         *     2、env.executeAsync(),异步触发,不阻塞
         *         => 一个main方法里 executeAsync()个数 = 生成的flink job数
         *     3、思考:
         *         yarn-application 集群,提交一次,集群里会有几个flink job?
         *         =》 取决于 调用了n个 executeAsync()
         *         =》 对应 application集群里,会有n个job
         *         =》 对应 Jobmanager当中,会有 n个 JobMaster
         */
//        env.executeAsync();
        // ……
//        env.executeAsync();


    }
}

2.源算子(Source)

从Flink1.12开始,主要使用流批统一的新Source架构:

DataStreamSource<String> stream = env.fromSource(…)

(1)创建pojo对象

需要空参构造器,所有属性的类型都是可以序列化的

package com.atguigu.bean;

import java.util.Objects;

/**
 * TODO
 *
 * @author cjp
 * @version 1.0
 */
public class WaterSensor {
    public String id;//水位传感器类型
    public Long ts;//传感器记录时间戳
    public Integer vc;//水位记录

    // 一定要提供一个 空参 的构造器
    public WaterSensor() {
    }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public Long getTs() {
        return ts;
    }

    public void setTs(Long ts) {
        this.ts = ts;
    }

    public Integer getVc() {
        return vc;
    }

    public void setVc(Integer vc) {
        this.vc = vc;
    }

    @Override
    public String toString() {
        return "WaterSensor{" +
                "id='" + id + '\'' +
                ", ts=" + ts +
                ", vc=" + vc +
                '}';
    }


    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        WaterSensor that = (WaterSensor) o;
        return Objects.equals(id, that.id) &&
                Objects.equals(ts, that.ts) &&
                Objects.equals(vc, that.vc);
    }

    @Override
    public int hashCode() {

        return Objects.hash(id, ts, vc);
    }
}

(2)从集合中读取数据

package com.atguigu.source;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * TODO
 *
 * @author cjp
 * @version 1.0
 */
public class CollectionDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TODO 从集合读取数据
        DataStreamSource<Integer> source = env
                .fromElements(1,2,33); // 从元素读
//                .fromCollection(Arrays.asList(1, 22, 3));  // 从集合读


        source.print();

        env.execute();

    }
}

(3)从文件读取数据

先添加配置:

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>1.17.0</version>
			</dependency>
package com.atguigu.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * TODO
 *
 * @author cjp
 * @version 1.0
 */
public class FileSourceDemo {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        // TODO 从文件读: 新Source架构

        FileSource<String> fileSource = FileSource
                .forRecordStreamFormat(
                        new TextLineInputFormat(),
                        new Path("input/word.txt")
                )
                .build();

        env
                .fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource")
                .print();


        env.execute();
    }
}
/**
 *
 * 新的Source写法:
 *   env.fromSource(Source的实现类,Watermark,名字)
 *
 */

(4)从Socket读取数据

DataStream<String> stream = env.socketTextStream("localhost", 7777);

(5)从Kafka读取数据

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.17.0</version>
</dependency>
package com.atguigu.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

/**
 * TODO
 *
 * @author cjp
 * @version 1.0
 */
public cl

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1032545.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

八大排序(四)--------直接插入排序

本专栏内容为&#xff1a;八大排序汇总 通过本专栏的深入学习&#xff0c;你可以了解并掌握八大排序以及相关的排序算法。 &#x1f493;博主csdn个人主页&#xff1a;小小unicorn ⏩专栏分类&#xff1a;八大排序汇总 &#x1f69a;代码仓库&#xff1a;小小unicorn的代码仓库…

蓝桥杯每日一题2023.9.22

4960. 子串简写 - AcWing题库 题目描述 题目分析 原本为纯暴力但是发现会超时&#xff0c;可以加入前缀和&#xff0c;从前往后先记录一下每个位置c1出现的次数 再从前往后扫一遍&#xff0c;如果遇到c2就将答案加上此位置前的所有c1的个数&#xff08;直接加上此位置的前缀…

Mybatis学习笔记4 用javassist动态实现DAO接口基于接口的CRUD

Mybatis学习笔记3 在Web中应用Mybatis_biubiubiu0706的博客-CSDN博客 上篇最后在DAO实现类中,代码固定,没有业务逻辑,这篇笔记中对该实现类进行封装,就是说,以后不用写DAO实现类了 我们不难发现&#xff0c;这个dao实现类中的⽅法代码很固定&#xff0c;基本上就是⼀⾏代码&am…

【面试经典150 | 双指针】三数之和

文章目录 写在前面Tag题目来源题目解读解题思路方法一&#xff1a;暴力枚举方法二&#xff1a;双指针 写在最后 写在前面 本专栏专注于分析与讲解【面试经典150】算法&#xff0c;两到三天更新一篇文章&#xff0c;欢迎催更…… 专栏内容以分析题目为主&#xff0c;并附带一些对…

AIGC|从革新内容创作到社会共识建立,迎接全新技术维度

在人工智能的巨浪之下&#xff0c;我们身临一场前所未有的文化演变&#xff0c;一股革命性的力量正在重新定义我们的创造性边界。这股力量不是人类的智慧&#xff0c;而是人工智能生成内容&#xff08;AIGC&#xff09;技术&#xff0c;它正以前所未有的速度和广度改变着我们的…

上PICO,沉浸式观看亚运直播,参与跨国界游戏竞技

备受瞩目的杭州第19届亚运会&#xff0c;将于9月23日正式开幕。据悉&#xff0c;这也是有史以来项目最多的一届亚运会&#xff0c;除部分传统奥运项目外&#xff0c;还包含武术、藤球、板球、克柔术、柔术等亚洲特色项目&#xff0c;以及霹雳舞、电子竞技等深受年轻人喜爱的新兴…

数字赋能 融链发展 ——2023工博会数字化赋能专精特新“小巨人”企业高质量发展论坛顺利举行

编者按&#xff1a;2023年政府工作报告提出“加快传统产业和中小企业数字化转型”要求&#xff0c;按照《“十四五”促进中小企业发展规划》《关于开展中小企业数字化转型城市试点工作的通知》等文件的部署&#xff0c;通过开展城市试点&#xff0c;支持地方政府综合施策&#…

AI视频剪辑:批量智剪技巧大揭秘

对于许多内容创作者来说&#xff0c;视频剪辑是一项必不可少的技能。然而&#xff0c;传统的视频剪辑方法需要耗费大量的时间和精力。如今&#xff0c;有一种全新的剪辑方式正在改变这一现状&#xff0c;那就是批量AI智剪。这种智能化的剪辑方式能够让你在短时间内轻松剪辑大量…

pg数据表同步到hive表数据压缩总结

1、背景 pg库存放了大量的历史数据&#xff0c;pg的存储方式比较耗磁盘空间&#xff0c;pg的备份方式&#xff0c;通过pgdump导出后&#xff0c;进行gzip压缩&#xff0c;压缩比大概1/10&#xff0c;随着数据的积累磁盘空间告警。为了解决pg的压力&#xff0c;尝试采用hive数据…

如何在没有第三方.NET库源码的情况,调试第三库代码?

大家好&#xff0c;我是沙漠尽头的狼。 本方首发于Dotnet9&#xff0c;介绍使用dnSpy调试第三方.NET库源码&#xff0c;行文目录&#xff1a; 安装dnSpy编写示例程序调试示例程序调试.NET库原生方法总结 1. 安装dnSpy dnSpy是一款功能强大的.NET程序反编译工具&#xff0c;…

Qt创建线程(使用moveToThread方法创建子线程)

1.moveTothread方法: &#xff08;1&#xff09;要使用moveToThread方法必须继承与QObject类 &#xff08;2&#xff09;创建任务对象时不能指定父对象 例子&#xff1a; MyWork* work new MyWork(this); // error MyWork* work new MyWork; // ok &#xff08;3&#…

常用的深度学习自动标注软件

0. 简介 自动标注软件是一个非常节省人力资源的操作&#xff0c;而随着深度学习的发展&#xff0c;这些自动化标定软件也越来越多。本文章将会着重介绍其中比较经典的自动标注软件 1. AutoLabelImg AutoLabelImg 除了labelimg的初始功能外&#xff0c;额外包含十多种辅助标注…

vue3——pixi初学,编写一个简单的小游戏,复制粘贴可用学习

pixi官网 小游戏效果 两个文件夹 一个index.html 一个data.js //data.js import { reactive } from "vue"; import { Sprite, utils, Rectangle, Application, Text, Graphics } from "pixi.js";//首先 先创建一个舞台 export const app new Applicat…

基于开源模型的实时人脸识别系统(九):软件说明

续 人脸识别_CodingInCV的博客-CSDN博客 文章目录 前言简介模型选择的要求总体流程图人脸检测人脸跟踪人脸质量人脸关键点人脸识别代码结构人脸识别的逻辑高阶设置 前言 前面的文章我们介绍了整个系统里的关键步骤&#xff0c;基于这些步骤我们就可以搭建出属于自己的人脸识别…

Java 并发编程面试题——Lock 与 AbstractQueuedSynchronizer (AQS)

目录 1.Lock1.1.Lock 是什么&#xff1f;1.2.Lock 接口提供了哪些 synchronized 关键字不具备的主要特性&#xff1f;1.3.✨Lock 与 synchronized 有什么区别&#xff1f;1.4.Lock 接口中有哪些方法&#xff1f;1.5.哪些类实现了 Lock 接口&#xff1f; 2.AbstractQueuedSynchr…

使用YOLOv5-C3模块识别图像天气 - P8

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 | 接辅导、项目定制&#x1f680; 文章来源&#xff1a;K同学的学习圈子 目录 环境步骤环境设置引用包全局设备对象 数据准备数据集信息收集图像预处理读取数据…

【Vue】模块基本语法

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是Java方文山&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;在这里&#xff0c;我要推荐给大家我的专栏《Vue快速入门》。&#x1f…

什么是WhatsApp群发,WhatsApp协议,WhatsApp云控

那么WhatsApp群控云控可以做什么呢&#xff1f; 1、获客 自动化引流&#xff0c;强大的可控性&#xff0c;产品快速拓客 2、导流 一键式傻瓜化自动加好友&#xff0c;群发&#xff0c;朋友圈营销 3、群控 一键式拉群好友&#xff0c;建群&#xff0c;进群 …

精通git,没用过git cherry-pick?

前言 git cherry-pick是git中比较有用的命令&#xff0c;cherry是樱桃&#xff0c;cherry-pick就是挑樱桃&#xff0c;从一堆樱桃中挑选自己喜欢的樱桃&#xff0c;在git中就是多次commit中挑选一个或者几个commit出来&#xff0c;也可以理解为把特定的commit复制到一个新分支…

大模型应用发展的方向|代理 Agent 的兴起及其未来(上)

“ 介绍了人工智能代理的历史渊源与演进&#xff0c;接着探讨了大型语言模型&#xff08;LLMs&#xff09;的发展&#xff0c;以及它们在知识获取、指令理解、泛化、规划和推理等方面所展现出的强大潜力。在此基础上&#xff0c;提出了一个以大型语言模型为核心的智能代理概念框…