详解 Flink 的状态管理

news2024/11/25 2:51:18

一、Flink 状态介绍

1. 流处理的无状态和有状态

  • 无状态的流处理:根据每一次当前输入的数据直接转换输出结果的过程,在处理中只需要观察每个输入的独立事件。例如, 将一个字符串类型的数据拆分开作为元组输出或将每个输入的数值加 1 后输出。Flink 中的基本转换算子 (map、filter、flatMap 等) 在计算时不依赖其他数据,所以都属于无状态的算子。

在这里插入图片描述

  • 有状态的流处理:根据每一次当前输入的数据和一些其他已处理的数据共同转换输出结果的过程,这些其他已处理的数据就称之为状态(state),状态由任务维护,可以被任务的业务逻辑访问。例如,做求和(sum)计算时,需要当前输入的数据和保存的之前所有输入数据的和共同计算;窗口操作中会将当前达到的数据和保存的之前已经到达的所有数据共同处理。Flink 中的聚合算子和窗口算子都属于有状态的算子。

    在这里插入图片描述

2. Flink 的状态管理

  • 在传统的事务型处理架构中,状态数据一般是保存在数据库中的,在业务处理过程中与数据库交互进行状态的读取和更新;但对于大数据实时处理架构来说,在业务处理时频繁地读写外部数据库会造成性能达不到要求,因此不能使用数据库进行状态管理
  • 在实时流处理中一般将状态直接保存在内存中来保证性能,但必须使用分布式架构来做扩展,在低延迟、高吞吐的基础上还要保证容错性,一系列复杂的问题随之产生
  • Flink 拥有一套完整的状态管理机制,将底层一些核心功能全部封装起来,包括状态一致性、状态的高效存储和访问、持久化保存和故障恢复以及资源扩展时的调整。开发者只需要调用相应的 API 就可以很方便地使用状态,或对应用的容错机制进行配置,从而将更多的精力放在业务逻辑的开发上

二、Flink 状态分类

1. 托管状态

Managed State,所有的托管状态都由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现

1.1 算子状态

Operator State,状态作用范围限定为当前的算子任务实例,只对当前的并行子任务实例有效;使用较少

在这里插入图片描述

  • 由同一并行任务所处理的所有数据都可以访问到相同的算子状态
  • 算子状态对于同一任务而言是共享的
  • 算子状态不能由相同或不同算子的另一个任务访问
1.1.1 算子状态数据结构
  • 列表状态(List state):将状态表示为一组数据的列表
  • 联合列表状态(Union list state):也是将状态表示为一组数据的列表。与列表状态的区别在于,在发生故障时或者从保存点(savepoint)启动应用程序时恢复的方式不同
  • 广播状态(Broadcast state):如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态
1.1.2 案例
public class TestFlinkOperatorState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        //定义一个有状态的map算子,用于统计输入数据个数
        DataStream<Integer> resultStream = dataStream.map(new MyCountMapper());
        
        resultStream.print();
        
        env.execute();
        
    }
    
    //定义有状态的 map 操作
    //实现 ListCheckpointed 接口,泛型为状态数据类型
    public static class MyCountMapper implements MapFunction<SensorReading, Integer>, ListCheckpointed<Integer> {
        //定义一个本地变量作为状态
        private Integer count = 0;
        
        @Override
        public Integer map(SensorReading value) throws Exception {
            count++;
            return count;
        }
        
        //对状态做快照
        @Override
        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(count);
        }
        
        //容错恢复状态
        @Override
        public void restoreState(List<Integer> state) throws Exception {
            for(Integer num : state) {
                count += num;
            }
        }
        
    }
    
}
1.2 按键分区状态

Keyed State,状态的作用范围以 key 来隔离,是根据输入流中定义的键(key)来维护和访问的,所以只能定义在按键分区流(KeyedStream)中,即 keyBy 之后才可以使用

在这里插入图片描述

  • 在进行按键分区(keyBy)之后,具有相同 key 的所有数据,都会分配到同一个并行子任务中,这个任务会维护和处理这个 key 对应的状态实例
  • 一个并行子任务可能会处理多个 key 的数据,所以该任务会为每个 key 都维护一个状态实例
  • 在底层,同一个并行子任务的所有 KeyedState 会根据 key 保存成键值对(key-value)的形式,当一条数据到来时,任务会自动将状态的访问范围限定为当前数据的 key,并从键值对(key-value)存储中读取出对应的状态值
  • 具有相同 key 的所有数据都会到访问相同的状态,而不同 key 的状态之间是彼此隔离的
  • 在应用的并行度改变时,状态也需要随之进行重组。不同 key 对应的 Keyed State 可以进一步组成所谓的键组(key groups),每一组都对应着一个并行子任务。键组是 Flink 重新分配 Keyed State 的单元,键组的数量就等于定义的最大并行度。当算子并行度发生改变时,Keyed State 就会按照当前的并行度重新平均分配,保证运行时各个子任务的负载相同
1.2.1 按键分区状态数据结构
//按键分区状态的实例化方法:在富函数中,调用 getRuntimeContext() 方法获取到运行时上下文之后
ValueState<T> getState(ValueStateDescriptor<T>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
ListState<T> getListState(ListStateDescriptor<T>)
ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
AggregatingState<IN,  OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • 值状态:ValueState<T>,将状态表示为单个的值,值的类型为 T
    • ValueState.value():获取状态值
    • ValueState.update(T value):添加或更新状态值
    • ValueState.clear():清空操作
  • 列表状态:ListState<T>,将状态表示为一组数据的列表,列表里的元素的数据类型为 T
    • ListState.add(T value):追加状态值
    • ListState.addAll(List<T> values):追加状态值列表
    • ListState.get():获取状态值的 Iterable<T>
    • ListState.update(List<T> values):更新状态值列表
    • ListState.clear():清空操作
  • 映射状态:MapState<K, V>,将状态表示为一组 Key-Value 对
    • MapState.get(UK key):获取状态值
    • MapState.put(UK key , UV value):添加或更新状态值
    • MapState.contains(UK key):判断状态值是否存在
    • MapState.remove(UK key):删除状态值
    • MapState.clear():清空操作
  • 聚合状态:ReducingState<T>AggregatingState<I, O>,将状态表示为一个用于聚合操作的列表
    • ReducingState.add():聚合状态值,调用实例化 ReducingState 时自定义 ReduceFunction 中的方法;AggregatingState 同理
    • ReducingState.clear():清空操作,AggregatingState 同理
1.2.2 案例
/**
	按键分区状态的使用步骤:
		1. 在自定义算子Function中声明一个按键分区数据结构,由于声明时需要使用 getRuntimeContext(),因此要使用继承富函数类的方式自定义算子Function
		2. 在自定义算子Function的对应算子方法中进行状态的读写等相关操作
*/
public class TestFlinkKeyedState {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        
        /*
        	需求:自定义有状态的map算子,按sensor_id统计个数
        */
        //使用按键分区状态必须先进行keyBy
        DataStream<Integer> resultStream = dataStream.keyBy("id").map(new MyKeyCountMapper());
        
        resultStream.print();
        
        env.execute();
    }
    
    //使用继承富函数类的方式自定义MapFunction
    public static class MyKeyCountMapper extends RichMapFunction<SensorReading, Integer> {
        
        //定义一个值状态属性
        private ValueState<Integer> myValueState;
        
        //在open方法中实例化值状态
        @Override
        public void open(Configuration parameters) throws Exception {
            myValueState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("value-state", Integer.class));
        }
        
        @Override
        public Integer map(SensorReading value) throws Exception {
            //获取状态值
            Integer count = myValueState.value();
            if(count == null) {
                count = 0;
            }
            
            count++;
                
            //更新状态值
            myValueState.update(count);
            
            return count;
        }
                
    }
}

2. 原始状态

Raw State,原始状态是自定义的,相当于开辟了一块内存,需要开发者自己管理,实现状态的序列化和故障恢复

  • Flink 不会对原始状态进行任何自动操作,也不知道状态的具体数据类型,只会把它当作最原始的字节(Byte)数组来存储
  • 只有在遇到托管状态无法实现的特殊需求时,才考虑使用原始状态;一般情况下不推荐使用

三、Flink 状态编程案例

/**
	需求:检测同一个传感器的温度值,如果连续的两个温度差值超过 10 度,就输出报警信息
*/
public class FlinkKeyedStateCase {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
        });
        
        //定义一个有状态的 flatMap 操作,若同一个传感器连续两个温度的差值超过 10 度,则输出报警
        //报警信息:sensor_id,前一次温度值,当前温度值
        DataStream<Tuple3<String, Double, Double>> warningStream = dataStream.keyBy("id").flatMap(new TempChangeWarning(10.0));
        
        warningStream.print();
        
        env.execute();
    }
    
    //使用继承富函数类的方式自定义FlatMapFunction
    public static class TempChangeWarning extends RichFlatMapFunction<SensorReading, Tuple3<String, Double, Double>> {
        //定义温度差阈值属性
        private Double threshold;
        //定义值状态属性,保存上一次的温度值
        private ValueState<Double> lastTempState;
        
        public TempChangeWarning(Double threshold) {
            this.threshold = threshold;
        }
        
        //在open方法中实例化值状态
        @Override
        public void open(Configuration parameters) throws Exception {
            lastTempState = getRuntimeContext().getState(new ValueStateDescriptor("last-temp", Double.class));
        }
        
        //重写flatMap方法
        @Override
        public void flatMap(SensorReading value, Collector<Tuple3<String, Double, Double>> out) throws Exception {
            //获取上一次温度状态值
            Double lastTemp = lastTempState.value();
            
            //如果状态值不为null,则进行差值判断
            if(lastTemp != null) {
                Double diff = Math.abs(lastTemp - value.getTemperature());
                //差值超过阈值,则输出报警信息
                if(diff >= threshold) {
                    out.collect(new Tuple3<>(value.getId(), lastTemp, value.getTemperature()));
                }
            }
            
            //更新状态值
            lastTempState.update(value.getTemperature());
        }
        
        //在close方法中清空状态
        @Override
        public void close() throws Exception {
            lastTempState.clear();
        }
    }
}

四、Flink 状态后端

State Backends,一个可插入的决定状态的存储、访问以及维护等工作的组件

1. 介绍

​ 在 Flink 中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backends)。状态后端主要负责两件事:一是本地的状态管理,二是将检查点(checkpoint)写入远程的持久化存储。

2. 分类

  • MemoryStateBackend:内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储
    在 TaskManager 的 JVM 堆上;而将 checkpoint 存储在 JobManager 的内存中。

  • FsStateBackend:文件系统级的状态后端,对于本地状态,跟 MemoryStateBackend 一样,也会存储在 TaskManager 的 JVM 堆上,但会将 checkpoint 存储到远程的持久化文件系统(FileSystem)中,如 HDFS。

  • RocksDBStateBackend:将所有状态和 checkpoint 序列化后,存入本地的 RocksDB 中存储。RocksDBStateBackend 的支持并不直接包含在 flink 中,需要引入依赖。

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    

3. 配置

3.1 配置文件配置
  • 进入 flink 安装目录下的 conf 目录,打开 flink-conf.yaml 文件

    cd /opt/module/flink/conf
    vim flink-conf.yaml
    
  • 在文件中的 Fault tolerance and checkpointing 部分进行配置

    #Fault tolerance and checkpointing
    #============================================================
    state.backend: filesystem #默认值为 filesystem,可选值为 jobmanager/filesystem/rocksdb
    
    #state.checkpoints.dir: hdfs://namenode:port/flink/checkpoints
    
    jobmanager.execution.failover-strategy: region #容错恢复策略,默认是按区域恢复
    
3.2 代码配置

在代码中为每个作业单独配置状态后端

public class TestStatebackend {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        
        //配置状态后端
        //1.MemoryStateBackend
        env.setStateBackend(new MemoryStateBackend());
        
        //2.FsStateBackend
        env.setStateBackend(new FsStateBackend("hdfs://......"));
        
        //3.RocksDBStateBackend,需要先引入依赖
        env.setStateBackend(new RocksDBStateBackend("checkpointDataUri"));
        
        DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
        
        DataStream<SensorReading> dataStream = inputStream.map(line -> {
            String[] fields = line.split(",");
            return new SensorReading(fileds[0], new Long(fields[1]), new Double(fields[2]));
        });
    
        dataStream.print();
        
        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1806133.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C语言 树与二叉树基础部分

树与二叉树基础部分 树的基础概念二叉树的性质二叉树的遍历前序遍历中序遍历后序遍历层序遍历根据遍历结果恢复二叉树 二叉树的创建第一种第二种 二叉树的其他典型操作查找指定元素&#xff08;一般二叉树&#xff09;二叉树的高度&#xff08;深度&#xff09;二叉树的拷贝二叉…

《python程序语言设计》2018版第5章第48题以0,0为圆心 绘制10个左右的同心圆

在0&#xff0c;0点处绘制10个圆。 其实这个题先要记住python不会0&#xff0c;0为原点进行绘画。 它是按半径来画&#xff0c;所以我们要先把turtle这个小画笔送到它应该去的起点。&#xff08;我经常有这样的错觉&#xff0c;每次都是这样想办法把自己拉回来&#xff09; 我…

【MySQL】(基础篇四) —— 检索数据

检索数据 检索数据是我们使用数据库时进行最多的操作&#xff0c;其中包括了检索条件、排序、过滤、分组等等。我会在后续的多篇博客中为你进行详细地介绍它们。 这次先让我们来粗略的了解一下SELECT&#xff0c;为了使用SELECT检索表数据&#xff0c;必须至少明确两点信息—…

js理解异步编程和回调

什么是异步 计算机在设计上是异步的。 异步意味着事情可以独立于主程序流发生。 当你打开一个网页&#xff0c;网页载入的过程&#xff0c;你又打开了编译器&#xff0c;那么你在网页载入时启动了编译器的行为就是计算机的异步&#xff0c; 可以看出计算机时一个超大的异步…

leetcode(力扣)第15题-三数之和---使用c语言双指针法,二级指针的应用

题目&#xff1a; 15. 三数之和 - 力扣&#xff08;LeetCode&#xff09; 编写过程的问题&#xff1a; 记住线索 1、对数组使用快排排序&#xff1b;2、固定 a 对 b、c 使用双指针&#xff1b;3、注意去重问题。函数返回值的类型。{1&#xff0c;2&#xff0c;-3}。结果作为…

3038. 相同分数的最大操作数目 I(Rust模拟击败100%Rust用户)

题目 给你一个整数数组 nums &#xff0c;如果 nums 至少 包含 2 个元素&#xff0c;你可以执行以下操作&#xff1a; 选择 nums 中的前两个元素并将它们删除。 一次操作的 分数 是被删除元素的和。 在确保 所有操作分数相同 的前提下&#xff0c;请你求出 最多 能进行多少次…

406. 根据身高重建队列(中等)

406. 根据身高重建队列 1. 题目描述2.详细题解3.代码实现3.1 Python3.2 Java 1. 题目描述 题目中转&#xff1a;406. 根据身高重建队列 2.详细题解 做一道题之前先静心&#xff0c;默念三遍一切反动派都是纸老虎。已知一个队列&#xff0c;队列中每个数据表示一个属性&#xf…

百度高级项目经理洪刘生受邀为第十三届中国PMO大会演讲嘉宾

全国PMO专业人士年度盛会 百度在线网络技术&#xff08;北京&#xff09;有限公司IDG智能驾驶业务部高级项目经理洪刘生先生受邀为PMO评论主办的2024第十三届中国PMO大会演讲嘉宾&#xff0c;演讲议题为“互联网PMO赋能战略项目集管理实战分享”。大会将于6月29-30日在北京举办…

【优选算法】优先级队列 {优先级队列解决TopK问题,利用大小堆维护数据流的中位数}

一、经验总结 优先级队列&#xff08;堆&#xff09;&#xff0c;常用于在集合中筛选最值或解决TopK问题。 提示&#xff1a;对于固定序列的TopK问题&#xff0c;最优解决方案是快速选择算法&#xff0c;时间复杂度为O(N)比堆算法O(NlogK)更优&#xff1b;而对于动态维护数据流…

Invalid JSON text:“Invalid value.“ at position 0 in value for column ‘user.info

你们好&#xff0c;我是金金金。 场景 我正在练习mybatis-plus&#xff0c;在插入一条数据的时候报错了&#xff0c;错误信息如上图 排查 排查之前我先贴一下代码 以下为数据库字段类型 在插入的过程中报错&#xff1a;Data truncation: Invalid JSON text: "Invalid val…

后台管理系统开源鉴赏

项目合集 开源仓库组件库vbenjs/vue-vben-adminAnt-Design-Vueflipped-aurora/gin-vue-adminelement-pluschuzhixin/vue-admin-betterelement-pluspure-admin/vue-pure-adminelement-plushonghuangdc/soybean-adminNaive UIHalseySpicy/Geeker-Adminelement-plusjekip/naive-u…

实现手机空号过滤或手机号码有效性验证

手机空号过滤或手机号码有效性验证通常涉及使用专门的API接口来查询手机号码的状态。这些API接口通常由第三方服务提供商提供&#xff0c;它们会与电信运营商合作或利用自己的数据库来验证手机号码是否真实存在、是否已被分配、是否处于空号状态等。 以下是一些步骤和考虑因素…

海洋日特别活动—深海来客——可燃冰

深海中有一种神奇的物质&#xff0c;似冰又不是冰。 别看它其貌不扬&#xff0c;但本领不小&#xff0c;遇火即燃&#xff0c;能量巨大&#xff0c;可谓是能源家族的新宠。它就是被国务院正式批准列为我国第173个矿种的“可燃冰”&#xff01; 可燃冰到底是个啥&#xff1f;它…

【C++】——Stack与Queue(含优先队列(详细解读)

前言 之前数据结构中是栈和队列&#xff0c;我们分别用的顺序表和链表去实现的&#xff0c;但是对于这里的栈和队列来说&#xff0c;他们是一种容器&#xff0c;更准确来说是一种容器适配器 ✨什么是容器适配器&#xff1f; 从他们的模板参数可以看出&#xff0c;第二个参数模…

如何 Logrus IT 的质量评估门户帮助提升在线商店前端(案例研究)

在当今竞争激烈的电子商务环境中&#xff0c;一个运作良好的在线店面对商业成功至关重要。然而&#xff0c;确保目标受众获得积极的用户体验可能是一项挑战&#xff0c;尤其是在使用多种语言和平台时。Logrus IT的质量评估门户是一个强大的工具&#xff0c;可帮助企业简化内容和…

LLVM Cpu0 新后端3

想好好熟悉一下llvm开发一个新后端都要干什么&#xff0c;于是参考了老师的系列文章&#xff1a; LLVM 后端实践笔记 代码在这里&#xff08;还没来得及准备&#xff0c;先用网盘暂存一下&#xff09;&#xff1a; 链接: https://pan.baidu.com/s/1yLAtXs9XwtyEzYSlDCSlqw?…

MacOS中Latex提示没有相关字体怎么办

在使用mactex编译中文的时候&#xff0c;遇到有些中文字体识别不到的情况&#xff0c;例如遇到识别不到Songti.ttc。其实这个时候字体是在系统里面的&#xff0c;但是只不过是latex没有找到正确的字体路径。 本文只针对于系统已经安装了字体库并且能够用find命令搜到&#xff0…

Effective Java 1 用静态工厂方法代替构造器

知识点上本书需要会Java语法和lang、util、io库&#xff0c;涉及concurrent和function包。 内容上主要和设计模式相关&#xff0c;代码风格力求清晰简洁&#xff0c;代码尽量复用&#xff0c;组件尽量少依赖&#xff0c;错误尽早发现。 第1个经验法则&#xff1a;用静态工厂方…

rust学习(字节数组转string)

最新在写数据传输相关的操作&#xff0c;发现string一个有趣的现象&#xff0c;代码如下&#xff1a; fn main() {let mut data:[u8;32] [0;32];data[0] a as u8;let my_str1 String::from_utf8_lossy(&data);let my_str my_str1.trim();println!("my_str len is…

基于JSP技术的社区生活超市管理系统

你好呀&#xff0c;我是计算机学长猫哥&#xff01;如果有相关需求&#xff0c;文末可以找到我的联系方式。 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;JSP技术 工具&#xff1a;MyEclipse开发环境、Tomcat服务器 系统展示 首页 管理员功能模块…