Flink高手之路6-Flink四大基石

news2024/10/6 12:23:25

文章目录

  • Flink四大基石
    • 一、Flink的四大基石
      • 1. Checkpoint
      • 2. State
      • 3. Time
      • 4. Window
    • 二、案例
      • 1.需求
      • 2.代码实现
      • 3.运行,查看结果
      • 4.增加需求2的实现
      • 5.重启程序,查看结果

Flink四大基石

一、Flink的四大基石

Flink之所以能这么流行,离不开它最重要的四个基石:Checkpoint、State、Time、Window。

image-20230420181709188

1. Checkpoint

这是Flink最重要的一个特性。

Flink基于Chandy-Lamport算法实现了一个分布式的一致性的快照,从而提供了一致性的语义。

Chandy-Lamport算法实际上在1985年的时候已经被提出来,但并没有被很广泛的应用,而Flink则把这个算法发扬光大了。

Spark最近在实现Continue streaming,Continue streaming的目的是为了降低处理的延时,其也需要提供这种一致性的语义,最终也采用了Chandy-Lamport这个算法,说明Chandy-Lamport算法在业界得到了一定的肯定。

https://zhuanlan.zhihu.com/p/53482103

2. State

提供了一致性的语义之后,Flink为了让用户在编程时能够更轻松、更容易地去管理状态,还提供了一套非常简单明了的State API,包括ValueState、ListState、MapState,BroadcastState。

3. Time

除此之外,Flink还实现了Watermark的机制,能够支持基于事件的时间的处理,能够容忍迟到/乱序的数据。

4. Window

另外流计算中一般在对流数据进行操作之前都会先进行开窗,即基于一个什么样的窗口上做这个计算。Flink提供了开箱即用的各种窗口,比如滑动窗口、滚动窗口、会话窗口以及非常灵活的自定义的窗口。

二、案例

官网API:https://nightlies.apache.org/flink/flink-docs-release-1.12/learn-flink/event_driven.html#example

在这里插入图片描述

1.需求

基于时间的滚动和滑动窗口

nc -lk 9999

有如下的数据表示:

信号灯的编号和通过该信号灯的车的数量

9,3

9,2

9,7

4,9

2,6

1,5

2,3

5,7

5,5

需求1:每10秒钟统计一次,最近10秒内,各个路口通过红绿灯的汽车的数量,也就是基于时间的滚动窗口

需求2:每10秒钟统计一次,最近20秒内,各个路口通过红绿灯的汽车的数量,也就是基于时间的滑动窗口

2.代码实现

package cn.edu.hgu.bigdata20.flink.window;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * description:flink的Window演示
 * author 王
 * date 2023/04/20
 */
public class FlinkWindowDemo {
    public static void main(String[] args) throws Exception {
        // 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.source
        DataStream<String> socketDS = env.socketTextStream("hadoop001", 9999);

        // 3. Transformation
        //将9,3转为CartInfo(9,3)
        SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            }
        });

        // 需求:设置会话超时时间为10s,10s内没有数据到来,则触发上个窗口的计算(前提是上一个窗口有数据)
        SingleOutputStreamOperator<CartInfo> result = cartInfoDS.keyBy(CartInfo::getSensorId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .sum("count");

        //4.Sink
        result.print();

        //5.execute
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }
}


3.运行,查看结果

输入数据:

9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,5

在这里插入图片描述

4.增加需求2的实现

package cn.edu.hgu.bigdata20.flink.window;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * description:flink的Window演示
 * author 王
 * date 2023/04/20
 */
public class FlinkWindowDemo {
    public static void main(String[] args) throws Exception {
        // 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.source
        DataStream<String> socketDS = env.socketTextStream("hadoop001", 9999);

        // 3. Transformation
        //将9,3转为CartInfo(9,3)
        SingleOutputStreamOperator<CartInfo> cartInfoDS = socketDS.map(new MapFunction<String, CartInfo>() {
            @Override
            public CartInfo map(String value) throws Exception {
                String[] arr = value.split(",");
                return new CartInfo(arr[0], Integer.parseInt(arr[1]));
            }
        });

        // 需求1:每10秒钟统计一次,最近10秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滚动窗口
        SingleOutputStreamOperator<CartInfo> result = cartInfoDS.keyBy(CartInfo::getSensorId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .sum("count");

        // 需求2:每10秒钟统计一次,最近20秒钟内,各个路口/信号灯通过红绿灯汽车的数量--基于时间的滑动窗口
        SingleOutputStreamOperator<CartInfo> result1 = cartInfoDS.keyBy(CartInfo::getSensorId)
                .window(SlidingProcessingTimeWindows.of(Time.seconds(20), Time.seconds(10)))
                .sum("count");
        //4.Sink
        //result.print();
        result1.print();

        //5.execute
        env.execute();
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class CartInfo {
        private String sensorId;//信号灯id
        private Integer count;//通过该信号灯的车的数量
    }
}

5.重启程序,查看结果

输入数据:

9,3
9,2
9,7
4,9
2,6
1,5
2,3
5,7
5,5
9,3
9,2
9,7
4,9
2,6
1,5

结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/442243.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

python和PyTorch知识

解包操作 可变参数和关键字参数是 Python 函数的两种参数类型。 b, *_ t.shape python中这个用法是什么意思 我们使用 _” 来“忽略”一个或多个值&#xff08;表示我们不需要这些值&#xff09;&#xff0c;然后将“t.shape”元组的第一个元素赋值给变量“b”。 pytorc…

使用ChatGPT的方法和替代方案

作为互联网应用&#xff0c;ChatGPT也有国内化的替代方案。在国内&#xff0c;一些公司已经开始利用深度学习技术开发本地化的语言模型&#xff0c;例如阿里巴巴的通义千问、华为的盘古大语言模型&#xff0c;以及百度的文心一言等等&#xff0c;这些模型可以完成自然语言处理任…

javassist 字节码处理库

目录 一、快速入门 1.1 创建class文件1.2 ClassPool的相关方法1.3 CtClass的相关方法1.4 CtMethod的相关方法1.5 调用生成的类对象 1.5.1 通过反射调用1.5.2 通过接口调用1.6 修改现有的类对象二、将类冻结三、类搜索路径四、$开头的特殊字符五、ProxyFactory的使用 我们知道J…

论文排版怎么排?教您3分钟搞定!

案例&#xff1a;如何对论文进行排版&#xff1f; 【我把写好的毕业论文交给老师检查&#xff0c;老师说我的格式不正确&#xff0c;叫我按照正确的格式进行排版修改。如何对论文进行排版&#xff1f;有没有小伙伴知道论文的正确格式&#xff1f;】 临近毕业&#xff0c;很多…

3.java程序员必知必会类库之junit

前言 单元测试技术的使用&#xff0c;是区分一个一般的开发者和好的开发者的重要指标。程序员经常有各种借口不写单元测试&#xff0c;但最常见的借口就是缺乏经验和知识。常见的单测框架有 JUnit , Mockito 和PowerMock 。本文就Junit展开介绍。 1.介绍 JUnit 是一个 Java …

webservice使用帮助手册

什么是Webservice 简单讲就是一种RPC的实现方式 参考&#xff1a;WebService是什么 SOAP1.1和SOAP1.2的区别 参考&#xff1a;https://www.cnblogs.com/yefengmeander/p/4176771.html 发布Webservice服务 1.用WebService编写一个webservice服务 2. 发布服务 3. 查看发布…

实现匹配搜索词高亮(Vue3)

1.使用插件实现&#xff08;ohlight&#xff09; (1).下载插件 // pnpm pnpm i ohlight // npm npm i ohlight // yarn yarn add ohlight如果让选择版本就按照提示的版本选择 (2).基本使用 >1.(Vue3)的使用 首先在vite.config.js中加入以下代码&#xff1a; export de…

AD9208调试经验分享

背景概述 FMC137 是一款基于 VITA57.4 标准规范的 JESD204B 接口FMC 子 卡 模 块 &#xff0c; 该 模 块 可 以 实 现 4 路 14-bit 、 2GSPS/2.6GSPS/3GSPSADC 采集功能。该板卡 ADC 器件采用 ADI 公司的 AD9208 芯片&#xff0c;&#xff0c;与 ADI 公司的 AD9689 可以实现 …

记一次adb查找安卓App崩溃报错记录

记一次adb查找安卓App崩溃报错记录 首先先说结论&#xff0c;是因为内存不足的时候会出现这种问题 在小米手机上有这么一个设置 可以很方面的模拟出这个异常 然后我们再设置一下logcat日志的大小 如果你的操作真的很多&#xff0c;最好设置一下&#xff0c;如果你的操作很短就…

「区间DP-步入」石子合并(环形)

石子合并&#xff08;环形&#xff09; 题目描述 在一个圆形操场的四周摆放 N 堆石子&#xff0c;现要将石子有次序地合并成一堆&#xff0c;规定每次只能选相邻的2堆合并成新的一堆&#xff0c;并将新的一堆的石子数&#xff0c;记为该次合并的得分。 试设计出一个算法,计算…

Java调用 webService 接口报错

问题描述 本文采用的是 jdk1.8.0_202&#xff0c;装在 D 盘中&#xff0c;配置了系统环境变量 path。 D:\Java\jdk8\bin;D:\Java\jdk8\jre\bin;本文调用 webservice 的依赖如下&#xff1a; <!-- webservice调用依赖cxf--><dependency><groupId>org.apache…

Vue3 + TS4.8其他踩坑记录

坑1&#xff1a;导入SFC组件的时候&#xff0c;直接提示错误&#xff0c;导入 Module "/Users/xinxiang/ProjectSpace/Practice/Vue/vue3-typescript4.8-issue-6305/src/components/HelloWorld.vue" has no default export.Vetur(1192) 解决方案1&#xff1a;Vue3项…

《剑指offer》——刷题日记

本期&#xff0c;给大家带来的是《剑指offer》几道题目的讲解。希望对大家有所帮助&#xff01;&#xff01;&#xff01; 本文目录 &#xff08;一&#xff09;JZ36 二叉搜索树与双向链表 1、题意分析 2、思路讲解 3、代码演示 4、最终结果 &#xff08;二&#xff09;B…

Nginx安装使用记录

参考文章&#xff1a;https://www.runoob.com/linux/nginx-install-setup.html 系统平台&#xff1a;Ubuntu 20.04.4 LTS (GNU/Linux 5.4.0-146-generic x86_64) 下载编译工具 sudo apt-get install zlib zlib-devel gcc-c libtool openssl openssl-develPCRE 官网&#xf…

Python每日一练(20230421)

目录 1. 组合总和 II &#x1f31f;&#x1f31f; 2. 加一 &#x1f31f; 3. 从中序与后序遍历序列构造二叉树 &#x1f31f;&#x1f31f; &#x1f31f; 每日一练刷题专栏 &#x1f31f; Golang每日一练 专栏 Python每日一练 专栏 C/C每日一练 专栏 Java每日一练 …

如何通过开源项目搭建私有云平台--第四步中:安装rancher longhorn

第四步中&#xff1a;安装rancher longhorn&#xff0c;实现容器文件挂载 之前在国企&#xff0c;曾经专门针对容器挂载立专项来调研&#xff0c;当时选择主流的ceph&#xff0c;最终结果是成功搭建了用于k8s集群的共享存储&#xff0c;但要独立找服务器来部署&#xff0c;增加…

Cloud computing(后续慢慢补充)

Cloud computing 可以看到右侧的容器虚拟化架构中&#xff0c;不需要运行额外的OS&#xff0c;这样启动的服务性能会相比于通过虚拟化软件实现的架构更优秀。但是虚拟机同样也有它的优点&#xff0c;比如它的安全、隔离性&#xff0c;可以运行不同的操作系统等等。 Virtualiz…

【linux】linux入门级别指令

一些基础指令 前言用户登录新建用户 ls指令pwd命令cd 指令which指令alias指令touch指令mkdir指令rmdir指令 && rm 指令rmdirrmman指令cp指令mv指令catmoreless指令head 指令tail指令输出重定向时间相关的指令cal指令find指令grep指令zip/unzip指令tar指令bc指令uname指…

Bootstrap01【前端开发框架】家居商城首页之导航轮播图

目录 一.WWW 1.What&#xff1f; 2.Why&#xff1f; 3.Where&#xff1f; 二.环境安装 三.案例 案例1&#xff1a;查询按钮原生态实现对比Bootstrap方式实现 案例3&#xff1a;首页导航原生态实现&#xff08;divcss&#xff09; 案例4&#xff1a;首页导航Bootstrap实…