大数据-124 - Flink State 01篇 状态原理和原理剖析:状态类型 执行分析

news2024/9/20 12:38:06

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(已更完)
  • Flink(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Flink 并行度
  • Flink 并行度详解
  • Flink 并行度 案例
    在这里插入图片描述

状态类型

Flink根据是否需要保存中间结果,把计算分为有状态计算和无状态计算。

  • 有状态计算:依赖之前或之后的事件
  • 无状态计算:独立

根据数据结构不同,Flink定义了多种State,应用于不同的场景。

  • ValueState:即类型为T的单值状态,这个状态与对应的Key绑定,是最简单的状态了。它可以通过update方法更新状态值,通过 value() 方法获取状态值
  • ListState:即Key上的状态值为一个列表,可以通过add方法往列表中附加值,也可以通过get()方法返回一个Iterable来遍历状态值
  • ReducingState:这种状态通过用户传入的ReduceFunction,每次调用add方法添加值的时候,会调用ReduceFunction,最后合并到一个单一的状态值。
  • FoldingState:跟ReducingState有点类似,不过它的状态值类型可以与add方法中传入的元素类型不同(这种状态会在未来的Flink版本当中删除)
  • MapState:即状态值为一个Map,用户通过put和putAll方法添加元素

State按照是否有Key划分为:

  • KeyedState
  • OperatorState

案例1 利用State求平均值

实现思路

  • 读数据源
  • 将数据源根据Key分组
  • 按照Key分组策略,对流式数据调用状态化处理:实例化出一个状态实例,随着流式数据的到来更新状态,最后输出结果

编写代码

package icu.wzk;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;


public class FlinkStateTest01 {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Tuple2<Long, Long>> data = env
                .fromElements(
                        Tuple2.of(1L, 3L),
                        Tuple2.of(1L, 5L),
                        Tuple2.of(1L, 7L),
                        Tuple2.of(1L, 4L),
                        Tuple2.of(1L, 2L)
                );
        KeyedStream<Tuple2<Long, Long>, Long> keyed = data
                .keyBy(new KeySelector<Tuple2<Long, Long>, Long>() {
                    @Override
                    public Long getKey(Tuple2<Long, Long> value) throws Exception {
                        return value.f0;
                    }
                });
        SingleOutputStreamOperator<Tuple2<Long, Long>> flatMapped = keyed
                .flatMap(new RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
                    private transient ValueState<Tuple2<Long, Long>> sum;

                    @Override
                    public void flatMap(Tuple2<Long, Long> value, Collector<Tuple2<Long, Long>> out) throws Exception {
                        Tuple2<Long, Long> currentSum = sum.value();
                        if (currentSum == null) {
                            currentSum = Tuple2.of(0L, 0L);
                        }
                        // 更新
                        currentSum.f0 += 1L;
                        currentSum.f1 += value.f1;
                        System.out.println("currentValue: " + currentSum);
                        // 更新状态值
                        sum.update(currentSum);
                        // 如果 count >= 5 清空状态值 重新计算
                        if (currentSum.f0 >= 5) {
                            out.collect(new Tuple2<>(value.f0, currentSum.f1 / currentSum.f0));
                            sum.clear();
                        }
                    }

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>(
                                "average",
                                TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {})
                        );
                        sum = getRuntimeContext().getState(descriptor);
                    }
                });
        flatMapped.print();
        env.execute("Flink State Test");
    }
}

运行结果

在这里插入图片描述

执行分析

在这里插入图片描述
在这里插入图片描述

Keyed State

表示和Key相关的一种State, 只能用于KeyedStream类型数据集对应的Function和Operator之上,KeyedState是OperatorState的特例,区别在于KeyedState事先按照Key对数据集进行了区分,每个KeyState仅对应一个Operator和Key的组合。

KeyedState可以通过KeyGroups进行管理,主要用于当算子并行度发生变化时,自动重新分布KeyedState数据。在系统运行过程中,一个Keyed算子实例可能运行一个或者多个KeyGroups的Keys。

Operator State

与 Keyed State 不同的是,Operator State 只和并行的算子实例绑定,和数据元素中的Key无关,每个算子实例中持有所有数据元素中的一部分状态数据。Operator State 支持算子实例并行度发生变化时自动重新分配状态数据。

同时在Flink中KeyedState和OperatorState均具有两种形式,其中一种为托管状态(Managed State)形式,由FlinkRuntime中控制和管理状态数据,并将状态数据转换为内存HashTables或RocksDB的对象存储,然后将这些状态数据通过内部的接口持久话到CheckPoints中,任务异常时可以通过这些状态数据恢复任务。另外一种是原生状态(Row State)形式,由算子自己管理数据结构,当触发CheckPoint中,当从CheckPoint恢复任务时,算子自己再返序列化出状态的数据结构。

DataStreamAPI支持使用ManagedState和RawState两种状态形式,在Flink中推荐用户使用ManagedState管理状态数据,主要原因是ManagedState能够更好地支持状态数据的重平衡以及更加完善的内存管理。

状态描述

在这里插入图片描述

State既然是暴露给用户的,那么就需要有一些属性需要指定:

  • State名称
  • Value Serializer
  • State Type Info

在对应的StateBackend中,会去调用对应的create方法获取到stateDescriptor中的值。
Flink通过StateDescriptor来定义一个状态,这是一个抽象类,内部定义了状态名称、类型、序列化器等基础信息,与上面的状态对应,从StateDescriptor派生ValueStateDescriptor、ListStateDescriptor等等

  • ValueState getState(ValueStateDescriptor)
  • ReducingState getReducingState(ReducingStateDescriptor)
  • ListState getListState(ListStateDescriptor)
  • FoldingState getFoldingState(FoldingStateDescriptor)
  • MapState getMapState(MapStateDescriptot)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2114365.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Pr:首选项 - 音频硬件

Pr菜单&#xff1a;编辑/首选项 Edit/Preferences Premiere Pro 首选项中的“音频硬件” Audio Hardware选项卡可以指定计算机的音频设备和设置&#xff0c;还可以指定 Pr 用于音频回放和录制的 ASIO 和 MME 设置&#xff08;仅限 Windows&#xff09;或 CoreAudio 设置&#x…

C#使用GDI+实现生成验证码

目录 下面是代码的详细解释&#xff1a; 类声明和成员变量 构造函数 窗体加载事件 生成验证码方法 pictureBox1 点击事件 按钮点击事件 完整代码&#xff1a; 总结 这段代码是一个C# Windows Forms应用程序的一部分&#xff0c;用于生成和验证验证码。 下面是代码的详细解…

C# Windows Forms实现绘制画板

目录 C# Windows Forms上绘制画板&#xff1a; 详细解释&#xff1a; TempData临时数据&#xff0c;用来保存画笔相关的信息&#xff0c;如&#xff1a;颜色&#xff0c;大小&#xff0c;坐标等 类声明和成员变量 构造函数 文件菜单项点击事件 保存菜单项点击事件 画笔大…

浙大数据结构:03-树2 List Leaves

这道题我借用了一点上一题的代码思路&#xff0c;这题考察的主要是层序遍历&#xff0c;即用队列来实现&#xff0c;当然此处我依然采用数组模拟队列来实现。 机翻 1、条件准备 map的键存下标&#xff0c;后面值分别存左右子树的下标&#xff0c;没有子树就存-1. head数组只…

python脚本源码如何使用PyOxidizer编译Windows可执行文件

使用 PyOxidizer 将上述代码编译为 Windows 可执行文件&#xff0c;可以按照以下步骤进行&#xff1a; 一、准备工作 确保已经安装了 PyOxidizer 和 Rust 开发环境&#xff0c;如前文所述。 二、创建 PyOxidizer 配置文件 创建一个名为pyoxidizer.toml的配置文件&#xff0c;内…

go急速入门API开发

go急速入门 1、安装Go和对应编辑器2、编写helloWord3、项目目录开发4、编写一个http服务器5、 使用Gin框架基本使用使用部分中间件自定义中间件 6、部署 1、安装Go和对应编辑器 go官网&#xff0c;下载自己电脑对应版本即可。安装完成之后打开cmd输入go即可弹出对应提示。 对…

ffmpeg(各个系统版本安装- Windows11-Mac-Linux)

各个系统上的安装不建议使用编译安装&#xff0c;大佬的话可以 编译安装会各种环境问题&#xff0c;直接使用别人安装好的就行 1.Windows11上安装ffmpeg 1.官网下载ffmpeg 进入Download FFmpeg网址&#xff0c;点击下载windows版ffmpeg&#xff0c;使用别人编译好的版本即可 …

文法的例题

答案&#xff1a;B 知识点&#xff1a; 文法 一个形式文法是一个有序四元组G{V,T,S,P}&#xff0c;其中&#xff1a; V&#xff1a;非终结符。不是语言组成部分&#xff0c;不是最终结果&#xff0c;可理解为占位符 T&#xff1a;终结符。是语言的组成部分&#xff0c;是最…

自用NAS系列1-设备

拾光坞 拾光坞多账号绑定青龙面板SMBWebdav小雅alist下载到NASDocker安装迅雷功能利用qBittorrentEEJackett打造一站式下载工具安装jackett插件 外网访问内网拾光客户端拾光穿透公网ipv6路由器配置ipv6拾光坞公网验证拾光坞域名验证 拾光坞 多账号绑定 手机注册拾光坞账号&am…

Web Bluetooth 与点对点连接

前言 需求需要实现手持终端设备与 web 网页的点对点数据传输&#xff0c;不希望有服务器参与&#xff0c;想到了 web 的 USB 与 Bluetooth API&#xff0c;对 Web Bluetooth API 进行了研究。 蓝牙 GATT 基础知识 GATT&#xff08;通用属性配置文件&#xff0c;蓝牙低功耗&a…

K8S 发布应用

前言 昨儿个用 unbuntu20.04 又装了一次K8S 用的 kubeadm containerd Cilium (CNI) 又重新撸了一遍 这里只记录 应用发布的笔记 正文 #创建deployment kubectl create deployment nginx --imagenginx #我这边大约30秒后显示为 ready kubectl get deployments kubectl desc…

4.7 Sensors -- useScroll

4.7 Sensors – useScroll https://vueuse.org/core/useScroll/ 作用 响应式的监听滚动位置和状态。 官方示例 <script setup lang"ts"> import { useScroll } from vueuse/coreconst el ref<HTMLElement | null>(null) const { x, y, isScrolling…

【Python系列】只更新非空的字段

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…

数字电路专题:verilog 阻塞赋值和非阻塞赋值

verilog 阻塞赋值 和 非阻塞赋值 “”阻塞赋值&#xff0c; ”<”非阻塞赋值。阻塞赋值为执行完一条赋值语句&#xff0c;再执行下一条&#xff0c;可理解为顺序执行&#xff0c;而且赋值是立即执行&#xff1b; 非阻塞赋值可理解为并行执行&#xff0c;不考虑顺序&#x…

【python计算机视觉编程——7.图像搜索】

python计算机视觉编程——7.图像搜索 7.图像搜索7.1 基于内容的图像检索&#xff08;CBIR&#xff09;从文本挖掘中获取灵感——矢量空间模型&#xff08;BOW表示模型&#xff09;7.2 视觉单词**思想****特征提取**&#xff1a; 创建词汇7.3 图像索引7.3.1 建立数据库7.3.2 添加…

构建并训练卷积神经网络(CNN)对CIFAR-10数据集进行分类

深度学习实践&#xff1a;构建并训练卷积神经网络&#xff08;CNN&#xff09;对CIFAR-10数据集进行分类 引言 在计算机视觉领域中&#xff0c;CIFAR-10数据集是一个经典的基准数据集&#xff0c;广泛用于图像分类任务。本文将介绍如何使用PyTorch框架构建一个简单的卷积神经…

微信小程序uniappvue3版本-控制tabbar某一个的显示与隐藏

1. 首先在pages.json中配置tabbar信息 2. 在代码根目录下添加 tabBar 代码文件 直接把微信小程序文档里面的四个文件复制到自己项目中就可以了 3. 根据自己的需求更改index.js文件 首先我这里需要判断什么时候隐藏某一个元素&#xff0c;需要引入接口 然后在切换tabbar时&#…

git:认识git和基本操作(1)

目录 一、版本控制器 1.安装git 2.创建git本地仓库 3.配置git 二、git操作&#xff08;1&#xff09; 1.工作区、暂存区、版本库 2.添加文件 3.查看.git 4.修改文件 一、版本控制器 所谓的版本控制器&#xff0c;就是能让你了解到每一个文件的修改历史。相应的&#x…

Maven的安装

一、安装 压缩包解压完的目录如下所示&#xff08;此处为绿色免安装版&#xff09;&#xff1a; &#xff08;其余三个文件是针对Maven版本&#xff0c;第三方软件等简要介绍&#xff09; 二、环境变量 前提&#xff1a; jdk最低版本为JAVA7&#xff08;即jdk17&#xff09…

R语言统计分析——重复测量方差分析

参考资料&#xff1a;R语言实战【第2版】 所谓重复测量方差分析&#xff0c;即受试者被测量不止一次。本例使用数据集市co2数据集&#xff1a;因变量是二氧化碳吸收量&#xff08;uptake&#xff09;&#xff0c;自变量是植物类型&#xff08;Type&#xff09;和七种水平的二氧…