Flink系列之Flink中State设计详解与企业案例实践

news2024/7/6 20:38:43

title: Flink系列


二、Flink State 设计详解

Flink 官网解释:Apache Flink® — Stateful Computations over Data Streams

前课中 WordCount 的例子,可以得知:其实我们会发现,单词出现的次数有累计的效果。如果没有状态的管理,是不会有累计的效果的,所以 Flink 里面有 state 的概念。

需求:统计路口每个小时里面的总车流量

路口编号小时总的车流量
路口010-12000
路口021-21800
路口032-31600
路口043-41500

State: 假设某个路口统计到 0:30 的时候,临时总车流量 就是当前这个 路口对应的 Task 的 状态

Window操作:每隔 10s 统计过去 20s 的总车流量

在这里插入图片描述

计算平均值:1 =>(1,1) ; 2 => (2,3) ; 3=>(3,6) ;…

​ 通用的:n => (state.count +1 , state.sum + n) 最终输出的结果: state.sum / state.count

​ Flink 的一个重要特性就是有状态计算:Stateful Computations over Data Streams

​ State 简单说,就是 Flink Job 的 Task 在运行过程中,产生的一些状态数据。这些状态数据,会辅助 Task 执行某些有状态计算,同时也涉及到 Flink Job 的重启状态恢复。所以,保存和管理每个 Task 的状态是非常重要的一种机制。这也是 Flink 有别于其他分布式计算引擎的最重要的区别。

​ State 需要配合检查点 Checkpoint 机制来保证 Flink 作业失败后能正确地进行错误恢复。

​ Flink 中的状态分为两类,Keyed State 和 Operator State 。

  • Keyed State 是和具体的 Key 相绑定的,只能在 KeyedStream 上的函数和算子中使用。
  • Opeartor State 则是和 Operator 的一个特定的并行实例相绑定的,例如 Kafka Connector 中,每一个并行的 Kafka Consumer 都在 Operator State 中维护当前 Consumer 订阅的 partiton 和 offset。

​ 由于 Flink 中的 keyBy 操作保证了每一个键相关联的所有消息都会送给下游算子的同一个并行实例处理,因此 Keyed State 也可以看作是 Operator State 的一种分区(partitioned)形式,每一个 key 都关联一个状态分区(state-partition)。

​ 从另一个角度来看,无论 Operator State 还是 Keyed State,都有两种形式,Managed State 和 Raw State。 Managed State 的数据结构由 Flink 进行托管,而Raw State 的数据结构对 Flink 是透明的。 Flink 的建议是尽量使用 Managed State,这样 Flink 可以在并行度改变等情况下重新分布状态,并且可以更好地进行内存管理。

简单总结一下:

state:一般指一个具体的 task/operator 的状态。State 可以被记录,在失败的情况下数据还可以恢复

Flink 中有两种基本类型的 State:Keyed StateOperator State,他们两种都可以以两种形式存在:原始状态(raw state) 和 托管状态(managed state)。

Keyed State:在做 keyBy 之后,每个 key 都会携带一个状态。这种状态,就是 key state 

Operator State: 一个 Task 一个 State

1、托管状态:由 Flink 框架管理的状态,我们通常使用的就是这种。

2、原始状态:由用户自行管理状态具体的数据结构,框架在做 checkpoint 的时候,使用 byte[] 来读写状态内容,对其内部数据结构一无所知。通常在DataStream上的状态推荐使用托管的状态,当实现一个用户自定义的 operator 时,会使用到原始状态。但是我们工作中一般不常用,所以我们不考虑它。

ZooKeeper: ZNode 的分类:持久类型 + 临时类型, 这两个类型的 ZNode 又可以划分成另外两种类型:带顺序编号的,不带顺序的

Flink 的 State 类型,通过一张图来理解

在这里插入图片描述

关于上图的理解补充:

1、假设 kafka 中有一个 Topic ,有 4 个分区

2、Flink Application 应用程序的 Source Operator 的并行度,一般就会必然设置成 4 ,一般不会设置多,也不会设置少

3、每个 Source Task 必然要去记录 当前这个 SourceTask 消费到 对应的那个 Topic Partition 的 offset

	就是把所有的 Source Task 的状态 Operator State 进行持久化

4、Flink + Kafka 的整合
(1)记录 当前应用程序消费 Topic 的 每个分区的 offset
(2)Flink 的 Applicatioin 的 Sink 操作必须输出的时候确保数据一致性(确保数据消费语义有且仅一次)
	幂等输出
	2PC 两阶段分布式事务提交

补充一个知识点:keyed state 记录的是每个 key 的状态
Keyed State 托管状态有五种类型:

1、ValueState 单个值(Integer, String, Tuple2, Student)
2、ListState 多个值的(List)
3、MapState key-value类型的值的
4、ReducingState 聚合逻辑
5、AggregatingState 聚合逻辑

并不是所有的计算,都是有状态的, 也有一些计算类型是,是无状态(KeyState)的: 比如大写 转 小写, 比如 ETL 。

三、Flink Keyed State 企业案例实战

Keyed State 主要有下面五种状态:

ValueState

ListState

MapState

ReducingState

AggregatingState

相关的实战案例课上直播编写代码。

3.1 FlinkState案例之ValueState

package com.aa.flinkjava.state;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @Author AA
 * @Project bigdatapre
 * @Package com.aa.flinkjava.state
 * 需求:每 N个相同的key输出的他们的平均值,例如 N=4。
 */
public class FlinkState_01_ValueState {
    public static void main(String[] args) throws Exception {
        //1、环境准备。获取编程入口对象
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //executionEnvironment.setParallelism(2);

        //2、准备一些数据
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource = executionEnvironment.fromElements(
                Tuple2.of(1L, 1L),
                Tuple2.of(1L, 2L),
                Tuple2.of(2L, 3L),
                Tuple2.of(2L, 4L),
                Tuple2.of(1L, 5L),
                Tuple2.of(1L, 6L),
                Tuple2.of(2L, 7L),
                Tuple2.of(2L, 8L)
        );

        //3、状态编程
        /**
         * 三种 State 来计算
         * 1、 ValueState 存储单一的值,一个 Key 对应一个 ValueState
         * 2、 ListState 存储数据列表,一个 Key 对应一个 ListState
         * 3、 MapState 存储数据集合,一个 Key 对应一个 MapState
         */
        //状态编程
        SingleOutputStreamOperator<Tuple2<Long, Float>> result = dataStreamSource.keyBy(0).flatMap(new MyAverageWithValueState());

        //4、输出
        result.print();

        //5、提交执行
        executionEnvironment.execute("FlinkState_01_ValueState");
    }

    /**
     * RichFlatMapFunction<IN, OUT>  两个泛型,一个输入,一个输出
     */
    private static class MyAverageWithValueState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Float>> {
        //定义一个成员变量
        private ValueState<Tuple2<Long, Long>> countAndSumState;

        //初始化的一些方法在这个里面
        @Override
        public void open(Configuration parameters) throws Exception {
            //下面其实就固定步骤: 先声明一个 XXStateDesc
            ValueStateDescriptor<Tuple2<Long, Long>> countAndSumStateDsc = new ValueStateDescriptor<>("countAndSumState", Types.TUPLE(Types.LONG, Types.LONG));

            countAndSumState = getRuntimeContext().getState(countAndSumStateDsc);
        }

        //这个里面实现具体的业务逻辑的。
        @Override
        public void flatMap(Tuple2<Long, Long> record, Collector<Tuple2<Long, Float>> collector) throws Exception {
            //如果当前的这个key的ValueState不存在,则初始化一个。
            Tuple2<Long, Long> currentState = countAndSumState.value();
            if (currentState == null){
                currentState = Tuple2.of(0L,0L);
            }

            //进行状态的处理,key出现的次数加1,key的值进行了累加
            currentState.f0 += 1;
            currentState.f1 += record.f1;

            //状态更新一下
            countAndSumState.update(currentState);

            //下面其实就是具体的业务逻辑了
            if (currentState.f0 == 4){
                //执行计算,求平均值
                float avg = (float)currentState.f1 / currentState.f0;
                //输出出去
                collector.collect(Tuple2.of(record.f0,avg));
                //清空状态
                countAndSumState.clear();
            }

        }
    }

}

3.2 FlinkState案例之ListState

package com.aa.flinkjava.state;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.Collections;


/**
 * @Author AA
 * @Project bigdatapre
 * @Package com.aa.flinkjava.state
 */
public class FlinkState_02_ListState {
    public static void main(String[] args) throws Exception {
        //1、环境准备。获取编程入口对象
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //executionEnvironment.setParallelism(2);

        //2、准备一些数据
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource = executionEnvironment.fromElements(
                Tuple2.of(1L, 1L),
                Tuple2.of(1L, 2L),
                Tuple2.of(2L, 3L),
                Tuple2.of(2L, 4L),
                Tuple2.of(1L, 5L),
                Tuple2.of(1L, 6L),
                Tuple2.of(2L, 7L),
                Tuple2.of(2L, 8L)
        );

        //3、状态编程
        /**
         * 三种 State 来计算
         * 1、 ValueState 存储单一的值,一个 Key 对应一个 ValueState
         * 2、 ListState 存储数据列表,一个 Key 对应一个 ListState
         * 3、 MapState 存储数据集合,一个 Key 对应一个 MapState
         */
        //状态编程
        SingleOutputStreamOperator<Tuple2<Long, Float>> result = dataStreamSource.keyBy(0).flatMap(
                new MyAverageWithListState());

        //4、输出
        result.print();

        //5、提交执行
        executionEnvironment.execute("FlinkState_02_ListState");
    }

    /**
     * RichFlatMapFunction<IN, OUT>  两个泛型,一个输入,一个输出
     */
    private static class MyAverageWithListState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Float>> {
        //定义一个成员变量
        private ListState<Tuple2<Long, Long>> listState;

        //初始化的一些方法在这个里面
        @Override
        public void open(Configuration parameters) throws Exception {
            //下面其实就固定步骤: 先声明一个 XXStateDescr
            ListStateDescriptor<Tuple2<Long, Long>> listStateDsc = new ListStateDescriptor<>(
                    "list_State", Types.TUPLE(Types.LONG, Types.LONG));

            listState = getRuntimeContext().getListState(listStateDsc);
        }

        //这个里面实现具体的业务逻辑的。
        @Override
        public void flatMap(Tuple2<Long, Long> record, Collector<Tuple2<Long, Float>> collector) throws Exception {
            //如果当前的这个key的listState不存在,则初始化一个。
            Iterable<Tuple2<Long, Long>> dataIterator = listState.get();
            if (dataIterator == null){
                listState.addAll(Collections.EMPTY_LIST);
            }

            //将接受到数据放到listState中,然后再去做判断。
            listState.add(record);

            //获取数据集的状态
            ArrayList<Tuple2<Long, Long>> dataList = Lists.newArrayList(listState.get());

            //下面其实就是具体的业务逻辑了,也就是判断是否达到统计的条件
            if (dataList.size() == 4){
                //执行计算,求平均值
                //先统计一个和
                long total = 0;
                for (Tuple2<Long, Long> data : dataList) {
                    total += data.f1;
                }
                float avg = (float) total / 4 ;

                //输出出去
                collector.collect(Tuple2.of(record.f0,avg));
                //清空状态
                listState.clear();
            }

        }
    }
}

3.3 FlinkState案例之MapState

package com.aa.flinkjava.state;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.UUID;

/**
 * @Author AA
 * @Project bigdatapre
 * @Package com.aa.flinkjava.state
 * MapState 案例
 */
public class FlinkState_03_MapState {
    public static void main(String[] args) throws Exception {
        //1、环境准备。获取编程入口对象
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //executionEnvironment.setParallelism(2);

        //2、准备一些数据
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource = executionEnvironment.fromElements(
                Tuple2.of(1L, 1L),
                Tuple2.of(1L, 2L),
                Tuple2.of(2L, 3L),
                Tuple2.of(2L, 4L),
                Tuple2.of(1L, 5L),
                Tuple2.of(1L, 6L),
                Tuple2.of(2L, 7L),
                Tuple2.of(2L, 8L)
        );

        //3、状态编程
        /**
         * 三种 State 来计算
         * 1、 ValueState 存储单一的值,一个 Key 对应一个 ValueState
         * 2、 ListState 存储数据列表,一个 Key 对应一个 ListState
         * 3、 MapState 存储数据集合,一个 Key 对应一个 MapState
         */
        //状态编程
        SingleOutputStreamOperator<Tuple2<Long, Float>> result = dataStreamSource.keyBy(0).flatMap(
                new MyAverageWithMapState()
        );

        //4、输出
        result.print();

        //5、提交执行
        executionEnvironment.execute("FlinkState_03_MapState");
    }

    /**
     * RichFlatMapFunction<IN, OUT>  两个泛型,一个输入,一个输出
     */
    private static class MyAverageWithMapState extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Float>> {
        //定义一个成员变量
        private MapState<String, Long> mapState;

        //初始化的一些方法在这个里面
        @Override
        public void open(Configuration parameters) throws Exception {
            //下面其实就固定步骤: 先声明一个 XXStateDescr
            MapStateDescriptor<String, Long> mapStateDse = new MapStateDescriptor<>("map_State", String.class, Long.class);

            mapState = getRuntimeContext().getMapState(mapStateDse);
        }

        //这个里面实现具体的业务逻辑的。
        @Override
        public void flatMap(Tuple2<Long, Long> record, Collector<Tuple2<Long, Float>> collector) throws Exception {
            //记录状态数据。
            mapState.put(UUID.randomUUID().toString(), record.f1);

            //执行判断逻辑
            Iterable<Long> values = mapState.values();
            ArrayList<Long> arrayList = Lists.newArrayList(values);

            //下面其实就是具体的业务逻辑了
            if (arrayList.size() == 4){
                //执行计算,求平均值
                //先统计一个和
                long total = 0;
                for (Long data : arrayList) {
                    total += data;
                }
                float avg = (float) total / 4 ;

                //输出出去
                collector.collect(Tuple2.of(record.f0,avg));
                //清空状态
                arrayList.clear();
            }
        }
    }
}

3.4 FlinkState案例之ReducingState

package com.aa.flinkjava.state;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @Author AA
 * @Project bigdatapre
 * @Package com.aa.flinkjava.state
 * ReducingState 案例
 * 自定义累加求和
 */
public class FlinkState_04_ReducingState {
    public static void main(String[] args) throws Exception {
        //1、环境准备。获取编程入口对象
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //executionEnvironment.setParallelism(1);

        //2、准备一些数据
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource = executionEnvironment.fromElements(
                Tuple2.of(1L, 1L),
                Tuple2.of(1L, 2L),
                Tuple2.of(2L, 3L),
                Tuple2.of(2L, 4L),
                Tuple2.of(1L, 5L),
                Tuple2.of(1L, 6L),
                Tuple2.of(2L, 7L),
                Tuple2.of(2L, 8L)
        );

        //3、状态编程
        SingleOutputStreamOperator<Tuple2<Long, Long>> result = dataStreamSource.keyBy(0).flatMap(
                new MySumByReducingStateFunction()
        );

        //4、输出
        result.print();

        //5、提交执行
        executionEnvironment.execute("FlinkState_04_ReducingState");
    }

    /**
     * RichFlatMapFunction<IN, OUT>  两个泛型,一个输入,一个输出
     */
    private static class MySumByReducingStateFunction extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
        //定义一个成员变量
        private ReducingState<Long> reducingState;

        //初始化的一些方法在这个里面
        @Override
        public void open(Configuration parameters) throws Exception {
            ReducingStateDescriptor<Long> reducingStateDse = new ReducingStateDescriptor<Long>("reduce_state",
                    new ReduceFunction<Long>() {
                //初始化ReducingState的时候,定义了一个聚合函数
                        @Override
                        public Long reduce(Long value1, Long value2) throws Exception {
                            return value1 + value2;
                        }
                    },Long.class
            );
            reducingState = getRuntimeContext().getReducingState(reducingStateDse);
        }

        //这个里面实现具体的业务逻辑的。
        @Override
        public void flatMap(Tuple2<Long, Long> record, Collector<Tuple2<Long, Long>> collector) throws Exception {
            //将输入合并到 ReducingState 当中
            reducingState.add(record.f1);

            //输出
            collector.collect(Tuple2.of(record.f0,reducingState.get()));
        }
    }
}

3.5 FlinkState案例之AggregatingState

package com.aa.flinkjava.state;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @Author AA
 * @Project bigdatapre
 * @Package com.aa.flinkjava.state
 * AggregatingState案例
 * 输出每个key对应的value列表
 */
public class FlinkState_05_AggregatingState {
    public static void main(String[] args) throws Exception {
        //1、环境准备。获取编程入口对象
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //executionEnvironment.setParallelism(1);

        //2、准备一些数据
        DataStreamSource<Tuple2<Long, Long>> dataStreamSource = executionEnvironment.fromElements(
                Tuple2.of(1L, 1L),
                Tuple2.of(1L, 2L),
                Tuple2.of(2L, 3L),
                Tuple2.of(2L, 4L),
                Tuple2.of(1L, 5L),
                Tuple2.of(1L, 6L),
                Tuple2.of(2L, 7L),
                Tuple2.of(2L, 8L)
        );

        //3、状态编程
        SingleOutputStreamOperator<Tuple2<Long, String>> result = dataStreamSource.keyBy(0).flatMap(
                new MySumByAggregatingStateFunction()
        );

        //4、输出
        result.print();

        //5、提交执行
        executionEnvironment.execute("FlinkState_05_AggregatingState");
    }

    /**
     * RichFlatMapFunction<IN, OUT>  两个泛型,一个输入,一个输出
     */
    private static class MySumByAggregatingStateFunction extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, String>> {
        //定义一个成员变量
        private AggregatingState<Long,String> aggregatingState;

        //初始化的一些方法在这个里面
        @Override
        public void open(Configuration parameters) throws Exception {
            AggregatingStateDescriptor<Long, String, String> aggregatingStateDse = new AggregatingStateDescriptor<>("agg_state",
                    new AggregateFunction<Long, String, String>() {
                        //创建一个初始变量
                        @Override
                        public String createAccumulator() {
                            return "valueList:";
                        }

                        /**
                         * 进行 value的累加
                         * 例如传输过来的数据是 1 2 3
                         * 那么 结果的
                         * valueList: 1
                         * valueList: 1 2
                         * valueList: 1 2 3
                         * @param value   传递过来的值
                         * @param accumulator  字符串拼接结果
                         * @return
                         */
                        @Override
                        public String add(Long value, String accumulator) {
                            if (accumulator.equals("valueList:")) {
                                return accumulator + value;
                            } else {
                                return accumulator + "," + value;
                            }
                        }

                        /**
                         * 获取最终的结果
                         * @param accumulator
                         * @return
                         */
                        @Override
                        public String getResult(String accumulator) {
                            return accumulator;
                        }

                        /**
                         * 分区合并
                         * @param a
                         * @param b
                         * @return
                         */
                        @Override
                        public String merge(String a, String b) {
                            return a + "," + b;
                        }
                    }, String.class
            );
            aggregatingState = getRuntimeContext().getAggregatingState(aggregatingStateDse);
        }

        //这个里面实现具体的业务逻辑的。
        @Override
        public void flatMap(Tuple2<Long, Long> record, Collector<Tuple2<Long, String>> collector) throws Exception {
            //将输入合并到 ReducingState 当中
            aggregatingState.add(record.f1);

            //输出
            collector.collect(Tuple2.of(record.f0,aggregatingState.get()));
        }
    }
}

四、Flink Operator State 企业案例实战

4.1 OperatorState自定义输出案例

package com.aa.flinkjava.state;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

/**
 * @Author AA
 * @Project bigdatapre
 * @Package com.aa.flinkjava.state
 *
 * OperatorState 案例
 * 自定义输出
 */
public class FlinkState_01_OperatorState {
    public static void main(String[] args) throws Exception {
        //1、环境准备。获取编程入口对象
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //executionEnvironment.setParallelism(2);

        //2、准备一些数据
        DataStreamSource<Tuple2<String, Integer>> dataStreamSource = executionEnvironment.fromElements(
                Tuple2.of("zhangsan", 3),
                Tuple2.of("lisi", 4),
                Tuple2.of("wangwu", 5),
                Tuple2.of("zhaoliu", 6),
                Tuple2.of("sunqi", 7),
                Tuple2.of("zhouba", 8),
                Tuple2.of("wujiu", 9),
                Tuple2.of("zhengshi", 10)
        );

        //4、输出
        dataStreamSource.addSink(new MyPrintSink(2)).setParallelism(1);

        //5、提交执行
        executionEnvironment.execute("FlinkState_01_OperatorState");
    }

    private static class MyPrintSink implements SinkFunction<Tuple2<String,Integer>>, CheckpointedFunction{

        private int recordNumber;
        private List<Tuple2<String,Integer>> bufferElements;
        private ListState<Tuple2<String,Integer>> listState;

        public MyPrintSink(int recordNumber) {
            this.recordNumber = recordNumber;
            this.bufferElements = new ArrayList<>();
        }

        /**
         * 对于数据的结合,拍摄state的快照
         * @param functionSnapshotContext
         * @throws Exception
         */
        @Override
        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            listState.clear();
            for (Tuple2<String, Integer> element : bufferElements) {
                listState.add(element);
            }
        }

        /**
         * 初始化 state
         * @param context
         * @throws Exception
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<Tuple2<String, Integer>> listStateDescriptor = new ListStateDescriptor<>("MyPrintSink",
                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
                    })
            );

            //初始化 listState
            ListState<Tuple2<String, Integer>> listState = context.getOperatorStateStore().getListState(listStateDescriptor);

            //状态恢复
            if (context.isRestored()){
                for (Tuple2<String, Integer> element : listState.get()) {
                    bufferElements.add(element);
                }
            }
        }

        /**
         * 帮数据给添加到 bufferElements 的数据集合中
         * @param value
         * @throws Exception
         */
        @Override
        public void invoke(Tuple2<String, Integer> value) throws Exception {
            bufferElements.add(value);

            if (bufferElements.size() == recordNumber){
                System.out.println("输出格式为: " + bufferElements);
                bufferElements.clear();
            }
        }
    }
}



声明:
        文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。


By luoyepiaoxue2014

B站: https://space.bilibili.com/1523287361 点击打开链接
微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/60916.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

自制肥鲨HDO2电源升压延长线

自制肥鲨HDO2电源升压延长线1. 问题源由2. 解决方案3. 材料准备4. 最终延长线产出4.1 裸照4.2 成品5. 参考资料1. 问题源由 之前我们介绍了【自制肥鲨HDO2电源降压延长线&#xff0c;支持3S~6S动力电池】&#xff0c;主要解决使用动力电池给眼镜供电的问题。 但是马上有兄弟反…

SpringMVC执行流程

SpringMVC的流程 整个过程开始于客户端发出的一个HTTP请求&#xff0c;Web应用服务器接收到这个请求。如果匹配DispatcherServlet的请求映射路径&#xff0c;则Web容器将该请求转交给DispatcherServlet处理。DispatcherServlet接收到这个请求后&#xff0c;将根据请求的信息&a…

Linux环境下Vivado和HLS功能测试

一. 简介 针对已经完成的Vivado在Linux下的安装与运行&#xff0c;本文主要通过一个LED灯闪烁的案例对Vivado和HLS在Linux操作系统下的运行流程进行介绍&#xff0c;并对已安装软件功能进行一个简单的测试。 HLS将C代码的编译综合为Verilog或VHDL代码&#xff0c;本文对HLS生成…

[附源码]计算机毕业设计JAVA医院门诊信息管理系统

[附源码]计算机毕业设计JAVA医院门诊信息管理系统 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM my…

MySQL视图

MySQL视图 VIEW&#xff08;视图&#xff09; 概念 可以被当作是虚拟表或存储查询 视图跟表格的不同是&#xff0c;表格中有实际储存资料&#xff0c;而视图是建立在表格之上的一个架构&#xff0c;它本身并不实际储存资料。 临时表在用户退出或同数据库的连接断开后就自动…

[LeetCode解题报告] 1610. 可见点的最大数目

[LeetCode解题报告] 1610. 可见点的最大数目一、 题目1. 题目描述2. 原题链接二、 解题报告1. 思路分析2. 复杂度分析3. 代码实现三、 本题小结四、 参考链接一、 题目 1. 题目描述 可见点的最大数目 难度&#xff1a;2147 给你一个点数组 points 和一个表示角度的整数 ang…

01-25-javajvm-JVM和Java体系架构

01-java-JVM和Java体系架构: 1、jvm底层&#xff0c;对性能调优&#xff0c;java是动态内存分配 2、java的跨平台性&#xff1a; Java虚拟机关心“字节码”文件&#xff0c;Java虚拟机和语言关性&#xff0c;只要其他编程语言的编译结果满足并包含Java虚拟机的内部指令集、符…

ch55xduino

1.把wch的ch55x系列单片机&#xff0c;移植到Arduino&#xff0c;制成所谓的“ch55xduino”&#xff1a;GitHub - DeqingSun/ch55xduino: An Arduino-like programming API for the CH55X 2.ch55x系列单片机比较&#xff08;立创/云汉2022年12月报价&#xff09; (1)CH552T:2…

ECMAScript新特性

代码 ECMAScript概述 ECMAScript 是脚本语言的标准化规范&#xff0c;也就是语言的语法。比如&#xff1a;怎样定义变量、怎样定义函数和逻辑运算等等。 那么ECMAScript 和 JavaScript 是何关系&#xff1f; JavaScript 是ECMAScript 的扩展语言&#xff0c;JavaScript实现了…

项目管理逻辑:日志\周报\月报, 一直要求写, 有用吗?

目录 1.公司管控项目: 2.什么是项目的生命周期? 3.项目管控举例 3.1装修项目阶段划分 3.2研发项目 4.控制项目的核心 1.公司管控项目: 写周报,日报,项目问题照样失控, 其实本质上的问题就是 我们没有如何设置好项目的阶段和项目的里程碑. 项目管理的五个阶段 2.什么是…

Golang基本命令操作

在前两期【初探Golang语言之环境搭建】 和 【Golang语法总结与学习】&#xff0c;对环境搭建和基本语法有介绍&#xff0c;本篇对常用的命令进行学习和梳理&#xff0c;记录下来&#xff0c;方便备查。 一、Go 语言基本命令 // 编译&#xff0c;生成exe文件 go build // 移除…

家庭用户无线上网案例(AC通过三层口对AP进行管理)

组网需求 为一个家庭用户使用的网络架构。该家庭消费用户的上网流量大多是低速流量&#xff0c;例如浏览网页、玩游戏、看视频等。家庭成员使用的无线终端主要为手机、PC、电视机等。终端接入的数量正常情况下在10个以内&#xff0c;偶尔有家庭聚会等特殊情况&#xff0c;终端接…

微服务框架 SpringCloud微服务架构 11 自定义镜像 11.1 镜像结构

微服务框架 【SpringCloudRabbitMQDockerRedis搜索分布式&#xff0c;系统详解springcloud微服务技术栈课程|黑马程序员Java微服务】 SpringCloud微服务架构 文章目录微服务框架SpringCloud微服务架构11 自定义镜像11.1 镜像结构11.1.1 镜像结构11.1.2 总结11 自定义镜像 11…

【人工智能与机器学习】——线性回归、逻辑回归与分类评价指标(学习笔记)

&#x1f4d6; 前言&#xff1a;线性回归&#xff08;Lincar Regression&#xff09;模型是最简单的线性模型之一&#xff0c;简而言之就像一元一次函数&#xff0c;是所有机器学习初学者的起点。而逻辑回归&#xff08;Logistic Regression&#xff09;则稍显复杂&#xff0c;…

微服务架构

单体架构 优点&#xff1a; 1&#xff1a;部署简单: 由于是完整的结构体&#xff0c;可以直接部署在一个服务器上即可。 2&#xff1a;技术单一: 项目不需要复杂的技术栈&#xff0c;往往一套熟悉的技术栈就可以完成开发。 3&#xff1a;用人成本低: 单个程序员可以完成业务接口…

k8s之Pod控制器详解

文章目录一、Pod控制器介绍1、什么是Pod控制器1.2、ReplicaSet(RS)1.3、Deployment(Deploy)1.3.1、重建更新1.3.2、滚动更新1.3.2、版本回退1.3.3、金丝雀发布1.4、Horizontal Pod Autoscaler(HPA)1.5、DaemonSet(DS)1.6、Job1.7、CronJob(CJ)一、Pod控制器介绍 Pod是kubernet…

c语言结构体看这篇文章就够啦(详细介绍结构体)

前言&#xff1a; c语言两大重要点&#xff0c;一个是指针&#xff0c;另一个就是结构体啦&#xff0c;这篇文章我将全面的介绍一下结构体&#xff0c;和他的使用&#xff0c;相信大家看完这篇以后定能对结构体有个深入的理解&#xff0c;并且会正确的使用它。 &#x1f49e; &…

智工教育:注册计量师一级和二级的科目一样吗?

注册计量师有二级、一级之分&#xff0c;其中二级考试所涉及的科目为《计量法律法规及综合知识》《计量专业实务与案例分析》&#xff0c;必须要在连续2年内考过&#xff0c;否则就要重新报考。 而一级的考试科目&#xff0c;则分别是《计量法律法规及综合知识》、《测量数据处…

H2/H∞半车悬架控制仿真分析

目录 前言 1.悬架模型 2.LMI求解 3.simulink仿真分析 3.1结论 前言 对于H2/H∞控制的鲁棒项相比不用多说&#xff0c;之前也写过两篇关于1/4车的H2/H∞控制文章&#xff0c;链接如下&#xff1a; 基于LMI的车辆主动悬架控制_Mr. 邹的博客-CSDN博客 基于MATLAB/Simulink的…

01-redis篇 两种数据储存持久化方式

目录 1. 背景: 2. 两种数据持久化方式 2.1 RDB存储机制 -> (1) 配置docker版redis -> (2) rdb默认开启, 配置如下 redis.conf 打开 ->(3)安全退出的模式 ->(4) save与bgsave 2.2 AOF持久化机制 ->(1) 修改redis.conf 开启aof储存机制 ->(2) 为什么要…