Flink基础篇(基础算子+WaterMarker)

news2025/1/10 23:27:34

Flink

  • 高可用
    • HA 依赖于zk
    • Flink ON Yarn
      • 两种模式
        • Session模式
        • Per-Job模式
  • 前置说明
  • Flink原理
    • 数据在两个operator算子之间传递的时候有两种模式:
    • Operator Chain
    • TaskSlot And Sharing
  • Flink执行图(ExecutionGraph)
  • API
    • Source
    • Transformation
    • Sink
    • 控制台
    • 自定义Sink
    • Connectors
  • Flink四大基石
    • Window
    • Time And Watermark
      • Time分类
      • WaterMarker![请添加图片描述](https://img-blog.csdnimg.cn/25b8596e6dd1425f886e0ae6cbd7c97e.png)

高可用

HA 依赖于zk

请添加图片描述

Flink ON Yarn

请添加图片描述
请添加图片描述

两种模式

Session模式

请添加图片描述

Per-Job模式

请添加图片描述

前置说明

请添加图片描述

Flink原理

请添加图片描述
Task 属于进程 TaskSlot属于线程请添加图片描述

  1. DataFlow:Flink程序在执行的时候会被映射成一个数据流程模型
  2. Operator:数据流模型中得每一个操作都被称作为Operator,Operator分为:Source/Transfrom/Sink
  3. Partition:数据流模型是分布式的和并行的,执行中会形成1-n个分区
  4. SubTask:多个分区任务可以并行,每个都是运行在一个线程中的,也就是一个SubTask子任务
  5. Parallelism:并行度,就是可以同时执行的子任务数/分区数

数据在两个operator算子之间传递的时候有两种模式:

  1. One to One模式:
    两个Operatior用此模式传递的时候,会保持数据的分区数和数据的排序;如上图中的Source1到Map1,他就保留的Source的分区特性,以及分区元素处理的有序性
  2. Redistributing模式
    这种模式会改变数据的分区数;每个operator subtask会根据选择transformation把数据发送到不同的目标subtaks,比如keyBy()会通过hashcode重新分区,broadcast()和rebalance()方法会随机重新分区

Operator Chain

请添加图片描述
客户端在提交任务的时候对Operator进行优化操作,能进行合并的Operator会被合并为一个Operator,合并后后的Operator称为Operator Chain,实际上就是一个执行链,
每个执行链会在TaskManager上独立的线程中执行–就是SubTask

TaskSlot And Sharing

请添加图片描述
每个TaskManager是一个JVM的进程,为了控制一个TaskManager(worker)能接收多少个task,Flink通过TaskSlot来控制。TaskSloat数量是用来限制一个TaskManager工作进程中卡可以同时运行多少个工作线程,TaskSlot是一个TaskManager中的最小资源单位,一个TaskManager中有多少个TaskSlot就意味着能支持多少个并发Task处理

Flink将进程的内存进行
了划分到多个slot之后可以获得如下好处:

  • TaskManager最多能同时并发执行的子任务数是可以通过TaskSlot数量控制的
  • TaskSolt有独占的内存空间,这样在一个TaskManager中可以运行多个不同的作业,作业之间不受影响

个人理解:分布式线程池,可复用
请添加图片描述

Flink执行图(ExecutionGraph)

请添加图片描述

API

Source

基于集合

  1. env.fromElements(可变参数)
  2. env.fromCollection(各种集合)
  3. env.generateSequence(开始,结束)
  4. env.fromSequence(开始,结束)

基于文件
env.readTextFile(本地/hdsf/文件夹)
基于Socket
env.socketTextStream(host,port)

自定义
Flink还提供了数据源接口,实现自定义数据源:

  1. SourceFunction:非并行度数据源(并行度只能 = 1)
  2. RichSourceFunction:多功能并行数据源(并行度只能 = 1)
  3. ParallelSourceFunction:并行数据源(并行度能够 >= 1)
  4. RichParallelSourceFunction: 多功能并
    行数据源(并行度能够>=1)–>Kafka数据源使用欧冠的就是该接口

Transformation

  1. map/flatMap/keyBy/filter/reduce
  2. union和connect
    请添加图片描述
    union:只能合并同类型
    connect:可以合并不同类型,之后需要做其他处理,不能输出
  3. Side Outputs 侧流
  4. rebalance 重平衡分区请添加图片描述
  5. 其他分区
    请添加图片描述

Sink

控制台

  1. ds.print() 直接输出控制台
  2. ds.printToErr() 直接输出控制台红色
  3. ds.writeAsText(“本地/hdsf”,WriteMode.OVERWRITE)
    注意输出到path的时候可以在前面设置并行度,如果
    并行度>1,则path为目录
    并行度=1,则path为文件夹

自定义Sink

类似于Source ,4个Sink
  1. SinkFunction:非并行度数据源(并行度只能 = 1)
  2. RichSinkFunction:多功能并行数据源(并行度只能 = 1)
  3. ParallelSinkFunction:并行数据源(并行度能够 >= 1)
  4. RichParallelSinkFunction: 多功能并

Connectors

JDBCSink,KafkaSink,RedisSink

Flink四大基石

请添加图片描述

Window

window分类
请添加图片描述
总结

  1. 基于时间的滚动窗口 tumbling-time-window
  2. 基于时间的滑动窗口 sliding-time-window
  3. 基于数量的滚动窗口 tumbling-count-window
  4. 基于数量的滑动窗口 sliding-count-window
    flink还支持一个特殊的窗口:session会话窗口,需要设置一个会话超时时间,如30s,则表示30s内没有数据到来,则触发上一个窗口计算

使用keyby的流,应该使用window方法
未使用keyby的流,应该使用windowAll方法

请添加图片描述

请添加图片描述
基于时间

public class CarDemo {
    /**
     * 需求
     *  nc -lk 9999
     *  有如下数据表示
     *      信号灯编号和通过该信号灯的车的数量
9,3
9,2
2,3
9,2
3,3
3,2
4,3
9,2
        要求1:
            每隔5秒钟统计一次,最近5秒内,各个路口通过红路灯汽车的数量--基于时间的滚动窗口
        要求2:
            每隔5秒钟统计一次,最近10秒内,各个路口通过红路灯汽车的数量--基于时间的滑动窗口
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = env.socketTextStream("hadoop102", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = stream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] split = s.split(",");
                return Tuple2.of(split[0], Integer.valueOf(split[1]));

            }
        });
        KeyedStream<Tuple2<String, Integer>, String> tStream = mapStream.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> tumblingStream = tStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> slidingStream = tStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .sum(1);

        tumblingStream.print("tumbling");
        slidingStream.printToErr("sliding");
        env.execute();

    }
}

基于数量

public class CarDemo {
    /**
     * 需求
     *  nc -lk 9999
     *  有如下数据表示
     *      信号灯编号和通过该信号灯的车的数量
9,3
9,2
2,3
9,2
9,2
9,2
        要求1:
            统计在最近的5条消息中,各自路口通过的机车数量,相同的key每出现5次进行统计--基于数量的滚动窗口
        要求2:
            统计在最近的5条消息中,各自路口通过的机车数量,相同的key每出现3次进行统计--基于时间的滑动窗口
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = env.socketTextStream("hadoop102", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = stream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] split = s.split(",");
                return Tuple2.of(split[0], Integer.valueOf(split[1]));

            }
        });
        KeyedStream<Tuple2<String, Integer>, String> tStream = mapStream.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> tumblingStream = tStream.countWindow(5).sum(1);
        SingleOutputStreamOperator<Tuple2<String, Integer>> slidingStream = tStream.countWindow(5, 3).sum(1);

        tumblingStream.print("tumbling");
        slidingStream.printToErr("sliding");
        env.execute();

    }
}

会话窗口

public class CarDemo {
    /**
     * 需求
     *  nc -lk 9999
     *  有如下数据表示
     *      信号灯编号和通过该信号灯的车的数量
9,3
9,2
2,3
9,2
9,2
9,2
        要求1:
           设置会话超时时间10s,10s内没有数据到来,则触发窗口计算
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = env.socketTextStream("hadoop102", 9999);

        SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = stream.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String s) throws Exception {
                String[] split = s.split(",");
                return Tuple2.of(split[0], Integer.valueOf(split[1]));

            }
        });
        KeyedStream<Tuple2<String, Integer>, String> tStream = mapStream.keyBy(t -> t.f0);

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = tStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).sum(1);

        sum.printToErr();
        env.execute();

    }
}

Time And Watermark

Time分类

请添加图片描述

请添加图片描述
请添加图片描述
请添加图片描述
请添加图片描述
请添加图片描述

WaterMarker请添加图片描述

请添加图片描述
请添加图片描述
请添加图片描述
请添加图片描述
WaterMarker重点请添加图片描述
案例

public class WTOrderDemo {
    public static void main(String[] args) throws Exception {

        /**
         * 需求
         *  有订单数据,格式: ( 订单Id,用户Id,时间戳/事件时间 ,订单金额)
         *  要求每隔5s,计算5s内,每个用户的订单总金额
         *
         *  并添加WaterMaker来解决一定程度的 数据延迟以及数据乱序 问题
         */

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Order> orderDs = env.addSource(new SourceFunction<Order>() {
            private Boolean flag = true;

            @Override
            public void run(SourceContext<Order> ct) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(2);
                    int money = random.nextInt(101);
                    long eventTime = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    Order order = new Order(orderId, userId, money, eventTime);
                    ct.collect(order);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }

        });

        // TODO transformation
        /**
         * Flink 1.12 版本
         * 每隔5s计算最近的数据球每个用户的订单总金额,要求:基于时间时间计算+WaterMarker
         *
         * env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 在新版本中默认就是EventTime
         * 设置 WaterMarker = 当前最大事件时间 - 最大允许的延时 或 乱序时间
         */
        SingleOutputStreamOperator<Order> orderDsWithWatermark = orderDs.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 指定延时时间或乱序时间
                        .withTimestampAssigner(((order, timestamp) -> order.eventTime))
        );
        SingleOutputStreamOperator<Order> result = orderDsWithWatermark.keyBy(order -> order.userId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("money");

        result.print();
        env.execute();

    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order{
        public String orderId;
        public Integer userId;
        public Integer money;
        public Long eventTime;


    }
}

测到输出机制(解决严重迟到问题)
请添加图片描述

public class WTOrderDemo {
    public static void main(String[] args) throws Exception {

        /**
         * 需求
         *  有订单数据,格式: ( 订单Id,用户Id,时间戳/事件时间 ,订单金额)
         *  要求每隔5s,计算5s内,每个用户的订单总金额
         *
         *  并添加WaterMaker来解决一定程度的 数据延迟以及数据乱序 问题
         *
         *  并使用outputTag + allowedLateness 来解决数据丢失问题(严重的 数据延迟以及数据乱序 问题)
         */

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Order> orderDs = env.addSource(new SourceFunction<Order>() {
            private Boolean flag = true;

            @Override
            public void run(SourceContext<Order> ct) throws Exception {
                Random random = new Random();
                while (flag) {
                    String orderId = UUID.randomUUID().toString();
                    int userId = random.nextInt(2);
                    int money = random.nextInt(101);
                    long eventTime = System.currentTimeMillis() - random.nextInt(20) * 1000;
                    Order order = new Order(orderId, userId, money, eventTime);
                    ct.collect(order);
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
                flag = false;
            }

        });

        // TODO transformation
        /**
         * Flink 1.12 版本
         * 每隔5s计算最近的数据球每个用户的订单总金额,要求:基于时间时间计算+WaterMarker
         *
         * env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// 在新版本中默认就是EventTime
         * 设置 WaterMarker = 当前最大事件时间 - 最大允许的延时 或 乱序时间
         */

        // TODO 侧道输出机制,解决严重的迟到问题
        OutputTag<Order> orderOutputTag = new OutputTag<>("seriousLate", TypeInformation.of(Order.class));
        SingleOutputStreamOperator<Order> orderDsWithWatermark = orderDs.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 指定延时时间或乱序时间
                        .withTimestampAssigner(((order, timestamp) -> order.eventTime))
        );
        SingleOutputStreamOperator<Order> result1 = orderDsWithWatermark.keyBy(order -> order.userId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .allowedLateness(Time.seconds(3))
                .sideOutputLateData(orderOutputTag)
                .sum("money");

        DataStream<Order> result2 = result1.getSideOutput(orderOutputTag);
        result1.print("正常的数据");
        result2.printToErr("迟到的数据");
        env.execute();

    }
    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class Order{
        public String orderId;
        public Integer userId;
        public Integer money;
        public Long eventTime;


    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/25199.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【图像识别-车牌识别】基于BP神经网络求解车牌识别问题含GUI界面和报告

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步精进&#xff0c;matlab项目合作可私信。 &#x1f34e;个人主页&#xff1a;Matlab科研工作室 &#x1f34a;个人信条&#xff1a;格物致知。 更多Matlab仿真内容点击&#x1f447; 智能优化算法 …

11个销售心理学方法,帮你搞定客户!

销售心理学中&#xff0c;站在客户的角度&#xff0c;客户都会有以下几个疑问&#xff1a; 1、你是谁&#xff1f; 2、你要跟我介绍什么&#xff1f; 3、你介绍的产品和服务对我有什么好处&#xff1f; 4、如何证明你介绍的是真实的&#xff1f; 5、为什么我要跟你买&…

Linux学习之expect操作详解

一、expect安装介绍 1.expect命令安装 安装语句:yum install expect 2.expect命令含义 expect是一种脚本语言&#xff0c;它能够代替人工实现与终端的交互&#xff0c;主要应用于执行命令和程序时&#xff0c;系统以交互形式要求输入指定字符串&#xff0c;实现交互通信。 …

疟原虫蛋白复合物疫苗科研

疟疾是一种蚊媒疾病&#xff0c;感染者通常会出现发烧、发冷和流感样疾病。如果不及时治疗&#xff0c;严重者甚至会危及生命。世卫组织新近发布的数据表明&#xff0c;2019 年全球估计发生 2.29 亿疟疾病例&#xff0c;死于该病的人数超过 40 万例。 图 1. 2000 年有病例的国…

Flutter 应用程序中的 Quick Actions

Flutter 应用程序中的 Quick Actions 原文 https://medium.com/vijay-r/quick-actions-in-flutter-app-75b63acc420b 前言 在这篇文章中&#xff0c;我们将讨论如何添加 Quick Actions 在我们的 Flutter 应用程序&#xff0c;使我们的应用程序更加友好的用户。 正文 插件 quick…

LVM逻辑卷

要求&#xff1a;在系统下做LVM逻辑卷2G&#xff0c;并将LVM进行扩容到5G 操作环境&#xff1a;7.8.2003 [rootlocalhost ~]# lsblk #列出所有可用块设备信息 我们使用vdb和vdc两块硬盘做lvm 先将一个盘进行分区&#xff08;/dev/vdb&#xff09; [r…

Leetcode808. 分汤

文章目录题目链接题目大意解题思路代码(C)动态规划记忆化搜索题目链接 点我(^_^) 题目大意 注意不是两个概率加和除以2 解题思路 考虑动态规划&#xff0c;因为汤的分配都是以 25 的倍数进行分配的&#xff0c;所以这里把 25 ml 的汤看作一份&#xff0c;总的份数⌈汤的体…

A-Level经济例题解析及练习Analysis of Tax

今日知识点&#xff1a;Analysis of Tax 例题A. Compute consumer surplus, producer surplus, and total surplus without a tax. B. If $100 tax per ticket, compute consumer surplus, producer surplus, tax revenue, total surplus, and deadweight loss.解析下面我们为大…

搭载北京君正X2000主控芯片的成功案例

汉王e典笔S20 Plus搭载北京君正研发的X2000多核异构跨界处理器。X2000多核异构跨界处理器主要面向于智能音频、图像识别、智能家电、智能家居、智能办公等五大领域。CPU采取三核结构&#xff0c;搭载双XBurst2&#xff0c;主频1.2GHz&#xff0c;跨界第三核XBurst0(240MHz)&…

Linux--进程概念

前言&#xff1a; 在学习操作系统的过程中&#xff0c;我们常常能听到进程这一概念以及相关的一些知识。例如什么是父进程&#xff0c;什么是子进程&#xff0c;如何创建子进程&#xff0c;如何杀死进程等。这些知识后面会一一介绍&#xff0c;在迈入学习进程的第一步我只需要知…

[附源码]java毕业设计校园二手交易平台的设计

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

养殖废水总氮超标的解决方法

养殖废水除总氮。入水是100mg/L&#xff0c;处理水量大概在3T/H&#xff0c;要求养殖废水总氮小于5PPM。 生化前置过滤树脂的工艺&#xff0c;通过阳树脂中含有的阳离子会与水中的游离氨&#xff08;NH3&#xff09;和铵离子&#xff08;NH4&#xff09;进行交换&#xff0c;将…

2.15 这样的小红书图片内容,最容易“踩雷”!【玩赚小红书】

1、笔记中所有图片只展示一件单品 这类内容有可能会被系统判定为广告&#xff0c;或被用户怀疑为软广而举报。据介绍&#xff0c;小红书平台上的用户对软广的接受度较低&#xff0c;这类内容时常因为被举报而限流。 小红书引入“生态官”维护社区内容后&#xff0c;生态官也会…

三层vlan实验

目录 实验拓扑 实验需求 实验解法 5&#xff0c;按照图示配置 IP 地址&#xff0c;其中只有SW1需要配置三层vlan地址&#xff0c;电脑设备为DHCP获取地址 6&#xff0c;在sw1配置DHCP动态地址池塘&#xff0c;DNS为114.114.114.114 7&#xff0c;各台电脑通过交换机划分不…

你的系统如何支撑高并发?大佬手写高并发架构设计笔记帮你圆满回答!

开篇&#xff0c;我们聊聊大量同学问我的一个问题&#xff0c;面试的时候被问到一个让人特别手足无措的问题&#xff1a;你的系统如何支撑高并发&#xff1f; 大多数同学被问到这个问题压根儿没什么思路去回答&#xff0c;不知道从什么地方说起&#xff0c;其实本质就是没经历…

m虚拟MIMO系统的配对调度算法的matlab仿真,对比Random配对,Orthogonal配对以及Deteminant配对

目录 1.算法概述 2.仿真效果预览 3.MATLAB部分代码预览 4.完整MATLAB程序 1.算法概述 利用多输入多输出&#xff08;MIMO&#xff0c;Multiple InputMultiple Output&#xff09;技术通过空间复用能够显著的提高通信系统的容量&#xff0c;并可以很好的缓解时/频资源日益紧…

BCN衍生物:endo-BCN-PEG4-TAMRA/Palmitic/DSPE

凯新生物公司小编分享&#xff1a;endo-BCN-PEG4-TAMRA &#xff0c;endo-BCN-PEG4-Palmitic&#xff0c;endo-BCN-PEG4-DSPE这几种的物理相关数据。 1、endo-BCN-PEG4-TAMRA 四甲基罗丹明&#xff08;TAMRA&#xff09;-叠氮化物是一种化学探针&#xff0c;用于直接在活细胞中…

食品连锁企业怎样结合快解析打造智能安全管理系统

食品质量安全是食品类企业的生命线&#xff0c;对产品质量的安全管控必须十分严苛。A企业是全国知名的一家大型食品连锁企业&#xff0c;为了做好质量监控&#xff0c;A企业研发了一套智能安全管理预警系统&#xff0c;可以从多维度对生产一线的违规事件进行预警记录&#xff0…

MCE | 细胞实验——多溶剂集合

■ 不到不得已&#xff0c;谁也不想现用现配细胞实验的工作浓度通常在 μM 级别&#xff0c;有的还是 nM 和 pM&#xff0c;要做到现用现配&#xff0c;难度系数有点高&#xff0c;好比让一个日均不到 1000 步的人去爬 5000 米的山峰&#xff0c;这难度&#xff0c;你细品&…

Bot代码的执行(微服务)

负责接收一段代码&#xff0c;把代码扔到我们的队列当中、每一次我们去运行一段代码 运行结束之后、把我们的结果返回给我们的服务器 先把依赖复制过来、我们需要动态的把用户传过来的Java代码编译然后执行 需要加入依赖joor-java-8、用Java的代码的写法举例子 未来自己实现的时…