Flink之状态TTL机制内容详解

news2024/9/27 9:26:26

1 状态TTL机制

状态的 TTL机制就是Flink提供的自动化删除状态中的过期数据,配置 TTL的 API可以做到对状态中的数据进行冷热数据分离,将热数据一直保存在状态存储器中,将冷数据进行定期删除.
1.1 API简介

TTL常用API如下:

API注解
setTtl(Time.seconds(…))配置过期时长,当状态中的数据到达这个时长则判定为过期数据,在new StateTtlConfig.Builder(Time.seconds(...))也可以配置,如果同时调用setTtl()方法则进行覆盖
updateTtlOnCreateAndWrite()当该条数据在State中插入或者更新的时候,刷新计时,可用于冷热数据分离
updateTtlOnReadAndWrite()读或写都刷新该数据的TTL计时,可用于冷热数据分离
setStateVisibility(…)用于控制状态中过期数据的可见性,当方法中设置StateTtlConfig.StateVisibility.NeverReturnExpired)时则不可见过期未被清理的数据,如果设置StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp则可见过期未被清理的数据.setStateVisibility(...)由异步线程执行,默认是NeverReturnExpired.
setTtlTimeCharacteristic(…)指定TTL的时间语义,默认是event time,可以配置process time,将StateTtlConfig.TtlTimeCharacteristic.ProcessingTime填入方法的参数即可.
disableCleanupInBackground()禁用后台清理过期数据,使用后则不会清理过期数据
cleanupIncrementally(… , …)针对本地状态后端,即HashMapStateBackend. 增量清理, 每当访问状态数据时都会驱动一次过期检查,清除其中部分数据, 这也是HashMapBackend状态后端唯一能真正清理过期数据的方法,cleanupIncrementally(... , ...)方法中需要传入两个参数int cleanupSizeboolean runCleanupForEveryRecord,cleanupSize是指key的数据量,runCleanupForEveryRecord是指是否清理所有过期数据,如果runCleanupForEveryRecord设置的值为true此时cleanupSize就会失效,但是状态数据较多时会严重影响时效性.
cleanupFullSnapshot()针对快照数据,即checkpoint快照. 全量清理, 在做快照时将所有的过期数据进行清理保证快照中没有过期数据,但是状态后端中的过期数据没有进行清理.
cleanupInRocksdbCompactFilter(xxx)针对于RocksdbStateBackend. 只生效于RocksDB状态后端,通过Flink将CompactFilter传给RocksDB,在RocksDBCompact过程中根据过滤条件将过期数据删除,传入的参数为过期时间.
1.2 代码模板
  • 代码

    class StateMapFunc2 implements MapFunction<String, String>, CheckpointedFunction {
        private ListState<String> listState;
    
        @Override
        public String map(String s) throws Exception {
            // 将数据添加到状态存储器中,split[0]为用户ID
            listState.add(s);
            // 获取状态存储器中的数据
            Iterable<String> iter = listState.get();
            StringBuffer buffer = new StringBuffer();
            for (String str : iter) {
                buffer.append(str);
            }
            // 将数据添加到状态存储中
            return buffer.toString();
        }
    
        @Override
        public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        }
    
        @Override
        public void initializeState(FunctionInitializationContext ctx) throws Exception {
            OperatorStateStore operatorStateStore = ctx.getOperatorStateStore();
            // 配置State TTL
            StateTtlConfig ttlConfig = new StateTtlConfig.Builder(Time.seconds(10)) // 设置数据存活时长,当该数据在State中存活时间超过10s时删除该数据
                    // 这个方法也是设置数据存活时长,和StateTtlConfig.Builder(Time.seconds(10))的作用一样,可以不用这个方法,如果用了会覆盖上面设置的时长
                    .setTtl(Time.seconds(10))
                    /**
                     * updateTtlOnCreateAndWrite和updateTtlOnReadAndWrite二选一即可, 这两个方法的主要作用就是配合setTtl方法将冷热数据进行分离
                     **/
                    // 当该条数据在State中插入或者更新的时候,刷新计时
                    .updateTtlOnCreateAndWrite()
                    // 读或写都刷新该数据的TTL计时
                    .updateTtlOnReadAndWrite()
                    /**
                     * setStateVisibility就是设置状态的可见性,前面setTtl方法是设置删除过期数据,删除过期数据实际上是由另一个异步线程周期性(定时器)的完成,也就是说超过10s的数据不一定会马上被删除,但是
                     * 获取数据的时候底层会将超过存活时间的数据进行判断过滤,setStateVisibility就是可以设置是否可以查询到这些过期的数据,NeverReturnExpired和ReturnExpiredIfNotCleanedUp二选一.
                     **/
                    // 不返回过期数据,这个也是默认策略
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    // 返回还没有被清除的过期数据
                    .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                    // 指定TTL计时时间语义(默认处理时间)
                    .setTtlTimeCharacteristic(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime)
                    // 禁用后台清理过期数据
                    .disableCleanupInBackground()
                    /**
                     * 针对本地状态后端,即HashMapStateBackend
                     * 增量清理, 每当获取状态数据时,迭代器都会向前推进。对遍历的状态数据进行检查,并清理过期的数据
                     * 参数1: 设置每次清理的key的数据量(copyOnWriteStateMap中的key的条目数量)
                     * 参数2: 设置是否清理所有条目也就是key对应的数据,如果设置为true则参数1失效,在状态数据较多时不建议设置为true,会严重影响时效性
                    **/
                    .cleanupIncrementally(10, false)
                    /**
                     * 针对快照数据,即checkpoint快照
                     * 全量清理, 在做快照时将所有的过期数据进行清理保证快照中没有过期数据,但是不会清状态后端中的过期数据
                    **/
                    .cleanupFullSnapshot()
                    /**
                     * 针对于RocksdbStateBackend
                     * 只生效于RocksDB状态后端,通过Flink将CompactFilter传给RocksDB,在RocksDB在Compact过程中根据过滤条件将过期数据删除,传入的参数为过期时间(也就是发生Compact时的过滤条件)
                    **/
                    .cleanupInRocksdbCompactFilter(10000)
                    .build();
            // 配置状态描述,在ListStateDescriptor构造器中声明数据类型,简单类型可以使用xxx.class,符合类型需要使用到TypeInformation.of()
            ListStateDescriptor descriptor = new ListStateDescriptor("MapState", String.class);
            // 状态描述器加载TTL配置
            descriptor.enableTimeToLive(ttlConfig);
            listState = operatorStateStore.getListState(descriptor);
        }
    }
    

    代码中是以Operator State为例,如果是Keyed State则在open方法中配置TTL.

1.3 TTL机制详解

在代码模板中有API的使用方式,但是TTL机制不同的方法之间存在互斥或者互不影响的关系.

1.3.1 过期时间设置策略
过期时间设置有两种方式:
  • new StateTtlConfig.Builder(Time.seconds(10))
  • setTtl(Time.seconds(10))

这两种方式都是设置过期时间使用的,但是只需要选用其中一种即可,如果在创建StateTtlConfig对象时就设置了过期时间,又在setTtl方法中设置了过期时间,则会对过期时间进行覆盖,本质上二者都是对同一个变量进行赋值.

  • 源码

    new StateTtlConfig.Builder(Time.seconds(10))

    public static class Builder {
        private UpdateType updateType = OnCreateAndWrite;
        private StateVisibility stateVisibility = NeverReturnExpired;
        private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;
        private Time ttl;
        private boolean isCleanupInBackground = true;
        
        // ...
      
        // 调用Builder时对ttl变量进行了赋值
        public Builder(@Nonnull Time ttl) {
            this.ttl = ttl;
        }
        // ...
    }
    

    setTtl(Time.seconds(10))

    public static class Builder {
        private UpdateType updateType = OnCreateAndWrite;
        private StateVisibility stateVisibility = NeverReturnExpired;
        private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;
        private Time ttl;
        private boolean isCleanupInBackground = true;
    
        // ...
    
        // 这里同样是对ttl进行了赋值
        @Nonnull
        public Builder setTtl(@Nonnull Time ttl) {
            this.ttl = ttl;
            return this;
        }
        // ...
    }
    

    通过源码可以看出,使用此API时在创建StateTtlConfig对象时给一个过期时间即可,不需要再调用setTtl方法

1.3.2 过期时间刷新策略
过期时间刷新策略有两种:
  • updateTtlOnCreateAndWrite()
  • updateTtlOnReadAndWrite()

这两方法就是互斥的,只能生效一个,同样是因为二者都是对同一个变量进行赋值,就是说在二者同时调用的情况下,谁在后面调用谁就生效,如代码模板中线调用的updateTtlOnCreateAndWrite()后调用的updateTtlOnReadAndWrite()那么生效的就是updateTtlOnReadAndWrite()策略.

  • 源码

    public static class Builder {
        private UpdateType updateType = OnCreateAndWrite;
        private StateVisibility stateVisibility = NeverReturnExpired;
        private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;
        private Time ttl;
        private boolean isCleanupInBackground = true;
    
        // ...
    
        // 此方法给updateType进行赋值
        @Nonnull
        public Builder setUpdateType(UpdateType updateType) {
            this.updateType = updateType;
            return this;
        }
    
        /*
         * 二者方法体中调用的都是同一个方法setUpdateType
         */
        @Nonnull
        public Builder updateTtlOnCreateAndWrite() {
            return setUpdateType(UpdateType.OnCreateAndWrite);
        }
    
        @Nonnull
        public Builder updateTtlOnReadAndWrite() {
            return setUpdateType(UpdateType.OnReadAndWrite);
        }
        // ...
    }
    

    源码可以看出二者调用同一个方法setUpdateType,而setUpdateType方法又是给updateType赋值的一个方法,所以再使用时要根据实际的业务场景选择updateTtlOnCreateAndWrite()updateTtlOnReadAndWrite()中的一个.

1.3.3 返回过期数据策略
回返过期数据策略有两种:
  • setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  • setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
  • 源码

    public static class Builder {
        private UpdateType updateType = OnCreateAndWrite;
        private StateVisibility stateVisibility = NeverReturnExpired;
        private TtlTimeCharacteristic ttlTimeCharacteristic = ProcessingTime;
        private Time ttl;
        private boolean isCleanupInBackground = true;
    
        // ...
    
        // 此方法给stateVisibility进行赋值
        @Nonnull
        public Builder setStateVisibility(@Nonnull StateVisibility stateVisibility) {
            this.stateVisibility = stateVisibility;
            return this;
        }
    
        // 下面两个方法体中都是调用setStateVisibility方法
        @Nonnull
        public Builder returnExpiredIfNotCleanedUp() {
            return setStateVisibility(StateVisibility.ReturnExpiredIfNotCleanedUp);
        }
    
        @Nonnull
        public Builder neverReturnExpired() {
            return setStateVisibility(StateVisibility.NeverReturnExpired);
        }
        // ...
    }
    

    这二者同样是互斥的原则,使用选其一即可,即使都调用也是后被调用者生效.

1.3.4 过期数据清除策略
过期数据清除策略有三种:
  • cleanupIncrementally(10, false)
  • cleanupFullSnapshot()
  • cleanupInRocksdbCompactFilter(10000)

这三种过期数据清除策略针对的是不同的场景(本地状态后端、快照、RocksDB状态后端),所以三者是可以同时使用的,不会存在同时调用后者会对前者进行覆盖的问题,在API简介章节介绍了这种三策略的作用,这里着重介绍cleanupIncrementally策略.

HashMapStateBackend使用的存储结构是Flink团队自己开发的一种数据存储结构copyOnWriteStateMap,说这个存储结构是因为cleanupIncrementally策略删除过期数据的操作和这种结构息息相关.

关于copyOnWriteStateMap的结构可以简单的理解为K,V形式存储的结构,其中的Key就是使用keyBy时指定的key,如果没有使用keyBy那么所有数据key都会给一个相同的默认值,其中的Value是指ListStateMapState等,也就是在构建状态存储器时候选择存储形式,如下图:
ttl02

在本地状态后端(HashMapStateBackend)中默认使用的就是cleanupIncrementally清除策略,默认值为cleanupIncrementally(5, false),也就是说只要设置了TTL的过期时间,HashMapStateBackend就会使用cleanupIncrementally策略来清理过期数据,只不过cleanupIncrementally对用户提供了选择方式,这里将结合图解说明cleanupIncrementally如何清除过期数据的.
ttl02

  1. 只要访问状态数据就会触发cleanupIncrementally执行.
  2. 如果用户没有设置cleanupIncrementally,TTL会根据cleanupIncrementally(5, false)来删除过期数据,如果用户指定了参数则按照用户定义的参数删除数据.
  3. 比如现在是cleanupIncrementally(10, false),迭代器会从k1开始,到k10结束,将这10个条目的key中的ListState中的过期数据进行清理.
  4. CopyOnWriteStateMap中的数据存放是无序的,而且Flink在创建CopyOnWriteStateMap时候给的默认大小是128,也就说处理数据中key的数量超过128,否则就算只有一个key,CopyOnWriteStateMap的大小也是128,迭代器最少也要迭代128次.
  5. 当设置cleanupIncrementally(10, false)时,假如数据中只有一个key,那么这个k -> ListState(...)CopyOnWriteStateMap中的存放位置是任意的,假设在CopyOnWriteStateMap中存放的位置是22,就会出现当第一次访问状态数据时,并不会删除这个key对应的ListState中的数据,访问状态数据时同样还是不会删除过期数据,只有第三次访问时,才会删除过期数据,因为cleanupSize设置的大小为10,迭代器每次只会迭代10个条目的key,每当访问状态数据时,迭代器都会从最后一次迭代的指针位置开始继续推进.
  6. 当迭代器的指针推进位置到128时,又会从0的位置从新开始推进(这里是指CopyOnWriteStateMap的大小是128),以此类推.
  7. 如果cleanupIncrementally(10, true)中的runCleanupForEveryRecordtrue时,那就是说每次访问状态数据迭代器都会把CopyOnWriteStateMap中的所有条目都清理一遍,所以说为true时第一个参数(cleanupSize)会失效.

cleanupIncrementally的执行机制就很好的解释了,为什么在使用本地状态后端(HashMapStateBackend)时经常会出现明明已经来了7,8条数据,数据过期数据还没有清理到,或者距离上一次访问状态数据过了1h甚至更久都没有清理过期数据的情况.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1243286.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何使用Fiddler进行弱网测试

测试APP、web经常需要用到弱网测试&#xff0c;也就是在信号差、网络慢的情况下进行测试。我们自己平常在使用手机APP时&#xff0c;在地铁、电梯、车库等场景经常会遇到会话中断、超时等情况&#xff0c;这种就属于弱网。 普通的弱网测试可以选择第三方工具对带宽、丢包、延时…

量子计算概述

目录 1.量子计算介绍 2.量子计算应用 3.量子计算研究机构 1.量子计算介绍 量子计算是一种遵循量子力学规律调控量子信息单元进行计算的新型计算模式。经典计算使用2进制进行运算&#xff0c;但2进制只有0和1两种状态&#xff0c;而量子计算除了包含0和1两种状…

经典滑动窗口试题(一)

&#x1f4d8;北尘_&#xff1a;个人主页 &#x1f30e;个人专栏:《Linux操作系统》《经典算法试题 》《C》 《数据结构与算法》 ☀️走在路上&#xff0c;不忘来时的初心 文章目录 一、将x减到0的最小操作数1、题目讲解2、讲解算法原理3、代码实现 二、无重复的最长子串1、题…

Python入门02 算术运算符及优先级

目录 1 REPL2 启动3 算术运算符4 算术运算符的优先级5 清除屏幕总结 上一节我们安装了Python的开发环境&#xff0c;本节我们介绍一下REPL的概念 1 REPL 首先解释一下python执行代码的一个交互环境的定义&#xff1a; Python REPL&#xff08;Read-Eval-Print Loop&#xff0c…

Android组件化搭建学习

什么是组件化&#xff1f; 为什么要用组件化&#xff1f;在项目的开发过程中&#xff0c;随着开发人员的增多及功能的增加&#xff0c;如果提前没有使用合理的开发架构&#xff0c;那么代码会越来臃肿&#xff0c;功能间代码耦合也会越来越严重&#xff0c;这时候为了保证项目…

Linux加强篇001-部署Linux系统

目录 一、前言 1.1准备工具 1.2安装配置VM虚拟机 1.3安装软件 1.4系统初始化进程 1.5重置root密码 二、巩固练习 1&#xff0e;为什么建议读者在下载系统文件后先进行校验而不是直接安装呢&#xff1f; 2&#xff0e;使用虚拟机安装Linux系统时&#xff0c;为什么要先…

【攻防世界-misc】can_has_stdio?

1.用记事本打开文件是这样子的&#xff0c; 这是一段BF&#xff08;Brainfuck&#xff09;编程语言代码&#xff0c;属于一种极简化的编程语言&#xff0c;用于演示计算机程序设计概念。这段代码包含了一些操作符&#xff0c;如">"表示指针向右移动&#xff0c;&q…

APP测试要点有哪些?本文已经给你梳理好了!

我们日常购物、旅游、支付等活动都离不开手机&#xff0c;由此衍生了很多APP。 比如每天使用频率非常高的微信、支付宝、微博、抖音、王者荣耀等等。 APP测试主要进行功能测试、性能测试、自动化测试、安全性测试、兼容性测试、专项测试。 01 APP测试流程 APP测试流程与web…

GEE:梯度提升树(Gradient Boosting Tree)分类教程(样本制作、特征添加、训练、精度、参数优化、贡献度、统计面积)

作者:CSDN @ _养乐多_ 本文将介绍在Google Earth Engine (GEE)平台上进行梯度提升树(Gradient Boosting Tree)分类的方法和代码,其中包括制作样本点教程(本地、在线和本地在线混合制作样本点,合并样本点等),加入特征变量(各种指数、纹理特征、时间序列特征、物候特征…

【数据结构/C++】栈和队列_顺序栈

#include<iostream> using namespace std; #define MaxSize 10 // 1. 顺序栈 typedef int ElemType; struct Stack {ElemType data[MaxSize];int top; } SqStack; // 初始化栈 void init(Stack &s) {// 初始化栈顶指针s.top -1; } // 入栈 bool push(Stack &s, …

LabVIEW中如何达到NI SMU最大采样率

LabVIEW中如何达到NI SMU最大采样率 NISMU的数字化仪功能对于捕获SMU详细的瞬态响应特性或表征待测设备&#xff08;DUT&#xff09;响应&#xff08;例如线性调整率和负载调整率&#xff09;至关重要。没有此功能&#xff0c;将需要一个外部示波器。 例如&#xff0c;假设在…

【Java】定时器的简单应用

在写代码的过程中&#xff0c;如果我们遇到了隔一段时间就要进行一项任务时&#xff0c;采用定时器会提高我们的效率。下面对定时器的使用进行简单说明 1、应用说明 首先我们要创建一个Timer类 Timer timer new Timer(); 然后在timer中调用schedule()方法添加任务 timer.…

Excel中出现“#NAME?”怎么办?(文本原因)

excel 单元格出现 #NAME? 错误的原因有二&#xff1a; 函数公式输入不对导致 #NAME? 错误。 在单元格中字符串的前面加了号&#xff0c;如下图中的--GoJG7sEe6RqgTnlUcitA&#xff0c;本身我们想要的是--GoJG7sEe6RqgTnlUcitA&#xff0c;但因为某些不当的操作在前面加了号&…

完美解决RuntimeError: implement_array_function method already has a docstring

文章目录 一、报错原因--numpy版本太低二、更新numpy总结 一、报错原因–numpy版本太低 当收到 "RuntimeError: implement_array_function method already has a docstring" 错误时&#xff0c;这可能是由于在numpy的某个版本中&#xff0c;该方法的文档字符串&…

如何用网格交易做ETF套利

ETF套利是指利用ETF基金的交易机制&#xff0c;通过短期的买卖差价或组合投资来获取利润。 具体来说&#xff0c;ETF套利最常用的套利方法则是&#xff1a;价格套利和波动套利。 1. 价格套利&#xff1a;当ETF二级市场的价格与一级市场的净值出现偏差时&#xff0c;投资者可以通…

嵌入式单片机方向和Linux驱动开发方向哪个发展前景好?

嵌入式单片机方向和Linux驱动开发方向哪个发展前景好&#xff1f; 在某些平台上看到很多人鼓吹嵌入式Linux开发比单片机开发要好&#xff0c;让所有人都去做嵌入式Linux开发。说这种话的人大多数是嵌入式Linux的培训机构&#xff0c;或者是一开始就以嵌入式Linux入门的那一批人…

外地人可以在上海当老师吗

随着社会的发展&#xff0c;越来越多的人涌入大城市&#xff0c;其中也包括上海。在这个繁华的城市里&#xff0c;许多人都梦想成为一名老师&#xff0c;但是外地人可以在上海当老师吗&#xff1f; 首先需要了解上海的教育政策。根据相关规定&#xff0c;外地人可以在上海当老师…

【python FastAPI】fastapi中如何限制输入参数,如何让docs更好看,如何自定义错误码json返回

原则&#xff1a; 输入输出都基于BaseModel依靠JSONResponse制定返回错误的json信息依靠装饰器中app.post制定responses字典从而让docs文档更丰富 import uvicorn from pydantic import BaseModel, Field from fastapi import FastAPI, HTTPException from fastapi.middleware…

Halcon学习笔记

目录 一.简介 一.简介 Halcon和OpenCV在工业应用中的区别&#xff1a; OpenCV的精度没Halcon高&#xff1b;OpenCV没有模板匹配&#xff0c;Halcon有&#xff0c;而且Halcon匹配的精度更高。

浅析基于智能音视频技术的城市重要场馆智能监控系统设计

了解旭帆科技的朋友都知道&#xff0c;旭帆科技一直都乐于和大家分享各类场景的视频解决方案&#xff0c;今天小编就基于智能音视频技术的城市重要场馆智能监控系统设计和大家探讨一下。 基于智能音视频技术的城市重要场馆智能监控系统设计&#xff0c;主要包含以下要素&#x…