Flink 学习七 Flink 状态(flink state)

news2025/1/9 4:34:47

Flink 学习七 Flink 状态(flink state)

1.状态简介

流式计算逻辑中,比如sum,max; 需要记录和后面计算使用到一些历史的累计数据,

状态就是:用户在程序逻辑中用于记录信息的变量

在Flink 中 ,状态state 不仅仅是要记录状态;在程序运行中如果失败,是需要重新恢复,所以这个状态也是需要持久化;一遍后续程序继续运行

1.1 row state

我们自定义变量来保存数据

public class _01_status_row {

	public static void main(String[] args) throws Exception {
		// 获取环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000);
		DataStream<String> dataStream = dataStreamSource.map(new MapFunction<String, String>() {
            //自己定义的 变量来保存中间值:这里就无法有效的持久化和恢复
            //状态: raw state  状态
			String oldString = "";

            //如何让flink 来托管我们的状态变量,完成持久化和恢复??
			@Override
			public String map(String value) throws Exception {
                oldString = oldString + value;
				return oldString;
			}
		});
		dataStream.print();
		env.execute();
	}
}

1.2 flink state 托管状态

flink 提供了内置的状态数据管理机制,也叫状态机制: 状态一致性维护,状态数据的访问和存储;

1.3 恢复

Flink 任务是一个JOB .JOB 范围很多Task ,Task 对应示例subtask

是subtask 出错的时候,flink 底层会自动的从帮我们恢复task 的运行

如果是Job失败了 从 flink state 恢复,需要在特殊指定一些参数

2.状态分类

算子状态:

  • 每个subtask 自己持有一份独立的状态数据
  • 算子函数实现CheckpointFunction 后,既可使用算子状态
  • 算子状态: 一般是用于source算子中, 其他场景下建议使用keyedState (键控状态)

键控状态 Keyed State

  • 键控状态,只能使用于KeyedStream 的算子中
  • 算子为每一个key绑定一份独立的状态数据

更多的使用场景是键控状态 Keyed State

3.算子状态 Operator State

每个subtask 自己持有一份独立的状态数据;算子状态,在逻辑上,由算子 task下所有subtask共享;

如何理解:正常运行时,subtask自己读写自己的状态数据;而一旦job重启且带状态算子发生了并行度的变化,则之前的状态数据将在新的一批subtask 间均匀分配

在这里插入图片描述

public class _02_operator_flink_status {

    public static void main(String[] args) throws Exception {
        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        
        //=============配置 ===============
        //需要开启 Checkpoint 机制
        env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
        //需要开启持久化的路径  可选hdfs 本地
        env.getCheckpointConfig().setCheckpointStorage("file:///D:/Resource/FrameMiddleware/FlinkNew/sinkout2/");
        //task级别的failover
        //一个task 失败 job 失败 ,有很多重启策略
        //env.setRestartStrategy(RestartStrategies.noRestart());
        //task 失败 重启最多3次 , 失败后1秒重启
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,1000));
        //=============配置 ===============

        DataStream<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000);
        DataStream<String> dataStream = dataStreamSource.map(new StateMapFunction());
        dataStream.print();
        env.execute();
    }
}


class StateMapFunction implements MapFunction<String,String> , CheckpointedFunction {

    ListState<String> listState;

    //正常的处理逻辑
    @Override
    public String map(String value) throws Exception {
        listState.add(value);
        Iterable<String> strings = listState.get();
        StringBuilder sb = new StringBuilder();
        for (String string : strings) {
            sb.append(string);
        }
        //写一个异常
        if(value.length()==5){
            int a = 1/ 0;
        }
        return sb.toString();
    }

    //持久化之前会调用的方法
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        long checkpointId = context.getCheckpointId();
        System.out.println("执行快照!!!!!"+ checkpointId);
    }

    //算子的任务在启动之前,会调用下面的方法,为用户的状态初始化
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        //context 获取状态存储器
        OperatorStateStore operatorStateStore = context.getOperatorStateStore();
        //定义一个昨天存储结构的描述器
        ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("保存字符串", String.class);
        //获取状态存储器 中获取容器来存储器
        //getListState 方法还会加载之前存储的状态数据
         listState = operatorStateStore.getListState(listStateDescriptor);
    }
}

3.键控状态 Keyed State

3.1 基础概念

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-981L9koP-1687272668448)(flink7手绘/state_partitioning.svg)]1

不同点:

算子状态中,一个算子有一个状态存储空间

Keyed State:每个Key 都是有自己的状态存储空间

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Wq7IOvvT-1687272668448)(flink7手绘/state_keyed.png)]

3.2 示例

public class _03_keyed_flink_status {

    public static void main(String[] args) throws Exception {
        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //需要开启 Checkpoint 机制
        env.enableCheckpointing(1000, CheckpointingMode.EXACTLY_ONCE);
        //需要开启持久化的路径  可选hdfs 本地
        env.getCheckpointConfig().setCheckpointStorage("file:///D:/Resource/FrameMiddleware/FlinkNew/sinkout4/");
        //task级别的failover
        //一个task 失败 job 失败
        env.setRestartStrategy(RestartStrategies.noRestart());
        //task 失败 重启最多3次 , 失败后1秒重启
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,1000));

        DataStream<String> dataStreamSource = env.socketTextStream("192.168.141.141", 9000);
        DataStream<String> dataStream = dataStreamSource.keyBy(x -> x)
                .map(new KeyedStateMapFunction()).setParallelism(2);
        dataStream.print("===>").setParallelism(3);
        env.execute();
    }
}

//flink 状态管理 算子需要实现CheckpointedFunction
class KeyedStateMapFunction extends RichMapFunction<String, String>{
    ListState<String> listState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RuntimeContext runtimeContext = getRuntimeContext();
        ListStateDescriptor<String> listStateDescriptor = new ListStateDescriptor<>("保存字符串", String.class);
         listState = runtimeContext.getListState(listStateDescriptor);
    }
    //正常的处理逻辑
    @Override
    public String map(String value) throws Exception {
        listState.add(value);
        Iterable<String> strings = listState.get();
        StringBuilder sb = new StringBuilder();
        for (String string : strings) {
            sb.append(string);
        }
        //写一个异常
        if(value.length()==5){
            int a = 1/ 0;
        }
        return sb.toString();
    }
}

//======
[root@localhost ~]# nc -lk 9000
a
a
a
b
b
b
c
c
c
c
d
d
d
 控制台数据输出为
===>:2> a
===>:3> aa
===>:1> aaa
===>:1> b
===>:2> bb
===>:3> bbb
===>:1> c
===>:2> cc
===>:3> ccc
===>:1> cccc    ========> 每个key 都有一个自己的ListState<String> listState;

3.3 状态API 使用

class KeyedStateMapFunction_2 extends RichMapFunction<String, String>{
    ValueState<String> valueState;
    ListState<String> listState;
    MapState<String, String> mapState;
    ReducingState<Integer> reducingState;
    AggregatingState<Integer, Double> aggState;

    @Override
    public void open(Configuration parameters) throws Exception {
        RuntimeContext runtimeContext = getRuntimeContext();

        //单值状态存储器
         valueState = runtimeContext.getState(new ValueStateDescriptor<String>("string", String.class));
         //列表状态存储器
         listState = runtimeContext.getListState(new ListStateDescriptor<>("list", String.class));
         //map 状态存储器
         mapState = runtimeContext.getMapState(new MapStateDescriptor<String, String>("map", String.class, String.class));
         //做累加 reduce
         reducingState = runtimeContext.getReducingState(new ReducingStateDescriptor<Integer>("reduce", new ReduceFunction<Integer>() {
            @Override
            public Integer reduce(Integer value1, Integer value2) throws Exception {
                return value1+value2;
            }
        }, Integer.class));
         //记录聚合状态  --> 平均值
        AggregatingState<Integer, Double> aggState = runtimeContext.getAggregatingState(new AggregatingStateDescriptor<>("aggState", new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
            @Override
            public Tuple2<Integer, Integer> createAccumulator() {
                return Tuple2.of(0, 0);
            }

            @Override
            public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
                return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
            }

            @Override
            public Double getResult(Tuple2<Integer, Integer> accumulator) {
                return Double.valueOf(accumulator.f1 / accumulator.f0);
            }

            //批处理会使用
            @Override
            public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                return Tuple2.of(a.f0 + b.f0, b.f0 + b.f1);
            }
        }, TypeInformation.of(new TypeHint<Tuple2<Integer, Integer>>() {
        })));
    }
    //正常的处理逻辑
    @Override
    public String map(String value) throws Exception {
        //valueState
        valueState.update("new value");//更新值
        String value1 = valueState.value();//q取值

        //listState
        listState.add(value); //添加一个数据
        listState.addAll(Arrays.asList("1","2")); //添加多个数据
        listState.update(Arrays.asList("1","2")); //替换原有数据

        //mapState
        Iterable<String> keys = mapState.keys(); 
        boolean contains = mapState.contains("1");
        mapState.put("1","2");  //添加数据
        Map<String,String> map = new HashMap<>();
        map.put("1","2");
        mapState.putAll(map);//批量添加数据


        //reducingState
        //做累加
        reducingState.add(Integer.valueOf(value));
        Integer integer = reducingState.get(); //取值
        //计算平均值
        aggState.add(Integer.valueOf(value));
        Double aDouble = aggState.get();//取值
        return value1;
    }
}

3.4 状态的TTL 管理

        RuntimeContext runtimeContext = getRuntimeContext();
        //单值状态存储器
        ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("string", String.class);
        //存活时间和过期 参考
        StateTtlConfig build = StateTtlConfig.newBuilder(Time.milliseconds(5000))  //数据存活时间
                .setTtl(Time.milliseconds(5000)) //数据存活时间 和上面效果一样
                .updateTtlOnCreateAndWrite() //插入和更新时 TTL 重新计算存活时间
                .updateTtlOnReadAndWrite()  //读或者写 TTL 重新计算存活时间  //比如List 是单条数据  Map 则是一个Key value 是一个单独的TTL
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) //返回已经过期的数据
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) //没清楚可以返回过期数据
                .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)//TTL处理时间语义
                .useProcessingTime() //效果同上
                .cleanupFullSnapshot()//清理过期状态数据 在checkpoint 的时候
                .cleanupInRocksdbCompactFilter(1000) //只对rocksdb 生效 在rockdb Compact机制在Compact 时过期时间清理
                .build();
        valueStateDescriptor.enableTimeToLive(build);
        valueState = runtimeContext.getState(valueStateDescriptor);

4.状态后端

4.1 基础概念

状态数据的存储管理的实现,状态数据的本地读写,远端快照数据存储

状态后端是可插拔替换的,它对上层屏蔽了底层的差异,因为在更换状态后端时,用户的代码不需要做任何更改

4.2 可用的状态后端

  • HashMapStateBacked

    • heap 堆内存,溢出的话就是本地磁盘,对象的形式存在
    • 大规模数据内存不够会溢出到磁盘
    • 支持大规模数据状态,若有溢出到磁盘,则效率会明显降低
  • EmbeddedRocksDBStateBackend

    • 数据状态交给RocksDb 管理和存储
    • 数据是序列化的KV 字节存储 ,
    • RocksDb 中的数据,会存在内存缓存和磁盘
    • RocksDb 对磁盘数据读取较快,性能不会有较大印象

    两种状态后端策略 生成快照checkpoint 文件是一样的 ,重启后改变StateBacked 可以兼容运行;程序在重启后改变状态后端的方式不影响程序运行;

4.3设置状态后端

// HashMapStateBacked    
env.setStateBackend(new HashMapStateBackend());

//EmbeddedRocksDBStateBackend  
env.setStateBackend(new EmbeddedRocksDBStateBackend());

5.广播状态 broadcast state

前面章节说的流的join 的时候 广播就使用到了 broadcast state

Flink 学习三 Flink 流&process function API ==> 1.7.broadcast

new BroadcastProcessFunction();  

状态后端的方式不影响程序运行;**

4.3设置状态后端

// HashMapStateBacked    
env.setStateBackend(new HashMapStateBackend());

//EmbeddedRocksDBStateBackend  
env.setStateBackend(new EmbeddedRocksDBStateBackend());

5.广播状态 broadcast state

前面章节说的流的join 的时候 广播就使用到了 broadcast state

Flink 学习三 Flink 流&process function API ==> 1.7.broadcast

new BroadcastProcessFunction();  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/668171.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于Django的疫情困扰下的民慧钢材销售分析及纾困策略-计算机毕设 附源码87656

基于Django的疫情困扰下的民慧钢材销售分析及纾困策略 摘 要 疫情之下&#xff0c;实体经济面临下行压力。2019年以来&#xff0c;新冠肺炎疫情卷土而来&#xff0c;各地地疫情防控形势严峻&#xff0c;许多中小微企业经营发展屡次遭受打击。面对疫情常态化的社会现实&#x…

[学习笔记] [机器学习] 13. 集成学习进阶(XGBoost、OTTO案例实现、LightGBM、PUBG玩家排名预测)

视频链接数据集下载地址&#xff1a;无需下载 学习目标&#xff1a; 知道 XGBoost 算法原理知道 otto 案例通过 XGBoost 实现流程知道 LightGBM 算法原理知道 PUBG 案例通过 LightGBM 实现流程知道 Stacking 算法原理知道住房月租金预测通过 Stacking 实现流程 1. XGBoost 算…

SPI协议(嵌入式学习)

SPI协议 概念时序SPI通信模式图四种通信模式 优缺点 概念 SPI&#xff08;Serial Peripheral Interface&#xff09;是一种串行外设接口协议&#xff0c;用于在数字系统之间进行通信。它被广泛应用于嵌入式系统和电子设备中&#xff0c;用于连接微控制器、传感器、存储器、显示…

Linux权限管理(超详解哦)

Linux权限 引言文件访问者的分类文件类型与访问权限文件类型访问权限 文件权限值的表示方法修改权限的指令chmod修改文件权限通过角色/-/权限来修改通过三个八进制数修改 chown修改所有者chgrp修改所属组umask修改或查看文件权限掩码文件创建时的权限 目录的权限粘滞位 总结 引…

【命令参数】SVN - 环境配置及常用命令参数

目录 环境配置 基本语法 参数指令 SVN是一款基于C/S架构的版本控制系统&#xff0c;能够实现对产品项目的版本托管以及对源码库的高效管理。而掌握SVN中的一些命令参数&#xff0c;一定程度上可以使日常效率得到进一步提升。 环境配置 为在调用时更加便捷&#xff0c;通常会…

我们如何实现业务操作日志功能?

1. 需求 我们经常会有这样的需求&#xff0c;需要对关键的业务功能做操作日志记录&#xff0c;也就是用户在指定的时间操作了哪个功能&#xff0c;操作前后的数据记录&#xff0c;必要的时候可以一键回退&#xff0c;今天我就为大家实现这个的功能&#xff0c;让大家可以直接拿…

哈尔滨工业大学计算机考研分析

关注我们的微信公众号 姚哥计算机考研 更多详情欢迎咨询 哈尔滨工业大学&#xff08;A&#xff09;考研难度&#xff08;☆☆☆☆☆&#xff09; 哈尔滨工业大学计算机考研招生学院是计算学部、计算学部&#xff08;深圳&#xff09;和计算学部&#xff08;威海&#xff09;…

C++完成烧烤节管理系统

背景&#xff1a; 这次我们结合今年淄博烧烤做一个餐厅管理系统&#xff0c;具体需求如下&#xff0c;我们选择的是餐饮商家信息管理 问题描述&#xff1a; 淄博烧烤今年大火&#xff0c;“进淄赶烤”是大家最想干的事情&#xff0c;淄博烧烤大火特火的原因&#xff0c;火的…

C语言之文件的读写(1)

前面三部分已经给大家介绍过了&#xff0c;网址发给大家方便大家复习 打开方式如下&#xff1a; 文件使用方式 含义 如果指定文件不存在 “r”&#xff08;只读&#xff09; 为了输入数据&#xff0c;打开一个已经存在的文本文件 出错 “w”&#xff08;只写&#xff09; 为了输…

文心一言眼里的Java世界

目录 一、Java基础教程系列二、先听听文心一言怎么说&#xff1f;三、话不多说&#xff0c;开干。1、要有一个正确的Java学习路线&#xff0c;做一个细致的Java学习规划。2、学习资料推荐3、书中自有黄金屋&#xff0c;书中自有颜如玉4、自学周期推荐5、效率为先6、哪吒的学习方…

Redis 数据分布优化:如何应对数据倾斜?

Redis 核心技术与实战 笔记 作者&#xff1a; 蒋德钧 在切片集群中&#xff0c;数据会按照一定的分布规则分散到不同的实例上保存。比如&#xff0c;在使用 Redis Cluster 或 Codis 时&#xff0c;数据都会先按照 CRC 算法的计算值对 Slot&#xff08;逻辑槽&#xff09;取模&a…

Hi3861开发第一节:环境搭建,并顺利完成编译

本次教程在纯Windows下环境搭建&#xff01;&#xff01;! 1.DecEco Device Tool下载和安装 步骤一&#xff1a;下载devicetool-windows-tool-3.1.0.400.zip版&#xff0c;下载网址&#xff1a;https://device.harmonyos.c om/cn/develop/ide#download 步骤二&#xff1a;解压…

Win10连接网络打印机提示0x0000052e?

Win10连接网络打印机提示0x0000052e&#xff1f;Win10电脑中用户连接网络打印机的时候&#xff0c;出现了错误代码0x0000052e&#xff0c;导致用户无法正常使用网络打印机&#xff0c;这时候用户可以通过卸载最新补丁、替换系统文件并修改注册表等方法来解决问题。 方法一&…

Spring6 i18n国际化

随着互联网的发展&#xff0c;越来越多的企业和个人开始关注全球化的需求。在这个背景下&#xff0c;多语言支持成为了一个重要的课题。Spring框架作为一款优秀的Java开发框架&#xff0c;提供了丰富的i18N支持&#xff0c;能帮助搬砖工快速实现多语言应用。 1、i18n概述 国际…

【Android开发基础】计算器逻辑层代码补充

文章目录 一、引言二、设计1、案例2、算法设计 三、编码1、UI界面设计&#xff08;1&#xff09;按钮样式设计&#xff08;2&#xff09;主界面布局设计 2、编码&#xff08;1&#xff09;控件初始化&#xff08;2&#xff09;事件监听器 四、附件 一、引言 描述&#xff1a;关…

【深度学习】4-1 误差反向传播法 - 计算图链式法则反向传播

上一章中神经网络的学习是通过数值微分计算的神经网络的权重参数的梯度。数值微分虽然简单&#xff0c;也容易实现&#xff0c;但缺点是计算上比较费时间。本章将学习一个能够高效计算权重参数的梯度的方法一一误差反向传播法。 误差反向传播法能够高效计算权重参数的梯度的方法…

Angular 安装与创建第一项目

1. 下载nodejs 并且安装 https://nodejs.org/en 2. 打开命令窗口&#xff0c;验证是否安装成功 C:\Users\Harry>node -v v18.16.0C:\Users\Harry>npm -v 9.5.1 3. 安装Angular CLI C:\Users\Harry>npm install -g angular/cliadded 239 packages in 9s npm notic…

Python Anaconda创建虚拟环境及Pycharm使用虚拟环境

目录 前言 一、Anaconda与Pycharm 二、conda常用命令 三、Pycharm使用虚拟环境 总结 前言 我们在做开发任务时可能会创建多个项目&#xff0c;这些项目可能会依赖于不同的Python环境。比如有的用到Python3.6、有的用到Python3.7&#xff1b;有的用Pytorch开发、有的用Tens…

SpringBoot整合模板引擎Thymeleaf(4)

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl 概述 在之前的教程中&#xff0c;我们介绍了Thymeleaf的基础知识。在此&#xff0c;以案例形式详细介绍Thymeleaf的基本使用。 项目结构 要点概述&#xff1a; 1、在st…