flink重温笔记(十):Flink 高级 API 开发——flink 四大基石之 State(涉及Checkpoint)

news2024/11/16 9:33:24

Flink学习笔记

前言:今天是学习 flink 的第 10 天啦!学习了 flink 四大基石之 State (状态),主要是解决大数据领域增量计算的效果,能够保存已经计算过的结果数据状态!重点学习了 state 的类型划分和应用,以及 TTL 原理和应用,即数据状态也会过期和定期清除的问题,以及广播流数据的企业应用场景,结合自己实验猜想和代码实践,总结了很多自己的理解和想法,希望和大家多多交流!

Tips:广州回南天色佳,学习 state 意更浓。心随知识飘然去,智慧之舟破浪中。越来越有状态,明天也要继续努力!


文章目录

  • Flink学习笔记
    • 三、Flink 高级 API 开发
      • 3. State
        • 3.1 State 应用场景
        • 3.2 State 类型划分
          • 3.2.1 Keyed State 键控状态
            • (1) 特点
            • (2) 保存的数据结构
            • (3) 案例演示
          • 3.2.1 Operate State 算子状态
            • (1) 特点
            • (2) 保存的数据结构
            • (3) 案例演示
        • 3.3 State TTL 状态有效期
          • 3.3.1 功能用法
            • (1) TTL 的更新策略
            • (2) 状态数据过期但未被清除时
            • (3) 过期数据的清除
          • 3.3.2 案例演示
        • 3.4 Broadcast State
          • 3.4.1 应用场景
          • 3.4.2 注意事项
          • 3.4.3 案例演示
          • 3.4.4 BroadcastState 执行思路梳理

三、Flink 高级 API 开发

3. State

简介:State(状态)是基于 Checkpoint(检查点)来完成状态持久化,在 Checkpoint 之前,State 是在内存中(变量),在 Checkpoint 之后,State 被序列化永久保存,支持存储方式:File,HDFS,S3等。

3.1 State 应用场景
  • (1)去重
  • (2)窗口计算
  • (3)机器学习/深度学习
  • (4)访问历史数据

3.2 State 类型划分
  • 基本类型划分
    • Keyed State(键控状态)
    • Operate State(算子状态)
  • 存在方式划分
    • raw State (原始状态):原始状态
    • managed State(托管状态):Flink 自动管理的 State,实际生产推荐使用
  • 原始状态和托管状态区别:
状态管理方式数据结构使用场景
Managed StateFlink Runtime 管理自动存储,自动恢复内存管理可自动优化Value,List,Map…大多数情况下均可使用
Raw State需要用户自己管理,需要自己序列化字节数组自定义Operator时使用
3.2.1 Keyed State 键控状态
(1) 特点
  • 1- 只能用于 keyby 后的数据流
  • 2- 一个 key 只能属于 一个key State
(2) 保存的数据结构
  • Keyed State 通过 RuntimeContext 访问,这需要 Operator 是一个RichFunction。
  • 1- ValueState:单值状态
  • 2- ListState:列表状态
  • 3- ReducingState:传入reduceFunction,单一状态值
  • 4- MapState<UK, UV>:状态值为 map
(3) 案例演示

例子:词频统计,不要用 sum,而是用 reduce,然后 ValueState

package cn.itcast.day10.state;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-03-03 16:01:44
 * @description TODO:演示 keyedState 的使用
 */
public class KeyedStateDemo {
    public static void main(String[] args) throws Exception {
        //todo 1)创建flink流处理的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //todo 2)开启checkpoint
        env.enableCheckpointing(5000);

        //todo 3)构建数据源
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        //todo 4)单词拆分
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndNum = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] data = line.split(",");
                return Tuple2.of(data[0], Integer.parseInt(data[1]));
            }
        });

        //todo 5)分流操作
        KeyedStream<Tuple2<String, Integer>, String> keyedDataStream = wordAndNum.keyBy(t -> t.f0);

        //todo 6) 聚合操作(自定义state方式实现)
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduceState = keyedDataStream.reduce(new RichReduceFunction<Tuple2<String, Integer>>() {
            // todo 6.1 定义 state 对象
            private ValueState<Tuple2<String, Integer>> valueState = null;

            // 初始化资源
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // todo 6.2 实例化 state 对象
                valueState = getRuntimeContext().getState(
                        new ValueStateDescriptor<Tuple2<String, Integer>>(
                                "reduceState",
                                TypeInformation.of(
                                        new TypeHint<Tuple2<String, Integer>>() {
                                        })));
            }

            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                // state 存储的历史数据
                // todo 6.3 获取state 对象
                Tuple2<String, Integer> result = valueState.value();
                if (result == null) {
                    result = Tuple2.of(value1.f0, value1.f1);
                }
                Tuple2<String, Integer> resultSum = Tuple2.of(result.f0, result.f1 + value2.f1);
                // 之前没有重写状态:Tuple2.of(value1.f0, value1.f1 + value2.f1);
                // 因为我们这里的 value1 已经从 state 中获取到 result了,所以重写状态的方法是,就用result!
                // todo 6.4 更新 state 对象
                valueState.update(resultSum);
                return resultSum;
            }

            // 释放资源
            @Override
            public void close() throws Exception {
                super.close();
                // 获取 state 的数据
                Tuple2<String, Integer> value = valueState.value();
                System.out.println("=======释放资源的时候打印 state 数据========");
                System.out.println(value);
            }
        });
        //todo 6)打印测试
        reduceState.print();
        //todo 7)运行
        env.execute();
    }
}

结果:

输入:
hadoop,1
hadoop,2

输出:
8> (hadoop,1)
8> (hadoop,3)

总结:

  • 1- state 类似于一个数据库,可以从中获取之前计算过的数据
  • 2- 定义 state 对象,初始值为 null
  • 3- 实例化 state 对象,getRuntimeContext().getState(new ValueStateDescriptor())
  • 4- 获取 state 中的数据:状态.value()
  • 5- 更新 state 中的数据:状态.update(数据流)

3.2.1 Operate State 算子状态
(1) 特点
  • 1- 一个算子状态仅与一个算子实例绑定
  • 2- 算子状态可以用于所有算子,但常见是 Source
(2) 保存的数据结构
  • Operator State 需要自己实现 CheckpointedFunction 或 ListCheckpointed 接口
  • 1- ListState
  • 2- BroadcastState<K,V>
(3) 案例演示

例子:使用 OperatorState 进行演示基于类似于 kafka 消费数据的功能

package cn.itcast.day10.state;

/**
 * @author lql
 * @time 2024-03-03 17:21:50
 * @description TODO:使用OperatorState进行演示基于类似于kafka消费数据的功能
 */

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * 实现步骤:
 * 1)初始化flink流式程序的运行环境
 * 2)设置并行度为1
 * 3)启动checkpoint
 * 4)接入数据源
 * 5)打印测试
 * 6)启动作业
 */
public class OperatorStateDemo {
    public static void main(String[] args) throws Exception {
        // todo 1) 初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // todo 2) 设置并行度为 1
        env.setParallelism(1);

        // todo 3) 启动checkpoint机制
        env.enableCheckpointing(4000);

        // todo 4) 接入数据源
        DataStreamSource<Integer> source  = env.addSource(new MySourceWithState());

        SingleOutputStreamOperator<String> result = source.map(new MapFunction<Integer, String>() {
            @Override
            public String map(Integer integer) throws Exception {
                return integer.toString();
            }
        });

        //TODO 5)打印测试
        result.printToErr();

        //TODO 6)启动作业
        env.execute();
    }

    private static class MySourceWithState extends RichSourceFunction<Integer> implements CheckpointedFunction {
        //定义成员变量是否循环生成数据
        private Boolean isRunning = true;
        private Integer currentCounter = 0;

        //定义ListState保存结果数据。保存offset的累加值
        private ListState<Integer> listState = null;

        /**
         * 将state中的数据持久化存储到文件中(每4秒钟进行一次快照,将state数据存储到hdfs)
         * @param functionSnapshotContext
         * @throws Exception
         */
        @Override
        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            System.out.println("调用snapshotState方法。。。。。。。。");

            // 清除历史状态存储的历史数据,this引用对象的成员变量
            this.listState.clear();

            //将最新的累加值添加到状态中
            this.listState.add(this.currentCounter);
        }

        /**
         * 初始化 state 对象
         * @param context
         * @throws Exception
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("调用initializeState方法。。。。。。。。");

            //初始化一个listState
            OperatorStateStore stateStore  = context.getOperatorStateStore();

            listState = stateStore.getListState(
                    new ListStateDescriptor<>(
                            "operator-states",
                            TypeInformation.of(new TypeHint<Integer>() {}))
            );

            // 获取历史数据
            for (Integer counter  : this.listState.get()) {
                //将历史存储的累加值取出来赋值给当前累加值变量
                this.currentCounter = counter;
            }

            //清除状态中存储的历史数据
            this.listState.clear();
        }

        /**
         * 生产数据
         * @param sourceContext
         * @throws Exception
         */
        @Override
        public void run(SourceContext<Integer> sourceContext) throws Exception {
            while (isRunning){
                currentCounter ++;
                sourceContext.collect(currentCounter);

                TimeUnit.SECONDS.sleep(1);
                if (this.currentCounter == 10){
                    System.out.println("手动抛出异常"+(1/0));
                }
            }
        }

        /**
         * 取消生产数据
         */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}

结果:

调用initializeState方法。。。。。。。。
1
2
3
4
调用snapshotState方法。。。。。。。。
5
6
7
8
调用snapshotState方法。。。。。。。。
9
10
调用initializeState方法。。。。。。。。
9
调用snapshotState方法。。。。。。。。
10
    
<==10这里出现异常了,所以保存state中的数据是9,
    初始化方法后恢复 9+1 数据,然后 10 也保存到state中了==>
    
调用initializeState方法。。。。。。。。
10
调用snapshotState方法。。。。。。。。
调用initializeState方法。。。。。。。。
11
12
13
14
调用snapshotState方法。。。。。。。。
15
16
17

总结:

  • 1- 定义数据源要继承 RichSourceFunction 父类,实现 CheckpointedFunction 接口
  • 2- 初始化 listState:getOperatorStateStore.getListState(new ListStateDescriptor<>)
  • 3- 获取历史数据:因为是listState,所以用循环 get()
  • 4- 历史值赋予当前值,清空历史数据 clear()
  • 5- 检查点之后,恢复数据就是 [ 历史数据 + 1 ]

3.3 State TTL 状态有效期

举例子:更新策略着眼于是更新日期是在哪个时候,

​ 而这里设置停留时间 .newBuilder(Time.seconds(1)) 是指状态保存多长时间,

​ 时间一过状态数据就标记过期(设置时间要比 checkpoint 时间长,才能保证 checkpoint 顺利持久化),

清除策略着眼于过期数据清理是在哪个时候

应用场景:使用 flink 进行实时计算中,会遇到一些状态数不断累积,导致状态量越来越大的情形


3.3.1 功能用法
(1) TTL 的更新策略
  • 只在创建和写入时更新:.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

  • 读取和写入时也会更新:.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)

(2) 状态数据过期但未被清除时
  • 不返回过期数据:.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  • 返回未被清除的过期数据:.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
(3) 过期数据的清除
  • 关闭后台自动清理过期数据:disableCleanupInBackground()

  • 全量快照时清理:cleanupFullSnapshot()

    • 这种策略在 RocksDBStateBackend 的增量 checkpoint 模式下无效。
  • 增量快照时清理:.cleanupIncrementally(10, true)

    • 第一个是每次清理时检查状态的条目数,在每个状态访问时触发。

      第二个参数表示是否在处理每条记录时触发清理。

  • 后台自动清理(RocksDB state backend 存储):.cleanupInRocksdbCompactFilter(1000)

    • 默认后台清理策略会每处理 1000 条数据进行一次,

    • 表示在 RocksDB 的compaction过程中,每删除1000个键值对,就会执行一次TTL过期的键值对的清理。

    • RocksDB 的 compaction是一个过程,它会合并多个小的数据库文件(SSTables)成一个大的文件


3.3.2 案例演示

例子1:10s 读取一行数据,checkpoint 60s,state 时间 5 s

package cn.itcast.day10.state;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.concurrent.TimeUnit;

/**
 * @author lql
 * @time 2024-03-04 18:53:52
 * @description TODO
 */
public class StateWordCount {
    public static void main(String[] args) throws Exception {
        // todo 1) 使用工具类将传入的参数解析为对象
        final ParameterTool parameters = ParameterTool.fromArgs(args);

        // todo 2) 初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        // todo 3) 将传入的参数解析成参数注册到作业中
        env.getConfig().setGlobalJobParameters(parameters);

        // todo 4) 开启 checkpoint,一致性语义
        env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);

        // todo 5) 接入数据源
        DataStreamSource<String> lines = env.addSource(new SourceFunctionFile());

        lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] data = s.split(" ");
                for (String item : data) {
                    collector.collect(new Tuple2<>(item, 1));
                }
            }
        }).keyBy(t -> t.f0)
                .flatMap(new WordCountFlatMap()).printToErr();

        // todo 6)启动作业
        env.execute();
    }

    private static class SourceFunctionFile extends RichSourceFunction<String> {

        // 定义是否继续生成数据标记
        private Boolean isRunning = true;

        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
            BufferedReader bufferedReader = new BufferedReader(new FileReader("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\wordcount.txt"));

            while (isRunning) {
                String line = bufferedReader.readLine();
                if (StringUtils.isBlank(line)) {
                    continue;
                }
                sourceContext.collect(line);
                TimeUnit.SECONDS.sleep(10);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    // 自定义聚合
    private static class WordCountFlatMap extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

        // 定义状态
        private ValueState<Tuple2<String, Integer>> valueState = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            //创建ValueStateDescriptor
            ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("StateWordCount",
                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
                    }));

            //配置stateTTL
            StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(Time.seconds(5))  // 存活时间
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 创建或更新的时候修改时间
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 永不返回过期的数据
                    .cleanupFullSnapshot() // 全量快照时进行清理
                    .build();

            // 激活 stateTTL
            valueStateDescriptor.enableTimeToLive(ttlConfig);

            // 实例化 valueState 对象
            valueState = getRuntimeContext().getState(valueStateDescriptor);

        }

        @Override
        public void close() throws Exception {
            super.close();
        }

        @Override
        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> collector) throws Exception {

            // 获取状态数据
            Tuple2<String, Integer> currentState = valueState.value();

            // 初始化 valueState 数据
            if (currentState == null) {
                currentState = new Tuple2<>(value.f0,0);
            }
            // 累加单词的次数
            Tuple2<String, Integer> newState = new Tuple2<>(currentState.f0, currentState.f1 + value.f1);

            // 更新valueState
            valueState.update(newState);

            // 返回累加后的结果
            collector.collect(newState);
        }
    }
}

数据源:

Total time BUILD SUCCESS
Final Memory Finished at
Total time BUILD SUCCESS
Final Memory Finished at
Total time BUILD SUCCESS
Final Memory Finished at
BUILD SUCCESS
BUILD SUCCESS
BUILD SUCCESS
BUILD SUCCESS
BUILD SUCCESS

结果:没有累加历史值

======110s=====
(Total,1)
(time,1)
(BUILD,1)
(SUCCESS,1)

======210s=====
(Final,1)
(Memory,1)
(Finished,1)
(at,1)
    
======310s=====
(Total,1)
(time,1)
(BUILD,1)
(SUCCESS,1)

例子2:10s 读取一行数据,checkpoint 5s,state 时间 20 s 或者 21s

结果:触发累加历史值

======110s=====
(Total,1)
(time,1)
(BUILD,1)
(SUCCESS,1)

======210s=====
(Final,1)
(Memory,1)
(Finished,1)
(at,1)
    
======310s=====
(Total,2)
(time,2)
(BUILD,2)
(SUCCESS,2)

例子3:10s 读取一行数据,checkpoint 5s,state 时间 19s

结果:没有累加历史值


  • 总结1: 从例子1-3,因为第1个Total距第2个Total 20s,故 state 设置要尽可能大20s,设置太小来不及遇见第二个就过期了。


例子4:10s 读取一行数据,checkpoint 10s,state 时间 20 s

结果:触发累加历史值


例子5:10s 读取一行数据,checkpoint 60s,state 时间 20s

结果:触发累加历史值


例子6:10s 读取一行数据,checkpoint 20s,state 时间 20 s

结果:没有累加历史值


  • 总结2:从例子 4-6,checkpoint 比 state 建议小很多或大很多,不然 [10-20] 有些不能触发,[20-60] 有些能触发。


例子7:10s 读取一行数据,checkpoint 19s,state 时间 20s(之前不能触发)

(此时设置.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) // 返回过期但未被清除的数据)

结果:触发累加历史值


例子8:10s 读取一行数据,checkpoint 60s,state 时间 5 s(之前state小于20s不能触发)

结果:触发累加历史值


例子9:10s 读取一行数据,checkpoint 60s,state 时间 20s(之前可以触发)

(此时设置.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) // 返回过期但未被清除的数据)

结果:触发累加历史值


  • 总结3:从例子7-9,设置 ReturnExpiredIfNotCleanedUp 之前可以触发的不可以触发的,都可以触发了!


总结:

  • 1- state 设置要尽可能大于相同数据间隔s,设置太小来不及遇见第二个就过期了。
  • 2- checkpoint 比 state 建议小很多或大很多
  • 3- 设置 ReturnExpiredIfNotCleanedUp 可以避免不能触发
  • 4- BufferedReader 去 new 一个 FileReader 可以读取文件
  • 5- 激活 stateTTL:valueStateDescriptor.enableTimeToLive(ttlConfig)

3.4 Broadcast State
3.4.1 应用场景
  • 1-动态更新计算规则: 如事件流需要根据最新规则进行计算,可将规则作为广播状态广播到下游Task。
  • 2-实时增加额外字段: 如事件流需要实时增加用户基础信息,可将基础信息作为广播状态到下游Task。

3.4.2 注意事项
  • Broadcast State 是 Map 类型,即 K-V 类型

  • Broadcast State 只有在广播的一侧,即在重写方法:processBroadcastElement 方法中可修改,另一个方法只读

  • Broadcast State 在 checkpoint 时,每个 Task 都会 checkpoint(持久化)广播状态。

  • Broadcast State 在运行时保存在内存中,(flink 1.13)还不能保存在 Rocked State Backend


3.4.3 案例演示

例子:公司有10个广告位, 其广告的内容(描述和图片)会经常变动(广告到期,更换广告等)

package cn.itcast.day10.state;

/**
 * @author lql
 * @time 2024-03-05 13:54:54
 * @description TODO
 */

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * 广播状态流演示
 * 需求:公司有10个广告位, 其广告的内容(描述和图片)会经常变动(广告到期,更换广告等)
 * 实现:
 * 1)通过socket输入广告id(事件流)
 * 2)关联出来广告的信息打印出来,就是广告发生改变的时候,能够感知到(规则流)
 */
public class BroadcastStateDemo {
    public static void main(String[] args) throws Exception {
        //todo 1)初始化flink流处理的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //todo 2) 设置checkpoint周期运转
        env.enableCheckpointing(5000L);

        //todo 3) 构建数据流 (我输入的是字符串数字,需要转化为整数类型)
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
        SingleOutputStreamOperator<Integer> adIdDataStream = lines.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value);
            }
        });

        // todo 4) 构建规则流(广告流)
        DataStreamSource<Map<Integer, Tuple2<String, String>>> adSourceStream = env.addSource(new MySourceForBroadcastFunction());
        adSourceStream.print("最新的广告信息>>>");

        // todo 5) 将规则流(广告流)转化为广播流==> 这里和之前的广播是不一样的!
        // todo 5.1 定义规则流(广告流)的描述器
        MapStateDescriptor<Integer, Tuple2<String, String>> mapStateDescriptor = new MapStateDescriptor<Integer, Tuple2<String, String>>(
                "broadcaststate",
                TypeInformation.of(new TypeHint<Integer>() {}),
                TypeInformation.of(new TypeHint<Tuple2<String,String>>(){})
        );
        // todo 5.2 运用描述器将(广告流)转化为广播流
        BroadcastStream<Map<Integer, Tuple2<String, String>>> broadcastStream = adSourceStream.broadcast(mapStateDescriptor);

        // todo 6) 将数据流和规则流合并在一起
        BroadcastConnectedStream<Integer, Map<Integer, Tuple2<String, String>>> connectedStream = adIdDataStream.connect(broadcastStream);

        // todo 7) 对关联后的数据做拉宽操作
        SingleOutputStreamOperator<Tuple2<String, String>> result = connectedStream.process(new MyBroadcastProcessFunction());

        // todo 8) 打印结果数据
        result.printToErr("拉宽后的结果>>>");

        //todo 9)启动作业
        env.execute();
    }

    /**
     * 自定义规则数据,注意 返回类型是 Map<K,V>
     */
    private static class MySourceForBroadcastFunction implements SourceFunction<Map<Integer, Tuple2<String,String>>> {

        private final Random random = new Random();
        private final List<Tuple2<String, String>> ads = Arrays.asList(
                Tuple2.of("baidu", "搜索引擎"),
                Tuple2.of("google", "科技大牛"),
                Tuple2.of("aws", "全球领先的云平台"),
                Tuple2.of("aliyun", "全球领先的云平台"),
                Tuple2.of("腾讯", "氪金使我变强"),
                Tuple2.of("阿里巴巴", "电商龙头"),
                Tuple2.of("字节跳动", "靠算法出名"),
                Tuple2.of("美团", "黄色小公司"),
                Tuple2.of("饿了么", "蓝色小公司"),
                Tuple2.of("瑞幸咖啡", "就是好喝")
        );
        private boolean isRun = true;

        @Override
        public void run(SourceContext<Map<Integer, Tuple2<String, String>>> sourceContext) throws Exception {
            while (isRun){
                // 定义一个 HashMap,用来存储键值对
                HashMap<Integer, Tuple2<String,String>> map = new HashMap<>();
                int keyCounter = 0;
                for (int i = 0; i < ads.size(); i++) {
                    keyCounter++;
                    map.put(keyCounter,ads.get(random.nextInt(ads.size())));
                }
                sourceContext.collect(map);
                TimeUnit.SECONDS.sleep(5L);
            }
        }

        @Override
        public void cancel() {
            isRun = false;
        }
    }

    private static class MyBroadcastProcessFunction extends BroadcastProcessFunction<Integer,Map<Integer, Tuple2<String, String>>, Tuple2<String, String>> {

        //定义state的描述器
        MapStateDescriptor<Integer, Tuple2<String, String>> mapStateDescriptor = new MapStateDescriptor<Integer, Tuple2<String, String>>(
                "broadcaststate",
                TypeInformation.of(new TypeHint<Integer>() {}),
                TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})
        );

        /**
         * 这个方法只读,用来拉宽操作
         * @param integer
         * @param readOnlyContext
         * @param collector
         * @throws Exception
         */
        @Override
        public void processElement(Integer integer, ReadOnlyContext readOnlyContext, Collector<Tuple2<String, String>> collector) throws Exception {
            //只读操作,意味着只能读取数据,不能修改数据,根据广告id获取广告信息
            ReadOnlyBroadcastState<Integer, Tuple2<String, String>> broadcastState = readOnlyContext.getBroadcastState(mapStateDescriptor);
            //根据广告id获取广告信息
            Tuple2<String, String> tuple2 = broadcastState.get(integer);
            //判断广告信息是否关联成功
            if(tuple2 != null) {
                collector.collect(tuple2);
            }
        }

        /**
         * 可写的,用来更新state的数据
         * @param integerTuple2Map
         * @param context
         * @param collector
         * @throws Exception
         */
        @Override
        public void processBroadcastElement(Map<Integer, Tuple2<String, String>> integerTuple2Map, Context context, Collector<Tuple2<String, String>> collector) throws Exception {
            //先读取state数据
            BroadcastState<Integer, Tuple2<String, String>> broadcastState = context.getBroadcastState(mapStateDescriptor);
            // 删除历史状态数据
            broadcastState.clear();
            //将最新获取到的广告信息进行广播操作
            broadcastState.putAll(integerTuple2Map);
        }
    }
}

结果:

最新的广告信息>>>:7> {1=(aws,全球领先的云平台), 2=(aliyun,全球领先的云平台), 3=(阿里巴巴,电商龙头), 4=(aws,全球领先的云平台), 5=(瑞幸咖啡,就是好喝), 6=(瑞幸咖啡,就是好喝), 7=(美团,黄色小公司), 8=(aws,全球领先的云平台), 9=(腾讯,氪金使我变强), 10=(字节跳动,靠算法出名)}

最新的广告信息>>>:8> {1=(美团,黄色小公司), 2=(饿了么,蓝色小公司), 3=(aws,全球领先的云平台), 4=(baidu,搜索引擎), 5=(aws,全球领先的云平台), 6=(baidu,搜索引擎), 7=(美团,黄色小公司), 8=(字节跳动,靠算法出名), 9=(瑞幸咖啡,就是好喝), 10=(腾讯,氪金使我变强)}

======我在终端输入了 3,刚好对应上面一条信息的 3 位置========
拉宽后的结果>>>:4> (aws,全球领先的云平台)

最新的广告信息>>>:1> {1=(饿了么,蓝色小公司), 2=(aliyun,全球领先的云平台), 3=(google,科技大牛), 4=(瑞幸咖啡,就是好喝), 5=(美团,黄色小公司), 6=(baidu,搜索引擎), 7=(google,科技大牛), 8=(google,科技大牛), 9=(google,科技大牛), 10=(腾讯,氪金使我变强)}

最新的广告信息>>>:2> {1=(google,科技大牛), 2=(饿了么,蓝色小公司), 3=(字节跳动,靠算法出名), 4=(aliyun,全球领先的云平台), 5=(aws,全球领先的云平台), 6=(aws,全球领先的云平台), 7=(google,科技大牛), 8=(google,科技大牛), 9=(美团,黄色小公司), 10=(aliyun,全球领先的云平台)}

======我在终端输入了 1,刚好对应上面一条信息的 1 位置========
拉宽后的结果>>>:5> (google,科技大牛)

总结:

  • 1- 这里的规则流转化为广播流操作,和之前广播分区不一样
    • 数据广播到各个分区:数据.Brocast()
    • 数据流转化为广播流:数据.Brocast(描述器)
  • 2- BroadcastProcessFunction 方法中重写的 processElement 只读方法,用来拉宽操作
    • 获取数据+关联数据(判非空后收集)
  • 3- BroadcastProcessFunction 方法中重写的 processBroadcastElement 可修改方法,
    • 读取 state 数据:getBroadcastState()
    • 删除历史数据:clear()
    • 将新的实例添加到广播状态中:putAll()

3.4.4 BroadcastState 执行思路梳理

在这里插入图片描述


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1490874.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ABAP - SALV教程12 显示图标和提示信息

ALV要求字段的值为图标的需求并不多见&#xff0c;一般都用于红黄绿灯&#xff0c;来表示单据的执行状态&#xff0c;添加图标的方式也可以实现红黄绿灯的功能&#xff0c;也可以参考SALV实现红黄绿灯这篇文章&#xff1a;http://t.csdnimg.cn/Dzx7x效果图SAVL列设置为图标图标…

1688淘宝天猫无货源API(商品列表、商品详情、店铺商品、sku)

item_get 获得淘宝商品详情item_get_pro 获得淘宝商品详情高级版item_review 获得淘宝商品评论item_search 按关键字搜索淘宝商品item_search_img 按图搜索淘宝商品&#xff08;拍立淘&#xff09;item_search_shop 获得店铺的所有商品item_search_seller 搜索店铺列表 API公共…

【LeetCode】升级打怪之路 Day 13:优先级队列的应用

今日题目&#xff1a; 23. 合并 K 个升序链表 | LeetCode378. 有序矩阵中第 K 小的元素 | LeetCode373. 查找和最小的 K 对数字 | LeetCode703. 数据流中的第 K 大元素 | LeetCode347. 前 K 个高频元素 | LeetCode 目录 Problem 1&#xff1a;合并多个有序链表 【classic】LC 2…

一些硬件知识(五)

选择MCU时需要考虑以下几个方面&#xff1a;1。首先考虑引脚功能数量是否够用2.其次如果跑RTOS操作系统的话对堆栈有要求3.需要考虑单片机某个功能的极限性能&#xff0c;例如做BLDC驱动板子的时候要求对电机的电流做到精确采样&#xff0c;此时会选用这个方向表现较好的MCU,例…

如何搭建Nacos集群

1.搭建Nacos集群 众所周知&#xff0c;在实际的工作中&#xff0c;Nacos的生成环境下一定要部署为集群状态 其中包含3个nacos节点&#xff0c;然后一个负载均衡器代理3个Nacos。这里负载均衡器可以使用nginx。 我们计划的集群结构&#xff1a; 我就直接在本机上开三个Nacos来搭…

变分推断中的ELBO(证据下界)

一、变分推断简介 变分推理的目标是近似潜在变量(latent variables)在观测变量(observed variables)下的条件概率。解决该问题,需要使用优化方法。在变分推断中,需要使用到的一个重要理论,是平均场理论。 1、平均场理论 来源于物理学,是一种研究复杂多体问题的方法,将…

Rust 中如何解析 JSON?

Rust 中如何解析 JSON? 在本文中&#xff0c;我们将讨论如何在 Rust 中使用 JSON 解析库&#xff0c;以及比较最流行的库及其性能。 JSON 解析基础知识 手动解析 JSON 要开始在 Rust 中使用 JSON&#xff0c;您需要安装一个可以轻松操作 JSON 的库。目前可用的流行crate之一…

07 系统的线性时不变特性

各位看官&#xff0c;大家好&#xff01;本讲为《数字信号处理理论篇》07 系统的线性时不变特性。&#xff08;特别提示&#xff1a;课程内容为由浅入深的特性&#xff0c;而且前后对照&#xff0c;不要跳跃观看&#xff0c;请按照文章或视频顺序进行观看。 从本讲开始开始为大…

【Python】进阶学习:__len__()方法的使用介绍

【Python】进阶学习&#xff1a;__len__()方法的使用介绍 &#x1f308; 个人主页&#xff1a;高斯小哥 &#x1f525; 高质量专栏&#xff1a;Matplotlib之旅&#xff1a;零基础精通数据可视化、Python基础【高质量合集】、PyTorch零基础入门教程&#x1f448; 希望得到您的订…

shadertoy 游戏《来自星尘》摇杆复刻

正确的做法应该是上 noise 而不是叠加 sin 波&#xff0c;不过如果不想麻烦的话叠波还是一个不错的选择&#xff1a;整体效果如下&#xff0c;已经非常形似 直接上链接&#xff1a;Shader - Shadertoy BETA float radiusScale 0.9; float variation(vec2 v1, vec2 v2, float …

KCV(Key Check Value)的作用(验证密钥导入是否正确)与算法(DES/3DES或AES)示例

KCV的作用与算法 KCV&#xff08;Key Check Value&#xff09;的计算通常与加密算法有关&#xff0c;不同算法计算KCV的方式不同。以下是常见算法的KCV计算方法&#xff1a; DES/3DES算法&#xff1a; KCV是通过使用ECB模式的3DES加密8字节’00’来计算的。例如&#xff0c;…

【鸿蒙 HarmonyOS 4.0】弹性布局(Flex)

一、介绍 弹性布局&#xff08;Flex&#xff09;提供更加有效的方式对容器中的子元素进行排列、对齐和分配剩余空间。容器默认存在主轴与交叉轴&#xff0c;子元素默认沿主轴排列&#xff0c;子元素在主轴方向的尺寸称为主轴尺寸&#xff0c;在交叉轴方向的尺寸称为交叉轴尺寸…

Pipy 进化:从可编程代理到应用引擎

网络功能变得越来越复杂&#xff0c;编写和维护的难度提升&#xff1b;新的基于 Pipy 的应用中&#xff0c;Pipy 角色从数据平面变成控制面&#xff0c;需要执行更多复杂非网络的逻辑&#xff1b;从长远来看&#xff0c;Pipy 将更像是一个常见的类似 shell/bash 的系统脚本工具…

如何在MinIO系统中进行配置并结合内网穿透实现公网远程连接上传文件

文章目录 前言1. 创建Buckets和Access Keys2. Linux 安装Cpolar3. 创建连接MinIO服务公网地址4. 远程调用MinIO服务小结5. 固定连接TCP公网地址6. 固定地址连接测试 前言 MinIO是一款高性能、分布式的对象存储系统&#xff0c;它可以100%的运行在标准硬件上&#xff0c;即X86等…

力扣hot100题解(python版33-35题)

33、排序链表 给你链表的头结点 head &#xff0c;请将其按 升序 排列并返回 排序后的链表 。 示例 1&#xff1a; 输入&#xff1a;head [4,2,1,3] 输出&#xff1a;[1,2,3,4]示例 2&#xff1a; 输入&#xff1a;head [-1,5,3,4,0] 输出&#xff1a;[-1,0,3,4,5]示例 3&a…

你知道该如何使用 JS 创建 css 类样式吗?

前言 去年我为公司内部开发了一个浏览器插件&#xff0c;当时为了加快开发进度&#xff0c;我没有选用现成的插件框架&#xff0c;而是直接使用原生 JavaScript 搭配 Rollup 进行打包。由于这是一个浏览器插件&#xff0c;我不可避免地需要对页面元素进行操作&#xff0c;比如…

小太阳防倾倒开关原理

小太阳防倾倒开关是一种体积小巧、安装简便、灵敏度高的设备&#xff0c;其原理基于角度感应和光电技术。该开关具有精确的角度判断能力&#xff0c;无需机械接触&#xff0c;稳定性强&#xff0c;支持个性化角度设置&#xff0c;可根据需求进行水平、垂直或倒置安装。 在应用…

注意力机制(代码实现案例)

学习目标 了解什么是注意力计算规则以及常见的计算规则.了解什么是注意力机制及其作用.掌握注意力机制的实现步骤. 1 注意力机制介绍 1.1 注意力概念 我们观察事物时&#xff0c;之所以能够快速判断一种事物(当然允许判断是错误的), 是因为我们大脑能够很快把注意力放在事物…

STM32(16)使用串口向电脑发送数据

发送字节 发送数组 发送字符和字符串 字符&#xff1a; 字符串&#xff1a; 字符串在电脑中以字符数组的形式存储

最新AI系统ChatGPT网站H5系统源码,支持Midjourney绘画

一、前言 SparkAi创作系统是基于ChatGPT进行开发的Ai智能问答系统和Midjourney绘画系统&#xff0c;支持OpenAI-GPT全模型国内AI全模型。本期针对源码系统整体测试下来非常完美&#xff0c;那么如何搭建部署AI创作ChatGPT&#xff1f;小编这里写一个详细图文教程吧。已支持GPT…