《Flink学习笔记》——第八章 状态管理

news2024/10/6 0:36:25

8.1 Flink中的状态

8.1.1 概述

在Flink中,算子任务可以分为无状态和有状态两种情况。

**无状态的算子:**每个事件不依赖其它数据,自己处理完就输出,也不需要依赖中间结果。例如:打印操作,每个数据只需要它本身就可以完成。

**有状态的算子:**事件需要依赖中间或者外其它数据才能完成计算。比如计算累加和,我们需要记录当前的和是多少,等下一个数据来的时候我们直接将当前和加上该数更新当前累加和。所以我们需要保存当前和。而这里的中间结果和其它数据就是“状态”。

8.1.2 状态的分类

1)托管状态和原始状态

Flink的状态有两种:

  • 托管状态:由Flink统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由Flink实现,我们只需要调接口就可以。

  • 原始状态:自定义的,相当于开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。

通常我们采用Flink托管状态来实现需求。

2)算子状态和按键分区状态

托管状态又可以分为两类:

  • 算子状态:只对当前并行子任务实例有效,这就意味着对于一个并行子任务,占据了一个“分区”,它所处理的所有数据都会访问到相同的状态。

    image-20230827025328858

  • 按键分区状态:状态是根据输入流中定义的键来维护和访问的,所以只能定义在按键分区流中,也就是keyBy之后。

    image-20230827030014586

1.另外,也可以通过富函数类(Rich Function)来自定义Keyed State,所以只要提供了富函数类接口的算子,也都可以使用Keyed State。所以即使是map、filter这样无状态的基本转换算子,我们也可以通过富函数类给它们“追加”Keyed State。所以从这个角度讲,Flink中所有的算子都可以是有状态的。

2.无论是Keyed State还是Operator State,它们都是在本地实例上维护的,也就是说每个并行子任务维护着对应的状态,算子的子任务之间状态不共享。

8.2 按键分区状态

任务按照键key来访问和维护的状态,特点就是以key为作用范围进行隔离

实际的值如何存储,或者说存储结构有以下类型:

8.2.1 值状态

状态中只保留一个“值”。源码中定义如下:

public interface ValueState<T> extends State {
    T value() throws IOException;
    void update(T value) throws IOException;
}

T value():获取当前状态的值;

update(T value):对状态进行更新,传入的参数value就是要覆写的状态值

​ 在具体使用时,为了让运行时上下文清楚到底是哪个状态,我们还需要创建一个“状态描述器”(StateDescriptor)来提供状态的基本信息。例如源码中,ValueState的状态描述器构造方法如下:

public ValueStateDescriptor(String name, Class<T> typeClass) {
  super(name, typeClass, null);
}

**案例需求:**检测每种传感器的水位值,如果连续的两个水位值超过10,就输出报警。

public class KeyValueStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> sensorDs = env.socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
                );

        sensorDs.keyBy(WaterSensor::getId).process(new MyKeyedProFunc()).print();
        env.execute();
    }
}
package com.zlin.state.po;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.util.Objects;

/**
 * @author ZLin
 * @since 2023/8/27
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class WaterSensor {
    // 传感器id
    public String id;
    // 时间戳
    public Long ts;
    // 水位值
    public Integer vc;

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        WaterSensor that = (WaterSensor) o;
        return Objects.equals(id, that.id) &&
                Objects.equals(ts, that.ts) &&
                Objects.equals(vc, that.vc);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, ts, vc);
    }

}
public class WaterSensorMapFunction implements MapFunction<String, WaterSensor> {
    @Override
    public WaterSensor map(String value) throws Exception {
        String[] values = value.split(",");
        return new WaterSensor(values[0], Long.valueOf(values[1]), Integer.valueOf(values[2]));
    }
}
public class MyKeyedProFunc extends KeyedProcessFunction<String, WaterSensor, String> {
    // TODO 1.定义状态
    ValueState<Integer> lastVcState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // TODO 2.在open方法中,初始化状态
        // 状态描述器两个参数:第一个参数,起个名字,不重复;第二个参数,存储的类型
        lastVcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVcState", Types.INT));
    }


    @Override
    public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context context, Collector<String> out) throws Exception {
        //     lastVcState.value();  // 取出 本组 值状态 的数据
        //     lastVcState.update(); // 更新 本组 值状态 的数据
        //     lastVcState.clear();  // 清除 本组 值状态 的数据

        // 1. 取出上一条数据的水位值(Integer默认值是null,判断)
        int lastVc = lastVcState.value() == null ? 0 : lastVcState.value();

        // 2. 求差值的绝对值,判断是否超过10
        Integer vc = value.getVc();
        if (Math.abs(vc - lastVc) > 10) {
            out.collect("传感器=" + value.getId() + "==>当前水位值=" + vc + ",与上一条水位值=" + lastVc + ",相差超过10!!!!");
        }

        // 3. 更新状态里的水位值
        lastVcState.update(vc);

    }
}

8.2.2 列表状态

将需要保存的数据,以列表(List)的形式组织起来。在ListState接口中同样有一个类型参数T,表示列表中数据的类型。ListState也提供了一系列的方法来操作状态,使用方式与一般的List非常相似。

类似地,ListState的状态描述器就叫作ListStateDescriptor

ListState:

  • Iterable get():获取当前的列表状态,返回的是一个可迭代类型Iterable;

  • update(List values):传入一个列表values,直接对状态进行覆盖;

  • add(T value):在状态列表中添加一个元素value;

  • addAll(List values):向列表中添加多个元素,以列表values形式传入

  • clear():清空

案例:针对每种传感器输出最高的3个水位值

package com.zlin.state;

import com.zlin.state.func.WaterSensorMapFunction;
import com.zlin.state.po.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
 * @author ZLin
 * @since 2023/8/27
 */
public class KeyedListStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
                );

        sensorDS.keyBy(r -> r.getId())
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {

                            ListState<Integer> vcListState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                vcListState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcListState", Types.INT));
                            }

                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                                // 1.来一条,存到list状态里
                                vcListState.add(value.getVc());

                                // 2.从list状态拿出来(Iterable), 拷贝到一个List中,排序, 只留3个最大的
                                Iterable<Integer> vcListIt = vcListState.get();
                                // 2.1 拷贝到List中
                                List<Integer> vcList = new ArrayList<>();
                                for (Integer vc : vcListIt) {
                                    vcList.add(vc);
                                }
                                // 2.2 对List进行降序排序
                                vcList.sort((o1, o2) -> o2 - o1);
                                // 2.3 只保留最大的3个(list中的个数一定是连续变大,一超过3就立即清理即可)
                                if (vcList.size() > 3) {
                                    // 将最后一个元素清除(第4个)
                                    vcList.remove(3);
                                }

                                out.collect("传感器id为" + value.getId() + ",最大的3个水位值=" + vcList.toString());

                                // 3.更新list状态
                                vcListState.update(vcList);


//                                vcListState.get();            //取出 list状态 本组的数据,是一个Iterable
//                                vcListState.add();            // 向 list状态 本组 添加一个元素
//                                vcListState.addAll();         // 向 list状态 本组 添加多个元素
//                                vcListState.update();         // 更新 list状态 本组数据(覆盖)
//                                vcListState.clear();          // 清空List状态 本组数据
                            }
                        }
                )
                .print();

        env.execute();

    }
}

8.2.3 Map状态

把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组key-value映射的列表。对应的MapState<UK, UV>接口中,就会有UK、UV两个泛型,分别表示保存的key和value的类型。同样,MapState提供了操作映射状态的方法,与Map的使用非常类似。

MapState<UK, UV>:

  • UV get(UK key):传入一个key作为参数,查询对应的value值;

  • put(UK key, UV value):传入一个键值对,更新key对应的value值;

  • putAll(Map<UK, UV> map):将传入的映射map中所有的键值对,全部添加到映射状态中;

  • remove(UK key):将指定key对应的键值对删除;

  • boolean contains(UK key):判断是否存在指定的key,返回一个boolean值。

另外,MapState也提供了获取整个映射相关信息的方法;

  • Iterable<Map.Entry<UK, UV>> entries():获取映射状态中所有的键值对;

  • Iterable keys():获取映射状态中所有的键(key),返回一个可迭代Iterable类型;

  • Iterable values():获取映射状态中所有的值(value),返回一个可迭代Iterable类型;

  • boolean isEmpty():判断映射是否为空,返回一个boolean值

  • clear():清空

**案例需求:**统计每种传感器每种水位值出现的次数。

public class KeyedMapStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
                );

        sensorDS.keyBy(r -> r.getId())
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {

                            MapState<Integer, Integer> vcCountMapState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                vcCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("vcCountMapState", Types.INT, Types.INT));
                            }

                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                                // 1.判断是否存在vc对应的key
                                Integer vc = value.getVc();
                                if (vcCountMapState.contains(vc)) {
                                    // 1.1 如果包含这个vc的key,直接对value+1
                                    Integer count = vcCountMapState.get(vc);
                                    vcCountMapState.put(vc, ++count);
                                } else {
                                    // 1.2 如果不包含这个vc的key,初始化put进去
                                    vcCountMapState.put(vc, 1);
                                }

                                // 2.遍历Map状态,输出每个k-v的值
                                StringBuilder outStr = new StringBuilder();
                                outStr.append("======================================\n");
                                outStr.append("传感器id为" + value.getId() + "\n");
                                for (Map.Entry<Integer, Integer> vcCount : vcCountMapState.entries()) {
                                    outStr.append(vcCount.toString() + "\n");
                                }
                                outStr.append("======================================\n");

                                out.collect(outStr.toString());


//                                vcCountMapState.get();          // 对本组的Map状态,根据key,获取value
//                                vcCountMapState.contains();     // 对本组的Map状态,判断key是否存在
//                                vcCountMapState.put(, );        // 对本组的Map状态,添加一个 键值对
//                                vcCountMapState.putAll();  // 对本组的Map状态,添加多个 键值对
//                                vcCountMapState.entries();      // 对本组的Map状态,获取所有键值对
//                                vcCountMapState.keys();         // 对本组的Map状态,获取所有键
//                                vcCountMapState.values();       // 对本组的Map状态,获取所有值
//                                vcCountMapState.remove();   // 对本组的Map状态,根据指定key,移除键值对
//                                vcCountMapState.isEmpty();      // 对本组的Map状态,判断是否为空
//                                vcCountMapState.iterator();     // 对本组的Map状态,获取迭代器
//                                vcCountMapState.clear();        // 对本组的Map状态,清空

                            }
                        }
                )
                .print();

        env.execute();
    }
}

8.2.4 归约状态

​ 类似于值状态(Value),不过需要对添加进来的所有数据进行归约,将归约聚合之后的值作为状态保存下来。ReducingState这个接口调用的方法类似于ListState,只不过它保存的只是一个聚合值,所以调用.add()方法时,不是在状态列表里添加元素,而是直接把新数据和之前的状态进行归约,并用得到的结果更新状态。

​ 归约逻辑的定义,是在归约状态描述器(ReducingStateDescriptor)中,通过传入一个归约函数(ReduceFunction)来实现的。这里的归约函数,就是我们之前介绍reduce聚合算子时讲到的ReduceFunction,所以状态类型跟输入的数据类型是一样的

ReducingState<T>
public ReducingStateDescriptor(
    String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {...}

name:名称
reduceFunction:归约聚合逻辑
typeClass:类型

**案例:**计算每种传感器的水位和

public class KeyedProcessFuncReducingState extends KeyedProcessFunction<String, WaterSensor,Integer> {
    private ReducingState<Integer> sumVcState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        sumVcState = this.getRuntimeContext().getReducingState(new ReducingStateDescriptor<Integer>("sumVcState",
                Integer::sum, Integer.class));
    }

    @Override
    public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, Integer>.Context context,
                               Collector<Integer> out) throws Exception {
        sumVcState.add(value.getVc());
        out.collect(sumVcState.get());
    }
}

8.2.5 聚合状态

​ 与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。与ReducingState不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数(AggregateFunction)来定义的;这也就是之前我们讲过的AggregateFunction,里面通过一个累加器(Accumulator)来表示状态,所以聚合的状态类型可以跟添加进来的数据类型完全不同,使用更加灵活。

​ 同样地,AggregatingState接口调用方法也与ReducingState相同,调用.add()方法添加元素时,会直接使用指定的AggregateFunction进行聚合并更新状态。

AggregatingState<IN, OUT>
public AggregatingStateDescriptor(
            String name,
            AggregateFunction<IN, ACC, OUT> aggFunction,
            TypeInformation<ACC> stateType)
name:描述器名称
aggFunction:聚合函数,聚合逻辑
stateType:状态里各个元素的数据类型

**案例需求:**计算每种传感器的平均水位

public class AggregatingStateKeyedProcessFunc extends KeyedProcessFunction<String, WaterSensor, String> {
    private AggregatingState<Integer, Double> vcAvgAggregatingState;


    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        vcAvgAggregatingState = this.getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>(
                "vcAvgAggregatingState",
                new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
                    @Override
                    public Tuple2<Integer, Integer> createAccumulator() {
                        return Tuple2.of(0, 0);
                    }

                    @Override
                    public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
                        return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
                    }

                    @Override
                    public Double getResult(Tuple2<Integer, Integer> accumulator) {
                        return accumulator.f0 * 1D / accumulator.f1;
                    }

                    @Override
                    public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
                    }
                }, Types.TUPLE(Types.INT, Types.INT)));
    }

    @Override
    public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, String>.Context context,
                               Collector<String> out) throws Exception {
        // 将水位值添加到聚合状态中
        vcAvgAggregatingState.add(value.getVc());

        // 从聚合状态中获取结果
        Double vcAvg = vcAvgAggregatingState.get();

        out.collect("传感器id为" + value.getId() + ",平均水位值=" + vcAvg);
    }
}

区别:

// 归约函数
ReduceFunction<T>

// 聚合函数
AggregateFunction<IN, ACC, OUT>

可以看到当输入输出类型不一致时,我们使用聚合函数处理更方便。

8.2.6 状态生存时间(TTL)

​ 在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用.clear()方法去清除状态,但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”(time-to-live,TTL),当状态在内存中存在的时间超出这个值时,就将它清除。

​ 具体实现上,如果用一个进程不停地扫描所有状态看是否过期,显然会占用大量资源做无用功。状态的失效其实不需要立即删除,所以我们可以给状态附加一个属性,也就是状态的“失效时间”。状态创建的时候,设置 失效时间 = 当前时间 + TTL;之后如果有对状态的访问和修改,我们可以再对失效时间进行更新;当设置的清除条件被触发时(比如,状态被访问的时候,或者每隔一段时间扫描一次失效状态),就可以判断状态是否失效、从而进行清除了。

​ 配置状态的TTL时,需要创建一个StateTtlConfig配置对象,然后调用状态描述器的.enableTimeToLive()方法启动TTL功能。

StateTtlConfig stateTtlConfig = StateTtlConfig
    .newBuilder(Time.seconds(10))
    .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build();
/*
newBuilder(@Nonnull Time ttl): ttl——设定的状态生存时间
setUpdateType(UpdateType updateType):更新状态失效时间,
	Disabled:状态不会过期
	OnCreateAndWrite:当创建或者写的时候,更新状态失效时间
	OnReadAndWrite:当读或者写的时候更新状态失效时间
setStateVisibility(@Nonnull StateVisibility stateVisibility):设置状态的可见性,所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能继续存在,这时如果对它进行访问,能否正常读取到就是一个问题了。
	ReturnExpiredIfNotCleanedUp:默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取
	NeverReturnExpired:就是如果过期状态还存在,就返回它的值
*/


ReducingStateDescriptor<Integer> reducingStateDescriptor = new ReducingStateDescriptor<Integer>("sumVcState", Integer::sum, Integer.class);
reducingStateDescriptor.enableTimeToLive(stateTtlConfig);

​ 除此之外,TTL配置还可以设置在保存检查点(checkpoint)时触发清除操作,或者配置增量的清理(incremental cleanup),还可以针对RocksDB状态后端使用压缩过滤器(compaction filter)进行后台清理。这里需要注意,目前的TTL设置只支持处理时间。

!!!注意:目前的TTL设置只支持处理时间!!!

8.3 算子状态

​ 算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的key无关,所以不同key的数据只要被分发到同一个并行子任务,就会访问到同一个Operator State

​ 当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。

​ 算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState和BroadcastState。

8.3.1 列表状态(ListState)

与Keyed State中的ListState一样,将状态表示为一组数据的列表。

与Keyed State中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。

当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个“大列表”,然后再均匀地分配给所有并行任务。这种“均匀分配”的具体方法就是“轮询”(round-robin),与之前介绍的rebanlance数据传输方式类似,是通过逐一“发牌”的方式将状态项平均分配的。这种方式也叫作“平均分割重组”(even-split redistribution)。

算子状态中不会存在“键组”(key group)这样的结构,所以为了方便重组分配,就把它直接定义成了“列表”(list)。这也就解释了,为什么算子状态中没有最简单的值状态(ValueState)。

public class OperatorStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env
                .socketTextStream("hadoop102", 7777)
                .map(new MyCountMapFunction())
                .print();
        
        env.execute();
    }
}
public class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {
    private Long count = 0L;
    private ListState<Long> state;

    @Override
    public Long map(String value) throws Exception {
        return ++count;
    }

    /**
     * 本地变量持久化:将 本地变量 拷贝到 算子状态中,开启checkpoint时才会调用
     * @param functionSnapshotContext context
     * @throws Exception e
     */
    @Override
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        System.out.println("snapshotState...");
        // 清空算子状态
        state.clear();
        // 将本地变量添加到算子状态中
        state.add(count);

    }

    /**
     * 初始化本地变量:程序启动和恢复时, 从状态中 把数据添加到 本地变量,每个子任务调用一次
     * @param context context
     * @throws Exception e
     */
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        System.out.println("initializeState...");
        // 从 上下文 初始化 算子状态
        state = context
                .getOperatorStateStore()
                .getListState(new ListStateDescriptor<Long>("state", Types.LONG));
        // 从 算子状态中 把数据 拷贝到 本地变量
        if (context.isRestored()) {
            for (Long c : state.get()) {
                count += c;
            }
        }

    }
}

8.3.2 联合列表状态(UnionListState)

与ListState类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。

UnionListState的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。这样,并行度缩放之后的并行子任务就获取到了联合后完整的“大列表”,可以自行选择要使用的状态项和要丢弃的状态项。这种分配也叫作“联合重组”(union redistribution)。如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。

state = context
              .getOperatorStateStore()
              .getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG));

8.3.3 广播状态(BroadcastState)

有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。这时所有分区的所有数据都会访问到同一个状态,状态就像被“广播”到所有分区一样,这种特殊的算子状态,就叫作广播状态(BroadcastState)。

因为广播状态在每个并行子任务上的实例都一样,所以在并行度调整的时候就比较简单,只要复制一份到新的并行任务就可以实现扩展;而对于并行度缩小的情况,可以将多余的并行子任务连同状态直接砍掉——因为状态都是复制出来的,并不会丢失。

**案例实操:**水位超过指定的阈值发送告警,阈值可以动态修改

public class BroadcastStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // 数据流
        SingleOutputStreamOperator<WaterSensor> sensorDS = env
                .socketTextStream("hadoop102", 7777)
                .map(new WaterSensorMapFunction());

        // 配置流(用来广播配置)
        DataStreamSource<String> configDS = env.socketTextStream("hadoop102", 8888);

        // TODO 1. 将 配置流 广播
        MapStateDescriptor<String, Integer> broadcastMapState = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);
        BroadcastStream<String> configBS = configDS.broadcast(broadcastMapState);

        // TODO 2.把 数据流 和 广播后的配置流 connect
        BroadcastConnectedStream<WaterSensor, String> sensorBCS = sensorDS.connect(configBS);

        sensorBCS.process(new BroadcastStateProcessFunc(broadcastMapState)).print();

        env.execute();
    }
}
package com.zlin.state.func;

import com.zlin.state.po.WaterSensor;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @author ZLin
 * @since 2023/8/28
 */
public class BroadcastStateProcessFunc extends BroadcastProcessFunction<WaterSensor, String, String> {

    MapStateDescriptor<String, Integer> broadcastMapState;

    public BroadcastStateProcessFunc(MapStateDescriptor<String, Integer> broadcastMapState) {
        this.broadcastMapState = broadcastMapState;
    }

    /**
     * 数据流的处理,广播状态,只能读取,不能修改
     * @param value 数据
     * @param context 只读上下文
     * @param out 收集器
     * @throws Exception e
     */
    @Override
    public void processElement(WaterSensor value, BroadcastProcessFunction<WaterSensor, String, String>.ReadOnlyContext context, Collector<String> out) throws Exception {

        // 通过上下文获取广播状态,获取里面的值
        ReadOnlyBroadcastState<String, Integer> broadcastState = context.getBroadcastState(broadcastMapState);
        Integer threshold = broadcastState.get("threshold");
        threshold = (threshold == null ? 0 : threshold);

        if (value.getVc() > threshold) {
            out.collect(value + ",水位超过指定的阈值:" + threshold + "!!!");
        }

    }


    /**
     * 广播后的配置数据的处理方法:只有广播流才能修改广播状态
     * @param s 配置值
     * @param context 上下文
     * @param collector 收集器
     * @throws Exception e
     */
    @Override
    public void processBroadcastElement(String s, BroadcastProcessFunction<WaterSensor, String, String>.Context context, Collector<String> collector) throws Exception {
        BroadcastState<String, Integer> broadcastState = context.getBroadcastState(broadcastMapState);
        broadcastState.put("threshold", Integer.valueOf(s));
    }
}

8.4 状态后端(State Backends)

​ 在Flink中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责管理本地状态的存储方式和位置

8.4.1 状态后端的分类(HashMapStateBackend/RocksDB

状态后端是一个“开箱即用”的组件,可以在不改变应用程序逻辑的情况下独立配置。

两种:

  • 哈希表状态后端(HashMapStateBackend):默认;HashMapStateBackend是把状态存放在内存里。把状态当作对象(objects),保存在Taskmanager的JVM堆上,以键值对的形式存储起来。
  • 内嵌RocksDB状态后端(EmbeddedRocksDBStateBackend):RocksDB是一种内嵌的key-value存储介质,可以把数据持久化到本地硬盘。配置EmbeddedRocksDBStateBackend后,会将处理中的数据全部放入RocksDB数据库中,RocksDB默认存储在TaskManager的本地数据目录里。RocksDB的状态数据被存储为序列化的字节数组,读写操作需要序列化/反序列化,因此状态的访问性能要差一些。另外,因为做了序列化,key的比较也会按照字节进行,而不是直接调用.hashCode()和.equals()方法。EmbeddedRocksDBStateBackend始终执行的是异步快照,所以不会因为保存检查点而阻塞数据的处理;而且它还提供了增量式保存检查点的机制,这在很多情况下可以大大提升保存效率。

8.4.2 如何选择正确的状态后端

HashMapStateBackend:

  • 内存计算,读写速度非常快
  • 状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。

EmbeddedRocksDBStateBackend:

  • RocksDB是硬盘存储,所以可以根据可用的磁盘空间进行扩展,所以它非常适合于超级海量状态的存储。
  • 由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比HashMapStateBackend慢一个数量级。

8.4.3 状态后端的配置

(1)配置文件

如果没有配置,则使用默认的集群配置文件flink-conf.yaml中指定的,配置的键名称为state.backend。一般用来配置默认的状态后端。

(2)为每个作业单独配置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
/
env.setStateBackend(new EmbeddedRocksDBStateBackend());

如果想在IDE中使用EmbeddedRocksDBStateBackend,需要为Flink项目添加依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb</artifactId>
    <version>${flink.version}</version>
</dependency>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/942272.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【PLSQL】PLSQL基础

文章目录 一&#xff1a;记录类型1.语法2.代码实例 二&#xff1a;字符转换三&#xff1a;%TYPE和%ROWTYPE1.%TYPE2.%ROWTYPE 四&#xff1a;循环1.LOOP2.WHILE&#xff08;推荐&#xff09;3.数字式循环 五&#xff1a;游标1.游标定义及读取2.游标属性3.NO_DATA_FOUND和%NOTFO…

释放 ChatGPT 的价值:5 个专家提示

随着近来ChatGPT的热议&#xff0c;人工智能技术被推上风口浪尖&#xff0c;由此以数字化技术为基础的数字营销也再次受到了不小的关注&#xff0c;但是营销的本质从来都没有变过&#xff0c;今天我们聊下ChatGPT无论如何演进&#xff0c;人工智能无论变得多么先进&#xff0c;…

设计模式—外观模式(Facade)

目录 一、什么是外观模式&#xff1f; 二、外观模式具有什么优点吗&#xff1f; 三、外观模式具有什么缺点呢&#xff1f; 四、什么时候使用外观模式&#xff1f; 五、代码展示 ①、股民炒股代码 ②、投资基金代码 ③外观模式 思维导图 一、什么是外观模式&#xff1f;…

《Flink学习笔记》——第四章 Flink运行时架构

4.1 系统架构 Flink运行时架构 Flink 运行时由两种类型的进程组成&#xff1a;一个 JobManager 和一个或者多个 TaskManager。 1、作业管理器&#xff08;JobManager&#xff09; JobManager是一个Flink集群中任务管理和调度的核心&#xff0c;是控制应用执行的主进程。也就…

Java之API详解之Object类的详细解析

4 Object类 4.1 概述 tips&#xff1a;重点讲解内容 查看API文档&#xff0c;我们可以看到API文档中关于Object类的定义如下&#xff1a; Object类所在包是java.lang包。Object 是类层次结构的根&#xff0c;每个类都可以将 Object 作为超类。所有类都直接或者间接的继承自该类…

JavaScript设计模式(二)——简单工厂模式、抽象工厂模式

个人简介 &#x1f440;个人主页&#xff1a; 前端杂货铺 &#x1f64b;‍♂️学习方向&#xff1a; 主攻前端方向&#xff0c;正逐渐往全干发展 &#x1f4c3;个人状态&#xff1a; 研发工程师&#xff0c;现效力于中国工业软件事业 &#x1f680;人生格言&#xff1a; 积跬步…

网络防御和入侵检测

网络防御和入侵检测是维护网络安全的关键任务&#xff0c;可以帮助识别和阻止未经授权的访问和恶意行为。以下是一些基本的步骤和方法&#xff0c;用于进行网络防御和入侵检测。 网络防御&#xff1a; 防火墙设置&#xff1a; 部署防火墙来监控和控制网络流量&#xff0c;阻止…

禅道后台命令执行漏洞 (二)

漏洞简介 禅道是第一款国产的开源项目管理软件。它集产品管理、项目管理、质量管理、文档管理、 组织管理和事务管理于一体&#xff0c;是一款专业的研发项目管理软件&#xff0c;完整地覆盖了项目管理的核心流程。禅道管理思想注重实效&#xff0c;功能完备丰富&#xff0c;操…

多线程基础篇

我们平常说的一个程序&#xff0c;一个程序中有声音&#xff0c;图片&#xff0c;字幕 实际上是一个进程中有多个线程 main线程是主线程。 多核&#xff0c;多个cpu&#xff0c;多个线程&#xff0c;切换的很快 单核的话是一个cpu,某一时间只能是一个线程&#xff0c;但是因为…

NLNet、GCNet、RTNet三种多头注意力网络的对比与分析

目录 一、中心思想 二、网络结构的异同点 三、网络结构的改进 3.1 GCNet的改进 3.2 RTNet的改进 四、总结 一、中心思想 三种网络最终目的都是为了捕获远程依赖关系或是全局上下文信息以增强目标检测或是目标分割的效果。 NLNet&#xff1a;卷积运算只能一次处理一个局…

C#,《小白学程序》第七课:列表(List)应用之一“编制高铁车次信息表”

1 文本格式 /// <summary> /// 车站信息类 class /// </summary> public class Station { /// <summary> /// 编号 /// </summary> public int Id { get; set; } 0; /// <summary> /// 车站名 /// </summary>…

17.CSS发光按钮悬停特效

效果 源码 <!DOCTYPE html> <html> <head><title>CSS Modern Button</title><link rel="stylesheet" type="text/css" href="style.css"> </head> <body><a href="#" style=&quo…

【Go 基础篇】探索Go语言中Map的神奇操作

嗨&#xff0c;Go语言的学习者们&#xff01;在编程世界中&#xff0c;Map是一个强大而又有趣的工具&#xff0c;它可以帮助我们高效地存储和操作键值对数据。Map就像是一本字典&#xff0c;可以让我们根据关键字&#xff08;键&#xff09;快速找到对应的信息&#xff08;值&a…

Windows版本Docker安装详细步骤

文章目录 下载地址安装异常处理docker desktop requires a newer wsl 下载地址 https://desktop.docker.com/win/stable/Docker%20Desktop%20Installer.exe 安装 双击下载的文件Docker Desktop Installer.exe进行安装 点击OK 开始安装 安装完成点击Close and restart&…

JavaScript—面向对象、作用域

C#&#xff1a;从类继承 js&#xff1a;从对象继承 什么叫继承&#xff1f; 模板&#xff08;类&#xff09; 原型继承&#xff08;实体&#xff09; 有一个对象存在&#xff0c;构造函数设置原型为这个对象 创建出来的对象就继承与这个对象&#xff08;从对象那里继承&am…

ElasticSearch基础知识汇总

文章目录 前言一、认识ElasticSearch1.正向索引和倒排索引2. MySql与ElasticSearc3.IK分词器 二、ES索引库操作1.mapping映射属性2.索引库的CRUD 三、ES文档库操作 前言 Elasticsearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎&#xff0c;基…

Kali Linux进行移动应用安全测试

使用Kali Linux进行移动应用安全测试是一项关键任务&#xff0c;可以帮助识别和修复移动应用中的安全漏洞。以下是一个基本的步骤指南&#xff0c;展示如何在Kali Linux中进行移动应用安全测试。 步骤&#xff1a; 准备环境&#xff1a; 确保 已经安装了Kali Linux&#xff0c…

开箱报告,Simulink Toolbox库模块使用指南(五)——S-Fuction模块(C MEX S-Function)

文章目录 前言 C MEX S-Function 算法原理 原始信号创建 编写S函数 仿真验证 Tips 分析和应用 总结 前言 见《开箱报告&#xff0c;Simulink Toolbox库模块使用指南&#xff08;一&#xff09;——powergui模块》 见《开箱报告&#xff0c;Simulink Toolbox库模块使用…

fastjson利用templatesImpl链

fastjson1.2.24 环境&#xff1a; pom.xml&#xff1a; <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLoc…

在 Python 中逐步构建 DCF(贴现流)估值

Building A DCF Valuation in Python, Step by Step | by Roi Polanitzer | Medium 说明 这是一个真实的&#xff0c;以色列国土内的公司业务评估案例。在本文中&#xff0c;我将演示如何使用python中的DCF方法对以色列系统和应用程序公司进行业务评估。因为存在许多业务术语&a…