Flink数据流

news2024/11/24 20:33:00

文章目录

    • 一.基本概念
    • 二.Flink和Spark
    • 三. Flink配置文件
    • 四. yarn部署flink
      • 4.1 session-cluster模式
      • 4.2 pre-job-cluster模式
    • 五.Flink运行时架构
      • 5.1 任务提交流程
      • 5.2 如何实现并行计算
      • 5.3 并行任务需要占用多少slot
      • 5.4 一个流处理包含多少任务


一.基本概念

官网介绍

Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink 被设计为在所有常见的集群环境中运行,以内存中的速度和任何规模执行计算。

1.无限流有一个开始,但没有定义的结束。它们不会在生成数据时终止并提供数据。必须连续处理无限流,即事件必须在摄取后立即处理。不可能等待所有输入数据到达,因为输入是无限的,并且在任何时间点都不会完成。处理无界数据通常需要按特定顺序(例如事件发生的顺序)引入事件,以便能够推断结果完整性。(即实时数据)

2.有界流具有定义的开始和结束。可以通过在执行任何计算之前引入所有数据来处理有界流。处理有界流不需要有序引入,因为始终可以对有界数据集进行排序。有界流的处理也称为批处理。(即存储的数据)

有状态流处-flink处理流程
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

较为合适的应用场景
在这里插入图片描述
传统事务处理
在这里插入图片描述


二.Flink和Spark

  • 概念区别

    • Spark强劲的分布式大数据处理框架.它使用内存中缓存和优化的查询执行方式,可针对任何规模的数据进行快速分析查询,支持跨多个工作负载重用代码—批处理、交互式查询、实时分析、机器学习和图形处理等。Spark底层基于批处理.(流是批处理不可切分的特殊情况)
    • Flink基于流(批处理是一种有界流)
  • 数据模型

    • spark采用RDD模型,spark streaming 的 DStream 实际上也就是一组组小批数据RDD的集合
    • flink基本数据模型是数据流,以及事件(Event)序列
  • 运行时架构

    • spark是批计算,将DAG划分为不同的 stage,一个完成后才可以计算下一个
    • flink是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理

三. Flink配置文件

在这里插入图片描述
jobmanager.sh 资源调度,工作分配脚本
taskmanager.sh 工作任务执行脚本
flink 启动集群后,命令执行器

四. yarn部署flink

4.1 session-cluster模式

在这里插入图片描述

# 启动hadhoop集群

# -n(--container) taskManager的数量 不建议指定.动态分配
# -s(--slot) 每个taskManager的slot数量,默认一个slot一个core.默认每个taskmanager的slot个数为1
# -jm: jobManager的内存 mb.
# -tm: 每个taskManager的内存 mb
# -nm: yarn的appName 
./yarn-session.sh -s 2 -jm 1024 -tm 1024 -nm test -d

# 提交job
./flink run -c com.vector.wc.StreamWordCount
FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host localhost -port 7777

./flink list -a

# 取消yarn-session
yarn application --kill application_12451231_0001

4.2 pre-job-cluster模式

在这里插入图片描述

1)启动hadoop集群(略)
2)不启动yarn-session ,直接执行job

./flink run -m yarn-cluster -c com.vector.wc.StreamWordCount
FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host localhost -port 7777

五.Flink运行时架构

flink运行时组件: jobManager,TaskManager,ResourceManager,Dispacher

JobManager控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager所控制执行。

  • JobManager 会先接收到要执行的应用程序,这个应用程序会包括: 作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的JAR包。
  • JobManager 会把JobGraph转换成一个物理层面的数据流图,这个图被叫做"“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。
  • JobManager 会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器( TaskManager)上的插槽((slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的TaskManager上。而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调

TaskManager

  • Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。
  • 启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。
  • 在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。

ResourceManager

  • 主要负责管理任务管理器(TaskManager)的插槽(slot) ,TaskManger插槽是Flink中定义的处理资源单元。
  • Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN.Mesos、K8s,以及standalone部署。
  • 当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。

Dispacher

  • 可以跨作业运行,它为应用提交提供了REST接口。
  • 当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager。
  • Dispatcher也会启动一个Web Ul,用来方便地展示和监控作业执行的信息。
  • Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。

5.1 任务提交流程

在这里插入图片描述

5.2 如何实现并行计算

并行度 可以在代码中指定,提交job指定,也可以在集群配置给默认的并行度.
优先级:代码>提交job>集群配置的并行度

  • 一个特定算子的子任务 (subtask)的个数被称之为其并行度(parallelism) 。一般情况下,一个stream 的并行度,可以认为就是其所有算子中最大的并行度。

slots
推荐按照cpu核心数设置slot

  • Flink 中每一个TaskManager都是一个JVM进程,它可能会在独立的线程上执行一个或多个子任务
  • 为了控制一个TaskManager能接收多少个task,taskManager通过task slot来进行控制(一个TaskManager至少有一个slot)
  • 默认情况下,Flink允许子任务共享slot,即使它们是不同任务的子任务。这样的结果是,一个slot可以保存作业的整个管道。
  • Task Slot是静态的概念,是指TaskManager具有的并发执行能力

至少需要的slot数 = SUM(MAX(同一个共享组的任务数,同一个共享组的任务数的最大并行度))

情况1
在这里插入图片描述
情况2
在这里插入图片描述

.setParallelism(4).slotSharingGroup("01"); 设置并行度和共享组 显示设置共享组可以指定不同的slot并行执行.如果有地方没配,则和前一个处于同一个共享组.如果为首部.则为defalut共享组

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(8);
        // 从文件中读取数据 有界流
//        String inputPath = System.getProperty("user.dir") + "/src/main/resources/text.txt";
//        FileSource<String> source = FileSource
//                .forRecordStreamFormat(
//                        new TextLineInputFormat("UTF-8"),
//                        new Path(inputPath))
//                        .build();
//        DataStream<String> inputDataStream =
//                env.fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "text");
        
//        ParameterTool parameterTool = ParameterTool.fromArgs(args);
//        String host = parameterTool.get("host");
//        int port = parameterTool.getInt("port");
        // 从socket文本流读取数据 nc -lk 7777 无界流
        DataStreamSource<String> inputDataStream =
                env.socketTextStream("localhost", 7777);
        // 基于数据流进行转换计算
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultSet =
                inputDataStream.flatMap(new WordCount.MyFlatMapper())
                .slotSharingGroup("02")
                .keyBy(KeySelector -> KeySelector.f0)
                .sum(1).setParallelism(4).slotSharingGroup("01");
        resultSet.print();
        // 执行任务
        env.execute();
    }

    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            // 按句号分词
            String[] words = s.split("");
            // 遍历所有word,包成二元组输出
            for (String word : words) {
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

在这里插入图片描述
在这里插入图片描述

5.3 并行任务需要占用多少slot

5.4 一个流处理包含多少任务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/765290.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

css 禁止多次点击导致的选中了目标div的文字

像下面这样的情况&#xff0c;就可以用这种方法避免掉 禁止多次点击&#xff0c;导致的&#xff0c;选中了目标div的文字 或者 禁止多次点击&#xff0c;导致&#xff0c;html结构被选中显示出来 .targetDiv {-webkit-user-select: none;-moz-user-select: none;-ms-user-sel…

Vue3卡片(Card)

可自定义设置以下属性&#xff1a; 卡片宽度&#xff08;width&#xff09;&#xff0c;类型&#xff1a;number | string&#xff0c;默认 ‘auto’是否有边框&#xff08;bordered&#xff09;&#xff0c;类型&#xff1a;boolean&#xff0c;默认 true卡片右上角的操作区域…

所有语言数据类型大汇总(持续更新)

一 c语言 参考 C语言-整数&#xff1a;short、int、long、long long&#xff08;signed和unsigned&#xff09;、原码、反码、补码_c语言signed是什么类型_Talent Q的博客-CSDN博客https://blog.csdn.net/qq_43177371/article/details/105703234 二 system verilog

服务器数据恢复-网站服务器宕机无法重启的数据恢复案例

服务器数据恢复环境&#xff1a; 一台linux操作系统网站服务器&#xff0c;该服务器上部署了几十个网站&#xff0c;服务器上只有一块SATA硬盘。 服务器故障&分析&#xff1a; 服务器正常运行中突然宕机&#xff0c;管理员尝试多次重新启动服务器失败&#xff0c;将服务器…

软件测试之测试用例设计方法

目录 1.基于需求设计测试用例 2.具体的测试用例设计方法 1.等价类 2.边界值法 3.判定表 1.基于需求设计测试用例 需求文档->梳理分析需求&#xff08;掌握需求&#xff09;->针对文档设计测试用例 在分析测试需求时&#xff0c;一般分为功能测试需求和非功能测试…

职工管理系统

woker.h #pragma once #include<iostream> #include<string> using namespace std; class worker { public://显示岗位信息virtual void showInfo() 0;//获取岗位名称virtual string getDeptName() 0;int m_Id;//职工编号string m_Name;//职工姓名int m_DeptId;…

大学生用一周时间给麦当劳做了个App(uni-app版)

背景 有个大学生粉丝最近私信联系我&#xff0c;说基于我之前开源的多语言项目做了个仿麦当劳的项目&#xff0c;虽然只是个样子货&#xff0c;但是收获颇多&#xff0c;希望把自己写的代码开源出来供大家一起学习进度。这个小伙伴确实是非常积极上进&#xff0c;很多大学生&a…

ssh 连接出现错误: kex_exchange_identification: Connection closed by remote host

错误如下表示&#xff1a; windstormLocalHost-Server ~> ssh webase-front192.168.122.22 Couldnt get a file descriptor referring to the console. fish: Unknown command: nc fish: exec nc -X connect -x 127.0.0.1:15732 192.168.122.22 22 ^^ kex_exchange_id…

个人博客系统(二)

该博客系统共有八个页面,即注册页面、登录页面、添加文章页面、修改文章页面、我的博客列表页面、主页、查看文章详情页面、个人中心页面。 1 注册页面 该页面如图所示: 首先,要先判断注册的用户名、密码、确认密码以及验证码是否为空,若有一个为空,点击提交,则会提醒 …

代码随想录二刷day56 | 动态规划之 583. 两个字符串的删除操作 72. 编辑距离

day56 583. 两个字符串的删除操作1.确定dp数组&#xff08;dp table&#xff09;以及下标的含义2.确定递推公式3.dp数组如何初始化4.确定遍历顺序5.举例推导dp数组 72. 编辑距离1. 确定dp数组&#xff08;dp table&#xff09;以及下标的含义2. 确定递推公式3. dp数组如何初始化…

信号采样基本概念 —— 4. 移动平均滤波(Moving Average Filtering)

对于信号的滤波算法中&#xff0c;除了FFT和小波&#xff08;wavelet&#xff09;以外&#xff0c;还有其他一些常见的滤波算法可以对信号denoising。接下来的几个章节里&#xff0c;将逐一介绍这些滤波算法。而今天首先要介绍的就是&#xff0c;移动平均滤波&#xff08;Movin…

android studio 离线打包配置push模块

1.依赖引入 SDK\libs aps-release.aar, aps-unipush-release.aar, gtc.aar, gtsdk-3.2.11.0.aar, 从android studio的sdk中找到对应的包放到HBuilder-Integrate-AS\simpleDemo\libs下面 2.打开build.gradle&#xff0c;在defaultConfig添加manifestPlaceholders节点&#xff0c…

浅谈vue3与vue2的区别

vue3已经出来有一段时间了&#xff0c;相信很多公司项目都已经在用vue3重构项目&#xff0c;或者在新项目中直接用vue3搭建&#xff0c;那么我们学习vue3的必要性就有了。 v2 与 v3 的区别 v3 采用的是 monorepo 方式进行管理&#xff0c;将模块拆分到 package 目录中v3 采用…

用 PerfView 洞察.NET程序非托管句柄泄露

一&#xff1a;背景 1. 讲故事 前几天写了一篇 如何洞察 .NET程序 非托管句柄泄露 的文章&#xff0c;文中使用 WinDbg 的 !htrace 命令实现了句柄泄露的洞察&#xff0c;在文末我也说了&#xff0c;WinDbg 是以侵入式的方式解决了这个问题&#xff0c;在生产环境中大多数情况…

C++ cin

cin 内容来自《C Primer》 cin使用>>运算符从输入流中抽取字符 int carrots;cin >> carrots;如下的例子&#xff0c;用户输入的字符串有空格 #include <iostream>int main() {using namespace std;const int ArSize 20;char name[ArSize]; //用户名char …

HIVE SQL实现通过两字段不分前后顺序去重

--数据建表 drop table if exists db.tb_name; create table if not exists db.tb_name ( suj1 string,suj2 string ) ;insert overwrite table db.tb_name values ("语文","数学") ,("语文","英语") ,("数学","语文&…

[禁止登录]登录失败,建议升级最新版本后重试,或通过问题反馈与我们联系。(错误码:45)

token失效:[禁止登录]登录失败&#xff0c;建议升级最新版本后重试&#xff0c;或通过问题反馈与我们联系。(错误码:45。 [禁止登录]登录失败&#xff0c;建议升级最新版本后重试&#xff0c;或通过问题反馈与我们联系。 使用go-cqhttp开发QQ机器人的时候遇到的问题&#xff0c…

小白入门深度学习 | 6-5:Inception-v1(2014年)详解

1. 理论知识 GoogLeNet首次出现在2014年ILSVRC 比赛中获得冠军。这次的版本通常称其为Inception V1。Inception V1有22层深,参数量为5M。同一时期的VGGNet性能和Inception V1差不多,但是参数量也是远大于Inception V1。 Inception Module是Inception V1的核心组成单元,提出…

市面上的充电桩分类以及系统分析

摘要&#xff1a;智能用电小区是国家电网为了研究智能电网智能用电的先进技术如何运用于居民区&#xff0c;提高人民的生活水平&#xff0c;提高电网智能化水平以及提升用电服务质量而进行的一项尝试。电动汽车作为智能用电小区建设的一个组成部分同样也逐渐被纳入发展规划&…

聊聊传统监控与云原生监控的区别

传统监控的本质就是收集、分析和使用信息来观察一段时间内监控对象的运行进度&#xff0c;并且进行相应的决策管理的过程&#xff0c;监控侧重于观察特定指标。 但是随着云原生时代的到来&#xff0c;我们对监控提出了更多的要求&#xff1a; 通过监控了解数据趋势&#xff0c…