大数据之Flink(二)

news2024/9/21 12:33:08

4、部署模式

flink部署模式:

  • 会话模式(Session Mode)
  • 单作业模式(Per-Job Mode)
  • 应用模式(Application Mode)

区别在于集群的生命周期以及资源的分配方式;以及应用的main方法到底在哪里执行。

4.1、会话模式

先启动一个集群,保持一个会话,在这个会话中通过客户端提交作业。集群启动时资源已经确定,所有提交的作业会晶振集群中的资源。适合规模小、执行时间短的大量作业。

在这里插入图片描述

4.2、单作业模式

资源共享会导致问题,为了隔离资源要为每个提交的作业启动一个集群,即单作业模式。
在这里插入图片描述
作业完成后集群关闭,资源释放,一般借助yarn、K8s资源管理框架启动集群。实际应用首选模式

4.3、应用模式

前面两种模式,代码都在客户端上执行,由客户端提交给JobManager,导致客户端需要占用大量网络带宽,加重客户端所在节点的资源消耗。应用模式把应用提交到JobManager运行,每个提交的应用单独启动一个JobManager,执行结束后JobManager关闭。
在这里插入图片描述
总结:

  • 应有模式与单作业模式是提交作业后才创建集群
  • 单作业模式是通过客户端来提交,客户端解析出的每一个作业对应一个集群
  • 应用模式直接由JobManager执行应用程序

5、YARN运行模式

5.1、会话模式部署

yarn部署过程:

  1. 客户端把Flink应用提交给yarn的ResourceManager,yarn的ResourceManager向yarn的NodeManager申请容器
  2. Flink部署JobManager和TaskManager到容器上,在启动集群
  3. Flink根据运行在JobManager上的作业所需要的Slot数量动态分配TaskManager资源

配置准备

  1. 修改配置文件

     vim /etc/profile
    
  2. 添加环境变量

    export HADOOP_CLASSPATH=`hadoop classpath`
    
  3. 环境变量生效

     source /etc/profile
    
  • 会话模式部署

启动测试
在这里插入图片描述
提交jar任务
在这里插入图片描述
运行状态
在这里插入图片描述

5.2、应用模式部署(生产环境推荐)

与但作业模式类似,直接执行flink run-application命令即可,先将jar拷贝到flink根文件夹下

使用命令提交作业

 bin/flink run-application -t yarn-application -c FlinkDemo.StreamWordCount ./flinkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar

使用HDFS提交(生产环境推荐)

1、上传flink依赖到HDFS的flink-dist文件夹

hadoop fs -put lib/ /flink-dist
hadoop fs -put lib/ /flink-dist
hadoop fs -put plugins/ /flink-dist

2、上传jar包到HDFS到flink-jars文件夹

hadoop fs -mkdir /flink-jars
hadoop fs -put flinkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar/ /flink-jars

3、运行jar包

bin/flink run-application -t yarn-application -Dyarn.provided.lib.dirs="hdfs://hadoop100:8020/flink-dist" -c  FlinkDemo.StreamWordCount hdfs://hadoop100:8020/flink-jars/flinkDemo-1.0-SNAPSHOT-jar-with-dependencies.jar

历史服务器:查看停止的job的统计信息

6、flink运行时架构

以Standalone会话模式为例
在这里插入图片描述

  • 作业管理器(JobManager)

    JobManager是一个Flink集群任务管理和调度的核心,是控制应用执行的主进程,每个应用都有一JobManager。JobMaster是JobManager中最核心的组件,负责处理单独作业。JobMaster和job一一对应,多个job可运行在同一集群中,每个job有一个对应的JobMaster。

  • 资源管理器

    负责资源的分配和管理。资源主要指TaskManager的任务槽(task slot)。任务槽为flink集群中资源调配单元,包含执行计算的cpu和内存。每个task要分配到一个slot上执行。

  • 分发器

    负责提供rest接口,用来提交应用并且负责为每一个新提交的作业启动一个新的JobMaster,也会启动webUI

  • 任务管理器(TaskManager)

    TaskManager是flink中的工作进程,负责数据流的具体计算。flink集群中必须至少有一个TaskManager,每个TaskManager包含一定数量的task slot。slot为资源调度的最小单位,slot的数量限制了TaskManager能够并行处理的任务数量

7、核心概念

7.1、并行度

当要处理大量数据时,可以把算子操作“复制”多分,每个算子都可执行计算任务。一个任务就拆分成多个子任务,实现了并行计算。flink执行过程中,每个算子包含一个或多个子任务,这些子任务在不同的线程、不同的物理机或不同容器中执行。

一个算子的子任务个数为其并行度。一个流程序并行度是其所有算子中最大的并行度。
在这里插入图片描述
可通过代码设置并行度(优先度最高),默认为电脑CPU的线程数。

算子.setParallelism(并行度)

全局环境设置并行度

env.setParallelism(并行度)

通过web页面/命令行提交时设置并行度(优先度低)

flink的yaml配置文件配置并行度(优先度最低)

优先级:算子指定>env指定>提交时设置>配置文件配置

7.2、算子链
  1. 一对一one-to-one(forwarding)

    这种模式数据流维护着分区以及元素的顺序。如source和map算子,source算子读取数据后直接发给map算子处理,他们之间不需要重新分区和调整顺序,保持着一对一的关系。map、source、flatMap都是这种一对一的关系。

  2. 重分区redistributing

    数据流的分区会发生改变。map和,keyBy/window算子之间,keyBy/window算子之间与sink算子之间都是重分区关系。

  • 合并算子链:将并行度相同的一对一算子操作可以直接链接在一起形成一个子任务,被一个线程执行。
7.3、任务槽

flink每一个taskManager都是一个JVM进程,它可以启动多个独立的线程来并行执行多个子任务。为了控制并发量就需要再taskManager上对每个任务运行所占用的资源做出明确的划分,就是任务槽。

每个任务槽表示taskManager拥有计算资源的固定大小的子集。这些资源用来独立执行一个子任务。

假如一个taskManager有三个slot,就会将管理的内存均分成三份,每个slot独占一份,slot不会去争抢资源。**slot仅用来隔离内存,不隔离CPU。**建议slot数量为cpu核心数,避免争抢cpu资源。

同一作业不同算子的并行子任务可以放到同一slot上执行。
在这里插入图片描述

7.4、任务槽与并行度关系

任务槽是静态概念,是指taskManager具有并发执行能力;并行度是动态概念,程序运行时的实际使用的并发能力。slot的数量是最大并行度。并行度超过slot数量flink不能运行。

使用yarn动态申请资源:申请taskManager数量=并行度/每个taskManager的slot数(向上取整)

8、作业提交流程

8.1、Standalone会话模式

在这里插入图片描述

8.2、yarn应用模式

在这里插入图片描述

9、DataStream API

DataStream API是flink核心层API。一个flink程序就是对DataStream的各种转换。代码一般由几个部分组成:
在这里插入图片描述

9.1、执行环境

1、创建环境

StreamExecutionEnvironment类的对象是所有flink程序的基础。最常用

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2、程序执行

flink是由事件驱动的,只有等数据到来才会触发计算。需要显式调用执行环境的execute()方法来触发程序执行。execute()将一直等待作业完成返回一个执行结果(JobExecutionResult)。

env.execute();
9.2、源算子source
9.2.1、准备工作

数据模型

字段名数据类型说明
idString水位传感器类型
tsLong传感器记录时间戳
vcInteger水位记录

定义类

package bean;

import java.util.Objects;

/**
 * @Title: WaterSensor
 * @Author lizhe
 * @Package bean
 * @Date 2024/5/29 21:06
 * @description: 水位类
 */
public class WaterSensor {
    public String id;
    public Long ts;
    public Integer vc;

    public String getId() {
        return id;
    }

    public WaterSensor setId(String id) {
        this.id = id;
        return this;
    }

    public Long getTs() {
        return ts;
    }

    public WaterSensor setTs(Long ts) {
        this.ts = ts;
        return this;
    }

    public Integer getVc() {
        return vc;
    }

    public WaterSensor setVc(Integer vc) {
        this.vc = vc;
        return this;
    }

    @Override
    public String toString() {
        return "WaterSensor{" +
                "id='" + id + '\'' +
                ", ts=" + ts +
                ", vc=" + vc +
                '}';
    }

    public WaterSensor(String id, Long ts, Integer vc) {
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }
    public WaterSensor() {

    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        WaterSensor that = (WaterSensor) o;
        return Objects.equals(id, that.id) &&
                Objects.equals(ts, that.ts) &&
                Objects.equals(vc, that.vc);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, ts, vc);
    }
}

package source;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * @Title: CollectionDemo
 * @Author lizhe
 * @Package source
 * @Date 2024/5/29 22:06
 * @description:
 */
public class CollectionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //从数组读取数据
        DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 22, 34));
        source.print();
        
        //直接读取数据
        DataStreamSource<Integer> source1 = env.fromElements(22, 4456, 66);
        source1.print();

        env.execute();
    }
}

9.2.2、从集合中读数据
package source;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * @Title: CollectionDemo
 * @Author lizhe
 * @Package source
 * @Date 2024/5/29 22:06
 * @description:
 */
public class CollectionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //从数组读取数据
        DataStreamSource<Integer> source = env.fromCollection(Arrays.asList(1, 22, 34));
        source.print();

        //直接读取数据
        DataStreamSource<Integer> source1 = env.fromElements(22, 4456, 66);
        source1.print();

        env.execute();
    }
}

9.2.3、从文件中读数据

导入依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>1.13.0</version>
        </dependency>
package source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Title: FileSourceDemo
 * @Author lizhe
 * @Package source
 * @Date 2024/5/29 22:13
 * @description:
 */
public class FileSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        FileSource<String> fileSource = FileSource.forRecordStreamFormat(new TextLineFormat(), new Path("input/words.txt")).build();
        env.fromSource(fileSource, WatermarkStrategy.noWatermarks(),"filesource").print();
        env.execute();
    }
}

9.2.4、从Kafka读数据

官网地址:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

导入依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.13.6</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>1.13.6</version>
</dependency>

启动kafka集群
在这里插入图片描述
编写代码

package source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Title: FileSourceDemo
 * @Author lizhe
 * @Package source
 * @Date 2024/5/29 22:13
 * @description:
 */
public class KafkaSourceDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
                .setBootstrapServers("192.168.132.100:9092,192.168.132.101:9092,192.168.132.102:9092")
                .setGroupId("test")
                .setTopics("test")
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .setStartingOffsets(OffsetsInitializer.latest())
                .build();
        env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(),"filesource").print();
        env.execute();
    }
}

9.2.5、flink数据类型

查看TypeInformation(实现序列化)。TypeInformation类是flink中所有类型描述符的基类。涵盖了类型的一些基本属性,并为每个数据类型生成特定的序列化、反序列化、比较器。

  • 基本类型:java基本类型及其包装类,还有void、string、date、bigDecimal、bigInteger
  • 数组类型:基本类型数组和对象数组
  • 复合数据类型:元组类型(Tuple)
  • 辅助类型:List、Map
  • 泛型

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2115728.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

WireShark过滤器

文章目录 一、WireShark过滤器概念1. 捕获过滤器&#xff08;Capture Filters&#xff09;2. 显示过滤器&#xff08;Display Filters&#xff09;3. 捕获过滤器与显示过滤器的区别4. 过滤器语法结构实际应用场景 二、WireShark捕获数据包列表1. **No.&#xff08;序号&#xf…

vulhub ThinkPHP5 5.0.23远程代码执行漏洞

步骤一&#xff1a;.执行以下命令启动靶场环境并在浏览器访问 cd thinkphp/5.0.23-rcedocker-compose up -ddocker ps 步骤二&#xff1a;访问靶机环境 步骤三&#xff1a;/index.php?scaptcha 步骤四&#xff1a;利用HackBar _method__construct&filter[]system&me…

心理辅导新篇章:Spring Boot学生评估系统

1 绪论 1.1 研究背景 现在大家正处于互联网加的时代&#xff0c;这个时代它就是一个信息内容无比丰富&#xff0c;信息处理与管理变得越加高效的网络化的时代&#xff0c;这个时代让大家的生活不仅变得更加地便利化&#xff0c;也让时间变得更加地宝贵化&#xff0c;因为每天的…

优化边缘设备上的大型语言模型(LLM)--tinychat

文章目录 一、项目启动1.背景&#xff1a;针对不同操作系统架构的4bit权重重排2.初始环境配置下载LLaMA2-7B-chat模型 3.项目启动项目结构说明评估不同优化技术可能遇到的bug以及措施1.macOS上部署 二、各种优化技术实现1.前置条件2.优化----循环展开3.优化----多线程4.优化---…

OpenCV结构分析与形状描述符(6)带统计的连通组件计算函数connectedComponentsWithStats()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 connectedComponentsWithStats 函数计算布尔图像的连通组件标记图像&#xff0c;并为每个标记产生统计信息。 该函数接受一个具有4或8连通性的二…

盘点4款可以免费帮你将语音转换成文字的工具

我们在寻找语音转文字的工具的时候&#xff0c;不能只考虑他是否免费&#xff0c;还需要关注这个工具的转换准确度&#xff0c;减少第二次修改的麻烦&#xff0c;以及它的转换速度&#xff0c;以便可以有效的提高我们工作效率。基于这些&#xff0c;我要给大家推荐几个既可以免…

2024Java基础总结+【Java数据结构】(2)

面向对象07&#xff1a;简单小结类与对象 面向对象08&#xff1a;封装详解 面向对象09&#xff1a;什么是继承 ctrlh看类的关系&#xff0c;所有的类都默认的或间接继承Object 面向对象10&#xff1a;Super详解 super注意点: super调用父类的构造方法&#xff0c;必须在构造方…

白小白为波司登新品创作歌曲《登峰之路》,穿越风雨守护前行者

随着天气渐凉&#xff0c;波司登品牌推出全新新品——轻薄羽绒叠变系列&#xff0c;作为波司登品牌的新品推荐官&#xff0c;歌手白小白为波司登创作并演唱《轻薄羽绒叠变》系列主题曲《登峰之路》。歌曲中&#xff0c;白小白以激昂澎湃&#xff0c;明快有力的旋律以及深情又充…

【Unity小技巧】物体遮挡轮廓描边效果

前言&#xff1a; 效果展示&#xff1a; 遮挡描边 Demo下载 所用插件 QuickOutline描边插件&#xff08;在Demo里&#xff09; 实现步骤 物体挂载Outline组件&#xff0c;做如下处理 Outline Mode&#xff08;描边模式&#xff09;&#xff1a;Outline Hidden(遮挡模式显示…

让中学生也能一下子认识5000年都无人能识的无穷大自然数

黄小宁 5000多年来数学一直未能证明存在>N一切数的标准无穷大自然数及其倒数&#xff0c;从而一直否定存在这类数&#xff0c;正如西医否定人体存在经络系统那样。 x轴各元点的坐标x变为的有序数对 ( x , y2 x)是平面点p的坐标&#xff0c;点p的全体是直线y2x。 x可变成一…

HOT100(八)动态规划

1、爬楼梯 ①动态规划 &#xff08;1&#xff09;时间复杂度 O(n) &#xff0c;空间复杂度 O(n)的做法 开辟一个长度为 n1 的状态数组f&#xff0c;f[i]表示走到第i个台阶的方案数。初始化f[0]1&#xff08;在台阶底部&#xff0c;不需要移动也视为一种方法&#xff09;,f[1…

HNU-2023电路与电子学-实验3

写在前面&#xff1a; 本次实验是完成cpu设计的剩余部分&#xff0c;整体难度比上一次要小&#xff0c;细心完成就能顺利通过全部测评 一、实验目的 1.了解简易模型机的内部结构和工作原理。 2.分析模型机的功能&#xff0c;设计 8 重 3-1 多路复用器。 3.分析模型机的功能…

Oracle再度发起开发人员调查,细节满满

作者&#xff1a;IT邦德 中国DBA联盟(ACDU)成员&#xff0c;10余年DBA工作经验&#xff0c; Oracle、PostgreSQL ACE CSDN博客专家及B站知名UP主&#xff0c;全网粉丝10万 擅长主流Oracle、MySQL、PG、高斯及Greenplum备份恢复&#xff0c; 安装迁移&#xff0c;性能优化、故障…

GO学习笔记(4) strconv/time

目录 strconv包1、string与bool之间的转换2、string与int之间的转换 time包1、常用常量定义2、Now&#xff08;&#xff09;获取当前年月日时分秒3、Format&#xff08;&#xff09;时间格式化4、Parse&#xff08;&#xff09;/ ParseInLocation&#xff08;&#xff09;解析时…

anaconda启动jupyter notebook

1.在Windows搜索框搜索anaconda prompt点击打开 2.然后输入命令jupyter notebook 3.在这个页面编写你的程序

基于Java的宿舍报修管理系统的设计与实现(论文+源码)_kaic

基于Java的宿舍报修管理系统的设计与实现(论文源码)_kaic 摘  要 随着教育改革‎‏的不断‎‏深入&#xff0c;‎‏学校宿‎‏舍的管‎‏理体系‎‏也在不‎‏断地完‎‏善&#xff0c;校园后勤服务是学校管理的重要工作&#xff0c;学校提供优秀的后勤服务&#xff0c;能提…

【Jupyter Notebook】安装与使用

打开Anaconda Navigator点击"Install"(Launch安装前是Install)点击"Launch"点击"File"-"New"-"Notebook"​ 5.点击"Select"选择Python版本 6.输入测试代码并按"Enter+Shift"运行代码: 代码如下: …

C++万字解析类和对象(上)

1.类的定义 class为定义类的关键字&#xff0c;Stack为类的名字&#xff0c;{}中为类的主体&#xff0c;注意类定义结束时后面分号不能省略。类体中内容称为类的成员&#xff1a;类中的变量称为类的属性或成员变量; 类中的函数称为类的方法或者成员函数。 为了区分成员变量&…

Linux(RedHat或CentOS)下如何开启telnet服务

一、Telnet服务介绍 Telnet协议是TCP/IP协议族中的一员&#xff0c;是Internet远程登录服务的标准协议和主要方式。它为用户提供了在本地计算机上完成远程主机工作的能力。在终端使用者的电脑上使用telnet程序&#xff0c;用它连接到服务器。终端使用者可以在telnet程序中输入…

【STM32】串口

1.串口-printf 使用printf函数向串口发送东西 使用微库&#xff0c;用到了printf, 但是我们发现是不能发送的 因为底层printf是fputc,我们需要自己实现 后面FILE*P不用管&#xff0c;在fputc中调用 第一个参数为uart1的句柄 第二个为要输出的字符 第三个为一次要发送几…