【Flink】状态管理

news2024/9/22 5:08:08

目录

1、状态概述

1.1 无状态算子

1.2 有状态算子

2、状态分类

​编辑 2.1 算子状态

2.1.1 列表状态(ListState)

2.1.2 联合列表状态(UnionListState)

2.1.3 广播状态(BroadcastState)

2.2 按键分区状态 

2.2.1 值状态(ValueState)

2.2.2 列表状态(ListState)

2.2.3 Map状态(MapState)

2.2.4 归约状态(ReducingState)

2.2.5 聚合状态(AggregatingState)

2.2.6 状态生存时间(TTL)

3、状态后端(State Backends)

3.1 状态后端的分类(HashMapStateBackend/RocksDB)

3.1.1 哈希表状态后端(HashMapStateBackend)

3.1.2 内嵌RocksDB状态后端(EmbeddedRocksDBStateBackend)

3.2 如何选择正确的状态后端

3.3 状态后端的配置


1、状态概述

1.1 无状态算子

根据当前的输入可以直接转换得到输出结果,这种鼻子就是无状态算子,如map,flatMap,filter

1.2 有状态算子

除当前处理之外,还需要其他处理才能得到计算结果。如聚合算子,窗口算子等

2、状态分类

        Flink的状态有两种:托管状态(Managed State)原始状态(Raw State)托管状态就是由Flink统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由Flink实现,我们只要调接口就可以;而原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。

        通常我们采用Flink托管状态来实现需求。

 

2.1 算子状态

          一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的slot在计算资源上是物理隔离的,所以Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。

算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。         

算子状态的实际应用场景不如Keyed State多,一般用在Source或Sink等与外部系统连接的算子上,或者完全没有key定义的场景。比如Flink的Kafka连接器中,就用到了算子状态。

算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState和BroadcastState。

2.1.1 列表状态(ListState)

与Keyed State中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。

案例实操:在map算子中计算数据的个数。

public class OperatorListStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env
                .socketTextStream("hadoop102", 7777)
                .map(new MyCountMapFunction())
                .print();


        env.execute();
    }


    // TODO 1.实现 CheckpointedFunction 接口
    public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {

        private Long count = 0L;
        private ListState<Long> state;


        @Override
        public Long map(String value) throws Exception {
            return ++count;
        }

        /**
         * TODO 2.本地变量持久化:将 本地变量 拷贝到 算子状态中,开启checkpoint时才会调用
         *
         * @param context
         * @throws Exception
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("snapshotState...");
            // 2.1 清空算子状态
            state.clear();
            // 2.2 将 本地变量 添加到 算子状态 中
            state.add(count);
        }

        /**
         * TODO 3.初始化本地变量:程序启动和恢复时, 从状态中 把数据添加到 本地变量,每个子任务调用一次
         *
         * @param context
         * @throws Exception
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("initializeState...");
            // 3.1 从 上下文 初始化 算子状态
            state = context
                    .getOperatorStateStore()
                    .getListState(new ListStateDescriptor<Long>("state", Types.LONG));

            // 3.2 从 算子状态中 把数据 拷贝到 本地变量
            if (context.isRestored()) {
                for (Long c : state.get()) {
                    count += c;
                }
            }
        }
    }
}

2.1.2 联合列表状态(UnionListState)

与ListState类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。

UnionListState的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。

如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。

使用方式同ListState,区别在:getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG));

state = context
              .getOperatorStateStore()
              .getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG));

2.1.3 广播状态(BroadcastState)

有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。

案例实操:水位超过指定的阈值发送告警,阈值可以动态修改

public class OperatorBroadcastStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);


        // 数据流
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());

        // 配置流(用来广播配置)
        DataStreamSource<String> configDS = env.socketTextStream("hadoop102", 8888);

        // TODO 1. 将 配置流 广播
        MapStateDescriptor<String, Integer> broadcastMapState = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);
        BroadcastStream<String> configBS = configDS.broadcast(broadcastMapState);

        // TODO 2.把 数据流 和 广播后的配置流 connect
        BroadcastConnectedStream<WaterSensor, String> sensorBCS = sensorDS.connect(configBS);

        // TODO 3.调用 process
        sensorBCS
                .process(
                        new BroadcastProcessFunction<WaterSensor, String, String>() {
                            /**
                             * 数据流的处理方法: 数据流 只能 读取 广播状态,不能修改
                             * @param value
                             * @param ctx
                             * @param out
                             * @throws Exception
                             */
                            @Override
                            public void processElement(WaterSensor value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                                // TODO 5.通过上下文获取广播状态,取出里面的值(只读,不能修改)
                                ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);
                                Integer threshold = broadcastState.get("threshold");
                                // 判断广播状态里是否有数据,因为刚启动时,可能是数据流的第一条数据先来
                                threshold = (threshold == null ? 0 : threshold);
                                if (value.getVc() > threshold) {
                                    out.collect(value + ",水位超过指定的阈值:" + threshold + "!!!");
                                }

                            }

                            /**
                             * 广播后的配置流的处理方法:  只有广播流才能修改 广播状态
                             * @param value
                             * @param ctx
                             * @param out
                             * @throws Exception
                             */
                            @Override
                            public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                                // TODO 4. 通过上下文获取广播状态,往里面写数据
                                BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);
                                broadcastState.put("threshold", Integer.valueOf(value));

                            }
                        }

                )
                .print();

        env.execute();
    }
}

2.2 按键分区状态 

而很多有状态的操作(比如聚合、窗口)都是要先做keyBy进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前key有效,所以状态也应该按照key彼此隔离。

它的特点非常鲜明,就是以key为作用范围进行隔离。

需要注意,使用Keyed State必须基于KeyedStream。没有进行keyBy分区的DataStream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问Keyed State。

2.2.1 值状态(ValueState)

public interface ValueState<T> extends State {
    T value() throws IOException;
    void update(T value) throws IOException;
}
  • T value():获取当前状态的值;
  • update(T value):对状态进行更新,传入的参数value就是要覆写的状态值。

        在具体使用时,为了让运行时上下文清楚到底是哪个状态,我们还需要创建一个“状态描述器”(StateDescriptor)来提供状态的基本信息。例如源码中,ValueState的状态描述器构造方法如下:

public ValueStateDescriptor(String name, Class<T> typeClass) {
    super(name, typeClass, null);
}

案例需求:检测每种传感器的水位值,如果连续的两个水位值超过10,就输出报警。

public class KeyedValueStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
                );

        sensorDS.keyBy(r -> r.getId())
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {

                            // TODO 1.定义状态
                            ValueState<Integer> lastVcState;


                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                // TODO 2.在open方法中,初始化状态
                                // 状态描述器两个参数:第一个参数,起个名字,不重复;第二个参数,存储的类型
                                lastVcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVcState", Types.INT));
                            }

                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
//                                lastVcState.value();  // 取出 本组 值状态 的数据
//                                lastVcState.update(); // 更新 本组 值状态 的数据
//                                lastVcState.clear();  // 清除 本组 值状态 的数据


                                // 1. 取出上一条数据的水位值(Integer默认值是null,判断)
                                int lastVc = lastVcState.value() == null ? 0 : lastVcState.value();
                                // 2. 求差值的绝对值,判断是否超过10
                                Integer vc = value.getVc();
                                if (Math.abs(vc - lastVc) > 10) {
                                    out.collect("传感器=" + value.getId() + "==>当前水位值=" + vc + ",与上一条水位值=" + lastVc + ",相差超过10!!!!");
                                }
                                // 3. 更新状态里的水位值
                                lastVcState.update(vc);
                            }
                        }
                )
                .print();

        env.execute();
    }

2.2.2 列表状态(ListState)

将需要保存的数据,以列表(List)的形式组织起来。在ListState<T>接口中同样有一个类型参数T,表示列表中数据的类型。ListState也提供了一系列的方法来操作状态,使用方式与一般的List非常相似。

  • Iterable<T> get():获取当前的列表状态,返回的是一个可迭代类型Iterable<T>;
  • update(List<T> values):传入一个列表values,直接对状态进行覆盖;
  • add(T value):在状态列表中添加一个元素value;
  • addAll(List<T> values):向列表中添加多个元素,以列表values形式传入。

类似地,ListState的状态描述器就叫作ListStateDescriptor,用法跟ValueStateDescriptor完全一致。

案例:针对每种传感器输出最高的3个水位值

public class KeyedListStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
                );

        sensorDS.keyBy(r -> r.getId())
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {

                            ListState<Integer> vcListState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                vcListState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcListState", Types.INT));
                            }

                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                                // 1.来一条,存到list状态里
                                vcListState.add(value.getVc());

                                // 2.从list状态拿出来(Iterable), 拷贝到一个List中,排序, 只留3个最大的
                                Iterable<Integer> vcListIt = vcListState.get();
                                // 2.1 拷贝到List中
                                List<Integer> vcList = new ArrayList<>();
                                for (Integer vc : vcListIt) {
                                    vcList.add(vc);
                                }
                                // 2.2 对List进行降序排序
                                vcList.sort((o1, o2) -> o2 - o1);
                                // 2.3 只保留最大的3个(list中的个数一定是连续变大,一超过3就立即清理即可)
                                if (vcList.size() > 3) {
                                    // 将最后一个元素清除(第4个)
                                    vcList.remove(3);
                                }

                                out.collect("传感器id为" + value.getId() + ",最大的3个水位值=" + vcList.toString());

                                // 3.更新list状态
                                vcListState.update(vcList);


//                                vcListState.get();            //取出 list状态 本组的数据,是一个Iterable
//                                vcListState.add();            // 向 list状态 本组 添加一个元素
//                                vcListState.addAll();         // 向 list状态 本组 添加多个元素
//                                vcListState.update();         // 更新 list状态 本组数据(覆盖)
//                                vcListState.clear();          // 清空List状态 本组数据
                            }
                        }
                )
                .print();

        env.execute();
    }
}

2.2.3 Map状态(MapState)

把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组key-value映射的列表。

MapState提供了操作映射状态的方法,与Map的使用非常类似。

  • UV get(UK key):传入一个key作为参数,查询对应的value值;
  • put(UK key, UV value):传入一个键值对,更新key对应的value值;
  • putAll(Map<UK, UV> map):将传入的映射map中所有的键值对,全部添加到映射状态中;
  • remove(UK key):将指定key对应的键值对删除;
  • boolean contains(UK key):判断是否存在指定的key,返回一个boolean值。

另外,MapState也提供了获取整个映射相关信息的方法;

  • Iterable<Map.Entry<UK, UV>> entries():获取映射状态中所有的键值对;
  • Iterable<UK> keys():获取映射状态中所有的键(key),返回一个可迭代Iterable类型;
  • Iterable<UV> values():获取映射状态中所有的值(value),返回一个可迭代Iterable类型;
  • boolean isEmpty():判断映射是否为空,返回一个boolean值。

案例需求:统计每种传感器每种水位值出现的次数。

public class KeyedMapStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
                );

        sensorDS.keyBy(r -> r.getId())
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {

                            MapState<Integer, Integer> vcCountMapState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                vcCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("vcCountMapState", Types.INT, Types.INT));
                            }

                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                                // 1.判断是否存在vc对应的key
                                Integer vc = value.getVc();
                                if (vcCountMapState.contains(vc)) {
                                    // 1.1 如果包含这个vc的key,直接对value+1
                                    Integer count = vcCountMapState.get(vc);
                                    vcCountMapState.put(vc, ++count);
                                } else {
                                    // 1.2 如果不包含这个vc的key,初始化put进去
                                    vcCountMapState.put(vc, 1);
                                }

                                // 2.遍历Map状态,输出每个k-v的值
                                StringBuilder outStr = new StringBuilder();
                                outStr.append("======================================\n");
                                outStr.append("传感器id为" + value.getId() + "\n");
                                for (Map.Entry<Integer, Integer> vcCount : vcCountMapState.entries()) {
                                    outStr.append(vcCount.toString() + "\n");
                                }
                                outStr.append("======================================\n");

                                out.collect(outStr.toString());


//                                vcCountMapState.get();          // 对本组的Map状态,根据key,获取value
//                                vcCountMapState.contains();     // 对本组的Map状态,判断key是否存在
//                                vcCountMapState.put(, );        // 对本组的Map状态,添加一个 键值对
//                                vcCountMapState.putAll();  // 对本组的Map状态,添加多个 键值对
//                                vcCountMapState.entries();      // 对本组的Map状态,获取所有键值对
//                                vcCountMapState.keys();         // 对本组的Map状态,获取所有键
//                                vcCountMapState.values();       // 对本组的Map状态,获取所有值
//                                vcCountMapState.remove();   // 对本组的Map状态,根据指定key,移除键值对
//                                vcCountMapState.isEmpty();      // 对本组的Map状态,判断是否为空
//                                vcCountMapState.iterator();     // 对本组的Map状态,获取迭代器
//                                vcCountMapState.clear();        // 对本组的Map状态,清空

                            }
                        }
                )
                .print();

        env.execute();
    }
}

2.2.4 归约状态(ReducingState)

归约逻辑的定义,是在归约状态描述器(ReducingStateDescriptor)中,通过传入一个归约函数(ReduceFunction)来实现的。这里的归约函数,就是我们之前介绍reduce聚合算子时讲到的ReduceFunction,所以状态类型跟输入的数据类型是一样的。

public ReducingStateDescriptor(
    String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {...}

案例:计算每种传感器的水位和

.process(new KeyedProcessFunction<String, WaterSensor, Integer>() {
    private ReducingState<Integer> sumVcState;
    @Override
    public void open(Configuration parameters) throws Exception {
        sumVcState = this
          .getRuntimeContext()
          .getReducingState(new ReducingStateDescriptor<Integer>("sumVcState",Integer::sum,Integer.class));
    }

    @Override
    public void processElement(WaterSensor value, Context ctx, Collector<Integer> out) throws Exception {
        sumVcState.add(value.getVc());
        out.collect(sumVcState.get());
    }
})

2.2.5 聚合状态(AggregatingState)

与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。与ReducingState不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数(AggregateFunction)来定义的;这也就是之前我们讲过的AggregateFunction,里面通过一个累加器(Accumulator)来表示状态,所以聚合的状态类型可以跟添加进来的数据类型完全不同,使用更加灵活。

案例需求:计算每种传感器的平均水位

public class KeyedAggregatingStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
                );

        sensorDS.keyBy(r -> r.getId())
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {

                            AggregatingState<Integer, Double> vcAvgAggregatingState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                vcAvgAggregatingState = getRuntimeContext()
                                        .getAggregatingState(
                                                new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>(
                                                        "vcAvgAggregatingState",
                                                        new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
                                                            @Override
                                                            public Tuple2<Integer, Integer> createAccumulator() {
                                                                return Tuple2.of(0, 0);
                                                            }

                                                            @Override
                                                            public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
                                                                return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
                                                            }

                                                            @Override
                                                            public Double getResult(Tuple2<Integer, Integer> accumulator) {
                                                                return accumulator.f0 * 1D / accumulator.f1;
                                                            }

                                                            @Override
                                                            public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
//                                                                return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                                                                return null;
                                                            }
                                                        },
                                                        Types.TUPLE(Types.INT, Types.INT))
                                        );
                            }

                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                                // 将 水位值 添加到  聚合状态中
                                vcAvgAggregatingState.add(value.getVc());
                                // 从 聚合状态中 获取结果
                                Double vcAvg = vcAvgAggregatingState.get();

                                out.collect("传感器id为" + value.getId() + ",平均水位值=" + vcAvg);

//                                vcAvgAggregatingState.get();    // 对 本组的聚合状态 获取结果
//                                vcAvgAggregatingState.add();    // 对 本组的聚合状态 添加数据,会自动进行聚合
//                                vcAvgAggregatingState.clear();  // 对 本组的聚合状态 清空数据
                            }
                        }
                )
                .print();

        env.execute();
    }
}

2.2.6 状态生存时间(TTL)

在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。

配置状态的TTL时,需要创建一个StateTtlConfig配置对象,然后调用状态描述器的.enableTimeToLive()方法启动TTL功能。

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(10))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();

ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("my state", String.class);

stateDescriptor.enableTimeToLive(ttlConfig);

这里用到了几个配置项:

  • .newBuilder()

状态TTL配置的构造器方法,必须调用,返回一个Builder之后再调用.build()方法就可以得到StateTtlConfig了。方法需要传入一个Time作为参数,这就是设定的状态生存时间

  • .setUpdateType()

设置更新类型。更新类型指定了什么时候更新状态失效时间,这里的OnCreateAndWrite表示只有创建状态和更改状态(写操作)时更新失效时间。另一种类型OnReadAndWrite则表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间。这个配置默认为OnCreateAndWrite。

  • .setStateVisibility()

设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能继续存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里设置的NeverReturnExpired是默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,就返回它的值。

public class StateTTLDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
                );

        sensorDS.keyBy(r -> r.getId())
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {

                            ValueState<Integer> lastVcState;


                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);

                                // TODO 1.创建 StateTtlConfig
                                StateTtlConfig stateTtlConfig = StateTtlConfig
                                        .newBuilder(Time.seconds(5)) // 过期时间5s
//                                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 状态 创建和写入(更新) 更新 过期时间
                                        .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 状态 读取、创建和写入(更新) 更新 过期时间
                                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期的状态值
                                        .build();

                                // TODO 2.状态描述器 启用 TTL
                                ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("lastVcState", Types.INT);
                                stateDescriptor.enableTimeToLive(stateTtlConfig);


                                this.lastVcState = getRuntimeContext().getState(stateDescriptor);

                            }

                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                                // 先获取状态值,打印 ==》 读取状态
                                Integer lastVc = lastVcState.value();
                                out.collect("key=" + value.getId() + ",状态值=" + lastVc);

                                // 如果水位大于10,更新状态值 ===》 写入状态
                                if (value.getVc() > 10) {
                                    lastVcState.update(value.getVc());
                                }
                            }
                        }
                )
                .print();

        env.execute();
    }
}

3、状态后端(State Backends)

在Flink中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责管理本地状态的存储方式和位置

3.1 状态后端的分类(HashMapStateBackend/RocksDB

Flink中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌RocksDB状态后端”(EmbeddedRocksDBStateBackend)。

系统默认的状态后端是HashMapStateBackend。

3.1.1 哈希表状态后端(HashMapStateBackend)

HashMapStateBackend是把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在Taskmanager的JVM堆上

普通的状态,以及窗口中收集的数据和触发器,都会以键值对的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。

3.1.2 内嵌RocksDB状态后端(EmbeddedRocksDBStateBackend)

RocksDB是一种内嵌的key-value存储介质,可以把数据持久化到本地硬盘

配置EmbeddedRocksDBStateBackend后,会将处理中的数据全部放入RocksDB数据库中,RocksDB默认存储在TaskManager的本地数据目录里

3.2 如何选择正确的状态后端

HashMap和RocksDB两种状态后端最大的区别,就在于本地状态存放在哪里。

HashMapStateBackend是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。

而RocksDB是硬盘存储,所以可以根据可用的磁盘空间进行扩展,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比HashMapStateBackend慢一个数量级。

3.3 状态后端的配置

3.3.1 配置默认的状态后端

#flink-conf.yaml

# 默认状态后端
state.backend: hashmap

# 存放检查点的文件路径
# 这里的state.checkpoints.dir配置项,定义了检查点和元数据写入的目录。
state.checkpoints.dir: hdfs://hadoop102:8020/flink/checkpoints

3.3.2 为每个作业(Per-job/Application)单独配置状态后端

通过执行环境设置,HashMapStateBackend

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStateBackend(new HashMapStateBackend());

通过执行环境设置,EmbeddedRocksDBStateBackend。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStateBackend(new EmbeddedRocksDBStateBackend());

需要注意,如果想在IDE中使用EmbeddedRocksDBStateBackend,需要为Flink项目添加依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb</artifactId>
    <version>${flink.version}</version>
</dependency>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1244946.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

案例012:Java+SSM+uniapp基于微信小程序的科创微应用平台设计与实现

文末获取源码 开发语言&#xff1a;Java 框架&#xff1a;SSM JDK版本&#xff1a;JDK1.8 数据库&#xff1a;mysql 5.7 开发软件&#xff1a;eclipse/myeclipse/idea Maven包&#xff1a;Maven3.5.4 小程序框架&#xff1a;uniapp 小程序开发软件&#xff1a;HBuilder X 小程序…

ElementUI table+dialog实现一个简单的可编辑的表格

table组件如何实现可编辑呢&#xff1f; 我的需求是把table组件那样的表格&#xff0c;实现它点击可以弹出一个框&#xff0c;然后在这个框里面输入你的东西&#xff0c;然后将他回显回去&#xff0c;当然&#xff0c;输入的有可能是时间啥的。 为什么要弹出弹层不在框上直接…

文件名称管理文件:抓关键字归类文件,让文件管理变得简单明了

在当今数字时代&#xff0c;每天都要处理大量的文件&#xff0c;无论是文本、图片、视频还是其他类型的文件。如何有效地管理这些文件&#xff0c;能够迅速找到所需的信息&#xff0c;已经成为了一个重要的问题。文件名称是文件内容的第一反映&#xff0c;也是识别和检索文件的…

在arm 64 环境下使用halcon算法

背景&#xff1a; halcon&#xff0c;机器视觉领域神一样得存在&#xff0c;在windows上&#xff0c;应用得特别多&#xff0c; 但是arm环境下使用得很少。那如何在arm下使用halcon呢。按照官方说明&#xff0c;arm下只提供了运行时环境&#xff0c;并且需要使用价值一万多人民…

element中el-switch的v-model自定义值

一、问题 element中的el-switch的值默认都是true或false&#xff0c;但是有些时候后端接口该字段可能是0或者1&#xff0c;如果说再转换一次值&#xff0c;那就有点太费力了。如下所示&#xff1a; <template><el-switchinactive-text"否"active-text&quo…

【miniQMT实盘量化5】获取财务报表数据

前言 上面文章&#xff0c;我们介绍了如何获取实时数据&#xff0c;这篇文章&#xff0c;我们继续往下探讨&#xff0c;介绍关于财务报表数据的获取。 财务报表数据 财务报表数据&#xff0c;也就是常说的基本面数据&#xff0c;是除了行情数据之外&#xff0c;辅助我们投资…

PMP 考试的含金量怎么样?

这里可以三个思考题和三个价值点帮你认识PMP考试。 三个思维题 1.工作环境 PMP证书含金量的一个很大因素&#xff0c;就是考证的人是否对PMP证书有比较强的实际需求。相反&#xff0c;如果只是听别人说&#xff0c;PMP证书很好&#xff0c;不管工作中是否有需要&#xff0c;…

〖大前端 - 基础入门三大核心之JS篇㊷〗- DOM事件对象及它的属性

说明&#xff1a;该文属于 大前端全栈架构白宝书专栏&#xff0c;目前阶段免费&#xff0c;如需要项目实战或者是体系化资源&#xff0c;文末名片加V&#xff01;作者&#xff1a;不渴望力量的哈士奇(哈哥)&#xff0c;十余年工作经验, 从事过全栈研发、产品经理等工作&#xf…

黑马点评12-实现好友关注/取关功能,查看好友共同关注列表

好友关注 数据模型 数据库中的tb_follow记录博主与粉丝的关系 tb_follow表对应的实体类 Data EqualsAndHashCode(callSuper false) Accessors(chain true) TableName("tb_follow") public class Follow implements Serializable {private static final long ser…

【RtpRtcp】1: webrtc m79:audio的ChannelReceive 创建并使用

m79中,RtpRtcp::Create 的调用很少 不知道谁负责创建ChannelReceiveclass ChannelReceive : public ChannelReceiveInterface,public MediaTransportAudioSinkInterface {接收编码后的音频帧:接收rtcp包:

nodejs微信小程序+python+PHP -留学信息查询系统的设计与实现-安卓-计算机毕业设计

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性&#xff1a;…

如何提高希音、亚马逊、国际站店铺流量转化,自养号优势及测评底层环境逻辑

随着全球贸易数字化程度加快&#xff0c;尤其是跨境电商的发展日新月异&#xff0c;在外贸出口占比越来越高&#xff0c;在这其中&#xff0c;亚马逊作为全球实力强劲的在线零售平台之一&#xff0c;吸引了大量的优秀卖家。 而这也加剧了亚马逊平台的竞争程度&#xff0c;尤其…

Java核心知识点整理大全9-笔记

目录 null文章浏览阅读9w次&#xff0c;点赞7次&#xff0c;收藏7次。Java核心知识点整理大全https://blog.csdn.net/lzy302810/article/details/132202699?spm1001.2014.3001.5501 Java核心知识点整理大全2-笔记_希斯奎的博客-CSDN博客 Java核心知识点整理大全3-笔记_希斯…

二十四、RestClient操作文档

目录 一、新增文档 1、编写测试代码 二、查询文档 1、编写测试代码 三、删除文档 1、编写测试代码 四、修改文档 1、编写测试代码 五、批量导入文档 批量查询 一、新增文档 1、编写测试代码 SpringBootTest public class HotelDocumentTest {private RestHighLevelC…

PCIE链路训练-状态机描述2

Configuration.Lanenum.Accept 如果use_modified_TS1_TS2_Ordered_Set为1&#xff0c;需要注意&#xff1a; &#xff08;1&#xff09;tx需要发送Modified TS1而不是正常的TS1&#xff1b; &#xff08;2&#xff09;rx端必须检查是否收到Modified TS1&#xff08;注意一开…

node版本管理工具-nvm

1、 下载地址 https://github.com/coreybutler/nvm-windows/releases/tag/1.1.11 2、 选择安装地址不能有空格&#xff0c;中文 3、 使用命令

【图论】关键路径求法c++

代码结构如下图&#xff1a; 其中topologicalSort(float**, int, int*, bool*, int, int)用来递归求解拓扑排序&#xff0c;topologicalSort(float**, int*&, int, int, int)传参图的邻接矩阵mat与结点个数n&#xff0c;与一个引用变量数组topo&#xff0c;返回一个布尔值…

【网络奇缘】- 计算机网络|分层结构|ISO模型

&#x1f308;个人主页: Aileen_0v0&#x1f525;系列专栏: 一见倾心,再见倾城 --- 计算机网络~&#x1f4ab;个人格言:"没有罗马,那就自己创造罗马~" 目录 计算机网络分层结构 OSI参考模型 OSI模型起源 失败原因: OSI模型组成 协议的作用 &#x1f4dd;全文…

C语言——求分段函数 y=f(x)的值

求分段函数 yf(x)的值,f(x)的表达式如下: #define _CRT_SECURE_NO_WARNINGS 1#include<stdio.h> int main() {int x,y;printf("请输入x的值&#xff1a;");scanf("%d",&x);if(x>5){yx3;}else if(x>0 && x<5){y0;}elsey2*x30;pr…

使用vcpkg安装库失败的解决方法

1、前言 vcpk是是一款开源的c/c库管理工具&#xff0c;尤其是在windows平台&#xff0c;可以帮助我们很好的管理各种依赖包。 在windows环境做c/c开发的人应该都深有体会&#xff0c;有时候编译需要下载一堆依赖库&#xff0c;导致搭建编译环境特别麻烦。但是&#xff0c;通过v…