Big Data: Flink (Part 4)

2024/12/25 9:32:13

11. Watermarks

11.1 Watermark concepts

In typical real-time stream processing, event time stays roughly in step with processing time, possibly lagging slightly behind.

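In Flink, the marker used to measure the progress of event time is the watermark (WaterMark). A watermark can be seen as a special record: a marker inserted into the data stream whose main content is a timestamp indicating the current event time. The watermark's timestamp is usually derived from the timestamp of some record.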

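Watermark properties:

  • A watermark is a marker inserted into the data stream
  • Its main content is a timestamp that indicates the progress of event time
  • Watermarks are generated from the timestamps of the records
  • Watermark timestamps increase monotonically
  • By configuring a delay, watermarks can handle out-of-order data correctly
  • A watermark WaterMark(t) means that event time in the current stream has reached t: all records up to t are considered to have arrived, and no record with a timestamp less than or equal to t should appear afterwards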

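For example, with a watermark that waits 2 s, a record with timestamp 12 s produces a watermark of 12 s - 2 s - 1 ms = 9999 ms. That reaches the last timestamp of the [0, 10 s) window and triggers it.
**Note:** Flink windows are not prepared statically. They are created dynamically: a window is created only when a record that falls into its range arrives. When the watermark reaches the window's end time, the window fires its computation and closes. Firing and closing are two separate actions.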
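Because windows are created lazily, the timestamp alone determines which window a record goes to. The plain-Java sketch below has no Flink dependency, and the class TumblingWindowSketch and its method are hypothetical names. It computes the start of the tumbling window that contains a timestamp. The remainder arithmetic is intended to mirror what Flink's TimeWindow.getWindowStartWithOffset does with a zero offset. A window entry is created only when the first record lands in it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch, not Flink code: tumbling event-time windows created on demand. */
class TumblingWindowSketch {

    /** Start (ms) of the tumbling window of length windowSize (ms) that contains timestamp (ms). */
    static long windowStart(long timestamp, long windowSize) {
        long remainder = timestamp % windowSize;
        // Java's % keeps the sign of the dividend, so shift negative remainders into [0, windowSize)
        if (remainder < 0) {
            remainder += windowSize;
        }
        return timestamp - remainder;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10 s windows, as in the demos
        // Keyed by window start; an entry appears only when the first record lands in that window
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : new long[] {1_000L, 5_000L, 12_000L}) {
            windows.computeIfAbsent(windowStart(ts, size), start -> new ArrayList<>()).add(ts);
        }
        System.out.println(windows); // {0=[1000, 5000], 10000=[12000]}
    }
}
```

With 10 s windows, a record at 12 s is placed in the window that starts at 10000 ms. So a record can trigger the [0, 10 s) window through the watermark while itself belonging to [10 s, 20 s).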

11.2 Generating watermarks
11.2.1 Principles

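If performance comes first, use a small watermark delay or no watermarks at all. Using processing-time semantics directly gives the lowest latency but may miss data.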

If all data must be guaranteed to arrive, use a large watermark delay. This costs performance, because results are computed later.

11.2.2 Built-in watermarks

1. Built-in watermarks for ordered streams

Call WatermarkStrategy.forMonotonousTimestamps() directly:

package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;


public class WatermarkMonoDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
                .map(value ->{
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
                } )
                .assignTimestampsAndWatermarks(
                        //Specify the watermark strategy
                        WatermarkStrategy
                                //Ascending watermarks with no waiting time
                                .<WaterSensor>forMonotonousTimestamps()
                                //Specify the timestamp assigner
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<WaterSensor>(){
                                            @Override
                                            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                                //The returned timestamp must be in milliseconds
                                                System.out.println("data="+element+" recordTS="+recordTimestamp);
                                                return element.getTs()*1000L;
                                            }
                                        }
                                )
                );
        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                //Watermarks only take effect with event-time windows (TumblingEventTimeWindows), not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                long start = context.window().getStart();
                long end = context.window().getEnd();
                String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                long l = elements.spliterator().estimateSize();
                out.collect("key=" + key + " window [" + winStart + "," + winEnd + ") contains " + l + " records: " + elements.toString());
            }
        });
        process.print();
        env.execute();

    }
}

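2. Built-in watermarks for out-of-order streams

Set the waiting time to 2 seconds. The [0, 10) window is then triggered and closed at 12 seconds: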

package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package sink
 * @Date 2024/6/5 21:57
 * @description:
 */
public class WatermarkOutOfOrdernessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
                .map(value ->{
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
                } )
                .assignTimestampsAndWatermarks(
                        //Specify the watermark strategy
                        WatermarkStrategy
                                //Out-of-order watermarks with a waiting time
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                //Specify the timestamp assigner
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<WaterSensor>(){
                                            @Override
                                            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                                //The returned timestamp must be in milliseconds
                                                System.out.println("data="+element+" recordTS="+recordTimestamp);
                                                return element.getTs()*1000L;
                                            }
                                        }
                                )
                );
        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                //Watermarks only take effect with event-time windows (TumblingEventTimeWindows), not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                long start = context.window().getStart();
                long end = context.window().getEnd();
                String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                long l = elements.spliterator().estimateSize();
                out.collect("key=" + key + " window [" + winStart + "," + winEnd + ") contains " + l + " records: " + elements.toString());
            }
        });
        process.print();
        env.execute();

    }
}

Result:
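As the output shows, sending WaterSensor{id='s1', ts=12, vc=12} closes the [0,10) window. Its recordTS is printed as -9223372036854775808, i.e. Long.MIN_VALUE, because records from the socket source carry no timestamp yet. The record itself does not go into [0,10); it belongs to the [10,20) window.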

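11.2.3 How the built-in watermarks are generated
  • Both are generated periodically, every 200 ms by default
  • Ordered stream: WaterMark = current maximum event time - 1 ms
  • Out-of-order stream: WaterMark = current maximum event time - delay - 1 ms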
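The two formulas above can be sketched in plain Java. This is a hypothetical re-implementation modeled on Flink's bounded-out-of-orderness generator, not Flink's own class. For brevity, the sketch reads the watermark after every record instead of on a 200 ms timer. A delay of 0 gives the ordered-stream case (max - 1 ms).

```java
/** Hypothetical re-implementation, not Flink's class, of a bounded-out-of-orderness watermark generator. */
class BoundedOutOfOrdernessSketch {
    private final long delayMs;
    private long maxTimestamp;   // largest event time seen so far

    BoundedOutOfOrdernessSketch(long delayMs) {
        this.delayMs = delayMs;
        // Chosen so that the first watermark is Long.MIN_VALUE instead of overflowing
        this.maxTimestamp = Long.MIN_VALUE + delayMs + 1;
    }

    /** Called for every record: only the maximum timestamp matters, so late records change nothing. */
    void onEvent(long timestampMs) {
        maxTimestamp = Math.max(maxTimestamp, timestampMs);
    }

    /** Flink would emit this periodically (every 200 ms by default): max - delay - 1 ms. */
    long currentWatermark() {
        return maxTimestamp - delayMs - 1;
    }

    /** A window [start, end) fires once the watermark reaches its last timestamp, end - 1 ms. */
    static boolean fires(long windowEndMs, long watermark) {
        return watermark >= windowEndMs - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(2_000L);
        for (long ts : new long[] {1_000L, 5_000L, 11_000L, 3_000L, 12_000L}) {
            gen.onEvent(ts);
            long wm = gen.currentWatermark();
            System.out.println("ts=" + ts + " watermark=" + wm + " fires [0,10s)? " + fires(10_000L, wm));
        }
    }
}
```

The late 3 s record leaves the watermark at 8999 ms. The [0, 10 s) window fires only after the 12 s record, because 12000 - 2000 - 1 = 9999 ms is the window's last timestamp.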
11.3 Watermark propagation
11.3.1 Watermark propagation with multiple parallel subtasks

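When a task has several upstream subtasks, it uses the minimum of their watermarks as its own. Otherwise a window could fire too early and data would be lost.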
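The minimum rule can be sketched with a hypothetical MinWatermarkSketch class. It is plain Java, not Flink's implementation. It keeps the latest watermark per input channel and forwards the minimum, never moving backwards. The main method replays the scenario of the parallelism-2 demo that follows, assuming records are distributed round-robin to the two subtasks.

```java
import java.util.Arrays;

/** Hypothetical sketch, not Flink code: a task's watermark is the minimum over its input channels. */
class MinWatermarkSketch {
    private final long[] channelWatermarks;   // latest watermark seen on each input channel
    private long currentWatermark = Long.MIN_VALUE;

    MinWatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one upstream subtask and returns the task's (never decreasing) watermark. */
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        currentWatermark = Math.max(currentWatermark, min);
        return currentWatermark;
    }

    public static void main(String[] args) {
        MinWatermarkSketch task = new MinWatermarkSketch(2);
        // Subtask 0 has seen ts=5 s, subtask 1 ts=12 s (2 s delay, minus 1 ms each)
        task.onWatermark(0, 2_999L);
        long wm = task.onWatermark(1, 9_999L);
        System.out.println("after ts=12 s: " + wm);   // 2999: held back by subtask 0
        // ts=13 s reaches subtask 0, so both inputs are now at or past 9999
        wm = task.onWatermark(0, 10_999L);
        System.out.println("after ts=13 s: " + wm);   // 9999: fires the [0, 10 s) window
    }
}
```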
Demo of watermark propagation with parallelism 2:

package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package sink
 * @Date 2024/6/5 21:57
 * @description:
 */
public class WatermarkParallelismDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();
        //Demonstrate watermark propagation with multiple parallel subtasks
        env.setParallelism(2);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
                .map(value ->{
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
                } )
                .assignTimestampsAndWatermarks(
                        //Specify the watermark strategy
                        WatermarkStrategy
                                //Out-of-order watermarks with a waiting time
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                //Specify the timestamp assigner
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<WaterSensor>(){
                                            @Override
                                            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                                //The returned timestamp must be in milliseconds
                                                System.out.println("data="+element+" recordTS="+recordTimestamp);
                                                return element.getTs()*1000L;
                                            }
                                        }
                                )
                );
        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                //Watermarks only take effect with event-time windows (TumblingEventTimeWindows), not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                long start = context.window().getStart();
                long end = context.window().getEnd();
                String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                long l = elements.spliterator().estimateSize();
                out.collect("key=" + key + " window [" + winStart + "," + winEnd + ") contains " + l + " records: " + elements.toString());
            }
        });
        process.print();
        env.execute();

    }
}

Result:
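With multiple parallel subtasks, there is an extra step: the downstream watermark is updated to the minimum of the upstream watermarks. When WaterSensor{id='s1', ts=12, vc=12} arrives, one subtask's watermark is 5 - 2 = 3 and the other's is 12 - 2 = 10. Because the minimum wins, the watermark stays at 3 instead of advancing to 10. Then WaterSensor{id='s1', ts=13, vc=13} arrives at the other subtask. Its watermark becomes 13 - 2 = 11, the minimum becomes 10, and the window fires and closes.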

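Conclusion: with multiple parallel subtasks, a window closes only after every upstream subtask's watermark has passed the window end. In this demo, that means the window closes only when the record after the triggering one arrives.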

11.3.2 Configuring watermark idleness

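Sometimes one of several upstream parallel tasks receives no data. The current task uses the smallest upstream watermark as its event-time clock, so its watermark cannot advance and windows never fire. In that case, configure an idle timeout.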
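Conceptually, idleness removes a silent channel from the minimum. In Flink, the idle timeout is measured at the watermark generator and signalled downstream. The hypothetical plain-Java sketch below simplifies this by tracking the last activity per input channel on the receiving side. Wall-clock times are passed in explicitly so the behaviour is deterministic.

```java
import java.util.Arrays;

/** Hypothetical sketch, not Flink code: idle input channels are excluded from the minimum. */
class IdleAwareWatermarkSketch {
    private final long[] channelWatermarks;
    private final long[] lastActivityMs;   // wall-clock time of the last record per channel
    private final long idleTimeoutMs;

    IdleAwareWatermarkSketch(int numChannels, long idleTimeoutMs) {
        this.channelWatermarks = new long[numChannels];
        this.lastActivityMs = new long[numChannels]; // every channel counts as active at time 0
        this.idleTimeoutMs = idleTimeoutMs;
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** A channel delivered a record at wall-clock time nowMs, carrying the given watermark. */
    void onData(int channel, long watermark, long nowMs) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        lastActivityMs[channel] = nowMs;
    }

    /** Minimum watermark over channels that are not idle; Long.MIN_VALUE if none is active. */
    long combinedWatermark(long nowMs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            boolean idle = nowMs - lastActivityMs[i] >= idleTimeoutMs;
            if (!idle) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Two channels, 5 s idle timeout; only channel 0 ever receives data (e.g. only odd numbers)
        IdleAwareWatermarkSketch task = new IdleAwareWatermarkSketch(2, 5_000L);
        task.onData(0, 9_999L, 1_000L);
        // At 1 s channel 1 is not yet idle, so its Long.MIN_VALUE holds the minimum back
        System.out.println(task.combinedWatermark(1_000L));
        task.onData(0, 9_999L, 6_000L);
        // At 6 s channel 1 has been silent for 5 s and is ignored, so the watermark is 9999
        System.out.println(task.combinedWatermark(6_000L));
    }
}
```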

package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package 
 * @Date 2024/6/5 21:57
 * @description:
 */
public class WatermarkIdlenessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        SingleOutputStreamOperator<Integer> streamOperator = env
                .socketTextStream("192.168.132.101", 7777)
                //Custom partitioner: value % numPartitions, so if only odd numbers are sent they all go to the same subtask
                .partitionCustom(new Partitioner<String>() {
                    @Override
                    public int partition(String key, int numPartitions) {
                        return Integer.parseInt(key) % numPartitions;
                    }
                }, value -> value)
                .map(value -> Integer.parseInt(value))
                .assignTimestampsAndWatermarks(
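                        //Specify the watermark strategy
                        WatermarkStrategy
                                .<Integer>forMonotonousTimestamps()
                                //The value itself, in seconds, is the event time
                                .withTimestampAssigner((r, ts) -> r * 1000L)
                                //Mark a subtask as idle after 5 s without data
                                .withIdleness(Duration.ofSeconds(5))
                );
        //Two groups, odd numbers and even numbers, each with 2 s event-time tumbling windows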
        streamOperator
                .keyBy(value -> value%2)
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .process(new ProcessWindowFunction<Integer, String, Integer, TimeWindow>() {
                    @Override
                    public void process(Integer integer, Context context, Iterable<Integer> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                        String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                        long l = elements.spliterator().estimateSize();
                        out.collect("key=" + integer + " window [" + winStart + "," + winEnd + ") contains " + l + " records: " + elements.toString());
                    }
                }).print();


        env.execute();

    }
}

11.4 Handling late data
11.4.1 Delaying watermark progress

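When generating watermarks, set an out-of-orderness tolerance. This delays the advance of event time, so window computations run later and out-of-order records get more time to enter their windows: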

forBoundedOutOfOrderness(Duration.ofSeconds(2))
11.4.2 Delaying window closing

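Flink windows can accept late data. When a window fires, it first emits the current result but does not close yet. After that, every late record belonging to the window triggers the window computation again. The window closes only when the watermark passes the window end time plus the allowed lateness: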

.allowedLateness(Time.seconds(2))
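Combined with the watermark delay, allowed lateness reduces to a few comparisons. The sketch below is plain Java with hypothetical names. It assumes, as we understand Flink's window operator to behave, that a window fires when the watermark reaches its last timestamp (end - 1 ms). It further assumes the window is discarded once end - 1 ms + allowed lateness <= watermark.

```java
/** Hypothetical sketch, not Flink code: how allowed lateness treats a record for its window. */
class AllowedLatenessSketch {
    enum Outcome { BUFFERED, REFIRES_WINDOW, TOO_LATE }

    /**
     * @param windowEndMs exclusive end of the record's window, e.g. 10_000 for [0, 10 s)
     * @param latenessMs  allowed lateness in ms
     * @param watermark   current watermark when the record arrives
     */
    static Outcome classify(long windowEndMs, long latenessMs, long watermark) {
        long maxTimestamp = windowEndMs - 1;           // last timestamp inside the window
        if (watermark < maxTimestamp) {
            return Outcome.BUFFERED;                   // window has not fired yet
        }
        if (watermark < maxTimestamp + latenessMs) {
            return Outcome.REFIRES_WINDOW;             // fired but still open: compute again
        }
        return Outcome.TOO_LATE;                       // closed: dropped, or sent to a side output
    }

    public static void main(String[] args) {
        // [0, 10 s) window, 2 s out-of-orderness, 2 s allowed lateness; a record with ts=3 s arrives
        for (long maxSeenTs : new long[] {11_000L, 12_000L, 13_000L, 14_000L}) {
            long watermark = maxSeenTs - 2_000L - 1;   // bounded-out-of-orderness watermark
            System.out.println("max ts=" + maxSeenTs + " -> " + classify(10_000L, 2_000L, watermark));
        }
    }
}
```

The loop prints BUFFERED, REFIRES_WINDOW, REFIRES_WINDOW and TOO_LATE. The [0, 10 s) window first fires once a record at 12 s has been seen. It keeps accepting late records until a record at 14 s arrives. After that, late records are dropped unless sideOutputLateData() is configured.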
package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package 
 * @Date 2024/6/5 21:57
 * @description:
 */
public class WatermarkAllowLatenessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
                .map(value ->{
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
                } )
                .assignTimestampsAndWatermarks(
                        //Specify the watermark strategy
                        WatermarkStrategy
                                //Out-of-order watermarks with a waiting time
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                //Specify the timestamp assigner
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<WaterSensor>(){
                                            @Override
                                            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                                //The returned timestamp must be in milliseconds
                                                System.out.println("data="+element+" recordTS="+recordTimestamp);
                                                return element.getTs()*1000L;
                                            }
                                        }
                                )
                );
        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
                //Watermarks only take effect with event-time windows (TumblingEventTimeWindows), not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                //Allow 2 seconds of lateness before the window closes
                .allowedLateness(Time.seconds(2))
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                long start = context.window().getStart();
                long end = context.window().getEnd();
                String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                long l = elements.spliterator().estimateSize();
                out.collect("key=" + key + " window [" + winStart + "," + winEnd + ") contains " + l + " records: " + elements.toString());
            }
        });
        process.print();
        env.execute();

    }
}

11.4.3 Collecting late data with a side output

Use .sideOutputLateData() to send late data to a side output stream:

package waterMark;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package sink
 * @Date 2024/6/5 21:57
 * @description:
 */
public class WatermarkLateDataSideOutputDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
                .map(value ->{
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
                } )
                .assignTimestampsAndWatermarks(
                        //Specify the watermark strategy
                        WatermarkStrategy
                                //Out-of-order watermarks with a waiting time
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                //Specify the timestamp assigner
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<WaterSensor>(){
                                            @Override
                                            public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                                //The returned timestamp must be in milliseconds
                                                System.out.println("data="+element+" recordTS="+recordTimestamp);
                                                return element.getTs()*1000L;
                                            }
                                        }
                                )
                );
        OutputTag<WaterSensor> outputTag = new OutputTag<>("late-data", Types.POJO(WaterSensor.class));
        SingleOutputStreamOperator<String> process = dataStreamSource
                .keyBy(value -> value.getId())
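                //Watermarks only take effect with event-time windows (TumblingEventTimeWindows), not with TumblingProcessingTimeWindows
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                //Allow 2 seconds of lateness before the window closes
                .allowedLateness(Time.seconds(2))
                //Late data arriving after the window has closed goes to the side output
                .sideOutputLateData(outputTag)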
                .process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                long start = context.window().getStart();
                long end = context.window().getEnd();
                String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                long l = elements.spliterator().estimateSize();
                out.collect("key=" + key + " window [" + winStart + "," + winEnd + ") contains " + l + " records: " + elements.toString());
            }
        });
        process.print();
        process.getSideOutput(outputTag).print("side-output");
        env.execute();

    }
}

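11.4.4 Summary
  1. Out-of-order vs. late data:

    **Out-of-order:** records arrive out of sequence; a record with an earlier timestamp arrives after one with a later timestamp

    **Late:** the record's timestamp < the current watermark

  2. Handling out-of-order and late data

    • Specify an out-of-orderness waiting time in the watermark strategy
    • When opening a window, set an allowed lateness for it
    • Send late data that arrives after the window has closed to a side output
  3. The watermark waiting time and the window's allowed lateness are not equivalent and cannot replace each other

    The watermark waiting time determines when the window computes for the first time. Too long a waiting time increases computation latency.

    The allowed lateness only makes results more accurate and should not affect computation latency.

    So neither can substitute for the other.

  4. Rules of thumb for the watermark waiting time and the allowed lateness

    Do not set the watermark waiting time too large; a few seconds is typical. Size the allowed lateness to cover most late data, and route the small fraction of extremely late data to a side output.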

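12. Time-based stream joins

The connect operation covered earlier meets most needs. However, matching records from two streams within a fixed time range would require custom logic with connect. A window expresses this more simply, and Flink provides built-in APIs for it.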

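12.1 Window Join
  1. Records can be matched only if they fall into the same time window
  2. Records are matched on the keyBy key
  3. Only matched records are emitted, similar to an inner join over a fixed time range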
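The three properties above amount to an inner join restricted to pairs that share both a key and a window. The hypothetical plain-Java sketch below applies that rule with 5 s tumbling windows. It uses the same keys and timestamps as the demo that follows. It ignores when Flink actually fires the windows and only models which pairs are produced.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model, not Flink code, of which pairs a tumbling-window join emits. */
class WindowJoinSketch {
    /** One record: a join key and an event time in seconds. */
    static final class Event {
        final String key;
        final long tsSec;

        Event(String key, long tsSec) {
            this.key = key;
            this.tsSec = tsSec;
        }

        @Override
        public String toString() {
            return "(" + key + "," + tsSec + ")";
        }
    }

    /** Inner join: same key AND same tumbling window of sizeSec seconds (non-negative timestamps). */
    static List<String> join(List<Event> left, List<Event> right, long sizeSec) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean sameWindow = l.tsSec / sizeSec == r.tsSec / sizeSec;
                if (l.key.equals(r.key) && sameWindow) {
                    out.add(l + "----" + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Keys and timestamps of ds1 / ds2 in the demo (third field of ds2 omitted)
        List<Event> ds1 = List.of(new Event("a", 1), new Event("a", 2), new Event("b", 3), new Event("c", 4));
        List<Event> ds2 = List.of(new Event("a", 1), new Event("a", 11), new Event("b", 2),
                new Event("b", 12), new Event("c", 14), new Event("d", 15));
        join(ds1, ds2, 5).forEach(System.out::println);
    }
}
```

It yields three pairs: (a,1) with (a,1), (a,2) with (a,1), and (b,3) with (b,2). The ds2 records at 11 s, 12 s and 14 s fall into a later window than their keys' ds1 records, and "d" has no partner.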
package waterMark;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @Title: WindowJoinDemo
 * @Author lizhe
 * @Package Window Join
 * @Date 2024/6/8 21:11
 * @description:
 */
public class WindowJoinDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
                .fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("a", 2),
                        Tuple2.of("b", 3),
                        Tuple2.of("c", 4)

                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
                                        return element.f1 * 1000L;
                                    }
                                })
                );
        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
                .fromElements(
                        Tuple3.of("a", 1,1),
                        Tuple3.of("a", 11,1),
                        Tuple3.of("b", 2,1),
                        Tuple3.of("b", 12,1),
                        Tuple3.of("c", 14,1),
                        Tuple3.of("d", 15,1)


                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Integer>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, Integer> element, long recordTimestamp) {
                                        return element.f1 * 1000L;
                                    }
                                })
                );
        DataStream<String> join = ds1.join(ds2)
                .where(r1 -> r1.f0)
                .equalTo(r2 -> r2.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     * Called for each pair of matched records
                     * @param first a record from ds1
                     * @param second a record from ds2
                     * @return
                     * @throws Exception
                     */
                    @Override
                    public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
                        return first + "----" + second;
                    }
                });
        join.print();
        env.execute();
    }
}

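12.2 Interval Join

Sometimes the time range to match is not fixed. Records that should match may fall just on either side of a window boundary and fail to match, so a window join cannot meet the requirement.

An interval join works as follows. For each record in one stream, it opens a time interval around the record's timestamp, given by a lower-bound and an upper-bound offset (a negative offset means earlier, a positive one later). It then checks whether any record from the other stream falls into that interval. (Only event-time semantics are supported.)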
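The interval condition can be sketched in plain Java (a hypothetical class, not Flink code). A right-hand record r matches a left-hand record l with the same key when l.ts + lower <= r.ts <= l.ts + upper. Both bounds are inclusive here, which we believe is Flink's default for between(); IntervalJoined also offers lowerBoundExclusive() and upperBoundExclusive(). The main method also includes a boundary case that a 5 s window join would miss.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model, not Flink code, of which pairs an interval join emits. */
class IntervalJoinSketch {
    /** One record: a join key and an event time in seconds. */
    static final class Event {
        final String key;
        final long tsSec;

        Event(String key, long tsSec) {
            this.key = key;
            this.tsSec = tsSec;
        }

        @Override
        public String toString() {
            return "(" + key + "," + tsSec + ")";
        }
    }

    /** Pairs l and r with equal keys where l.ts + lower <= r.ts <= l.ts + upper (both bounds inclusive). */
    static List<String> join(List<Event> left, List<Event> right, long lowerSec, long upperSec) {
        List<String> out = new ArrayList<>();
        for (Event l : left) {
            for (Event r : right) {
                boolean inInterval = r.tsSec >= l.tsSec + lowerSec && r.tsSec <= l.tsSec + upperSec;
                if (l.key.equals(r.key) && inInterval) {
                    out.add(l + "----" + r);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Keys and timestamps of ds1 / ds2 in the demo (third field of ds2 omitted)
        List<Event> ds1 = List.of(new Event("a", 1), new Event("a", 2), new Event("b", 3), new Event("c", 4));
        List<Event> ds2 = List.of(new Event("a", 1), new Event("a", 11), new Event("b", 2),
                new Event("b", 12), new Event("c", 14), new Event("d", 15));
        join(ds1, ds2, -2, 2).forEach(System.out::println);
        // Boundary case: 4 s and 6 s lie in different 5 s windows but are only 2 s apart
        System.out.println(join(List.of(new Event("x", 4)), List.of(new Event("x", 6)), -2, 2));
    }
}
```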

package waterMark;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @Title:
 * @Author lizhe
 * @Package
 * @Date 2024/6/8 21:11
 * @description:
 */
public class IntervalJoinDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
                .fromElements(
                        Tuple2.of("a", 1),
                        Tuple2.of("a", 2),
                        Tuple2.of("b", 3),
                        Tuple2.of("c", 4)

                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
                                    @Override
                                    public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
                                        return element.f1 * 1000L;
                                    }
                                })
                );
        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
                .fromElements(
                        Tuple3.of("a", 1,1),
                        Tuple3.of("a", 11,1),
                        Tuple3.of("b", 2,1),
                        Tuple3.of("b", 12,1),
                        Tuple3.of("c", 14,1),
                        Tuple3.of("d", 15,1)


                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, Integer>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, Integer> element, long recordTimestamp) {
                                        return element.f1 * 1000L;
                                    }
                                })
                );
        KeyedStream<Tuple2<String, Integer>, String> stream1 = ds1.keyBy(value -> value.f0);
        KeyedStream<Tuple3<String, Integer, Integer>, String> stream2 = ds2.keyBy(value -> value.f0);
        stream1.intervalJoin(stream2)
                .between(Time.seconds(-2),Time.seconds(2))
                .process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     * Called only when records from the two streams match
                     * @param left a record from stream1
                     * @param right a record from stream2
                     * @param ctx context
                     * @param out collector
                     * @throws Exception
                     */
                    @Override
                    public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {
                        //Only matched pairs reach this method
                        out.collect(left+"----"+right);
                    }
                })
                .print();
        env.execute();
    }
}

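Flink 1.17 can send the late records of an interval join to side outputs.

A record whose event time is smaller than the current WaterMark is late data, and the main process() does not handle it.

Calling sideOutputLeftLateData(OutputTag) and sideOutputRightLateData(OutputTag) after between() puts the late records of the left and right streams into side outputs, which can then be read with getSideOutput() on the result of process().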
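The matching rule of between() and the lateness check can be illustrated in plain Java, without Flink and ignoring keys. This is a sketch under two assumptions: a right record joins a left record when `left.ts + lower <= right.ts <= left.ts + upper` (both bounds inclusive, Flink's default for interval joins), and a record is late when its timestamp is strictly below the current watermark. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java model of the interval-join matching rule (keys ignored).
class IntervalJoinSketch {

    // True when rightTs lies in [leftTs + lowerMs, leftTs + upperMs], both ends inclusive.
    static boolean matches(long leftTs, long rightTs, long lowerMs, long upperMs) {
        return rightTs >= leftTs + lowerMs && rightTs <= leftTs + upperMs;
    }

    // A record is treated as late when its timestamp is strictly below the current watermark.
    static boolean isLate(long ts, long currentWatermark) {
        return ts < currentWatermark;
    }

    // Collects every right timestamp that would join with the given left timestamp.
    static List<Long> matchingRights(long leftTs, long[] rightTs, long lowerMs, long upperMs) {
        List<Long> result = new ArrayList<>();
        for (long r : rightTs) {
            if (matches(leftTs, r, lowerMs, upperMs)) {
                result.add(r);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Timestamps (ms) of ds2 above: 1s, 11s, 2s, 12s, 14s, 15s
        long[] rights = {1000L, 11000L, 2000L, 12000L, 14000L, 15000L};
        // between(-2s, 2s): a left record at 3s joins right records in [1s, 5s]
        System.out.println(matchingRights(3000L, rights, -2000L, 2000L)); // [1000, 2000]
        System.out.println(isLate(4000L, 5000L)); // true
    }
}
```

In the real job only records with the same key are compared; the sketch drops keys to keep the bound arithmetic visible.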

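13、Process Functions

Beneath the DataStream API sits a lower-level layer whose operators are collectively called process operators; their interface is the process function.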


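13.1、Basic process functions

Through their context and timer service (TimerService), process functions can access the stream's events, timestamps and watermarks, and can even register timers. Process functions extend AbstractRichFunction, so they have all the features of rich functions and can access state and other runtime information. They can also emit data directly to side outputs. Process functions are the most flexible processing method and can implement all kinds of custom logic.

Types: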

  1. ProcessFunction
  2. KeyedProcessFunction
  3. ProcessWindowFunction
  4. ProcessAllWindowFunction
  5. CoProcessFunction
  6. ProcessJoinFunction
  7. BroadcastProcessFunction
  8. KeyedBroadcastProcessFunction

13.2、Keyed process function: KeyedProcessFunction

Only a KeyedStream supports setting timers through the TimerService.

13.2.1、Timers and the timer service
keyedStream.process(
                new KeyedProcessFunction<String, WaterSensor, String>() {
                    /**
                     * Called once for every incoming record
                     * @param value
                     * @param ctx
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        //event time extracted from the record; null if none was assigned
                        Long timestamp = ctx.timestamp();
                        //timer service
                        TimerService timerService = ctx.timerService();
                        //register a timer: event time
                        timerService.registerEventTimeTimer(10L);
                        //register a timer: processing time
                        timerService.registerProcessingTimeTimer(10L);
                        //delete a timer: event time
                        timerService.deleteEventTimeTimer(10L);
                        //delete a timer: processing time
                        timerService.deleteProcessingTimeTimer(10L);
                        //get the current processing time, i.e. the system time
                        timerService.currentProcessingTime();
                        //get the current WaterMark
                        timerService.currentWatermark();
                    }
                }
        );

Event-time timer:

package process;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title: KeyedProcessTimerDemo
 * @Author lizhe
 * @Package process
 * @Date 2024/6/9 12:29
 * @description:
 */
public class KeyedProcessTimerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> singleOutputStreamOperator = env.socketTextStream("192.168.132.101", 7777)
                .map(
                        value -> {
                            String[] datas = value.split(",");
                            return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                        }
                );
        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(
                        (element, recordTime) -> {
                            return element.getTs() * 1000L;
                        }
                );

        SingleOutputStreamOperator<WaterSensor> operator = singleOutputStreamOperator.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);
        KeyedStream<WaterSensor, String> keyedStream = operator.keyBy(value -> value.getId());
        keyedStream.process(
                new KeyedProcessFunction<String, WaterSensor, String>() {
                    /**
                     * Called once for every incoming record
                     * @param value
                     * @param ctx
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        //event time extracted from the record; null if none was assigned
                        Long timestamp = ctx.timestamp();
                        //timer service
                        TimerService timerService = ctx.timerService();
                        //register a timer: event time
                        timerService.registerEventTimeTimer(5000L);
                        System.out.println("current time " + timestamp + ", registered a timer at 5s");
                    }

                    /**
                     * Called when time advances to a registered timer's timestamp
                     * @param timestamp timestamp of the fired timer
                     * @param ctx context
                     * @param out collector
                     * @throws Exception
                     */
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        super.onTimer(timestamp, ctx, out);

                        System.out.println("now " + timestamp + ", timer fired, key = " + ctx.getCurrentKey());
                    }
                }

        )
                .print();
        env.execute();
    }
}

Output: [screenshot of the console output]
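The TimerService deduplicates timers by key and timestamp: each key and timestamp has at most one timer, so even if the same timer is registered several times, onTimer() is called only once.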
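To make the deduplication and firing rules concrete, here is a plain-Java model of event-time timers. It is a sketch, not Flink's TimerService: the class and method names are hypothetical, timers are stored per timestamp in a set of keys so repeated registrations of the same (key, timestamp) collapse into one, and a timer fires once the watermark is greater than or equal to its timestamp.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical model of per-key event-time timers with deduplication.
class TimerQueueSketch {
    // timestamp -> keys with a timer at that timestamp; the set removes duplicate registrations
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();

    // Registering the same (key, ts) twice leaves a single timer.
    void register(String key, long ts) {
        timers.computeIfAbsent(ts, t -> new TreeSet<>()).add(key);
    }

    // Advances the watermark; returns "key@ts" for every timer with ts <= watermark, in timestamp order.
    List<String> advanceWatermark(long watermark) {
        List<String> fired = new ArrayList<>();
        Iterator<Map.Entry<Long, TreeSet<String>>> it = timers.headMap(watermark, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, TreeSet<String>> e = it.next();
            for (String key : e.getValue()) {
                fired.add(key + "@" + e.getKey());
            }
            it.remove(); // fired timers are removed, so they never fire twice
        }
        return fired;
    }

    public static void main(String[] args) {
        TimerQueueSketch q = new TimerQueueSketch();
        // Every record registers a timer at 5000 ms, as in the demo above
        q.register("s1", 5000L);
        q.register("s1", 5000L);
        q.register("s1", 5000L);
        q.register("s2", 5000L);
        System.out.println(q.advanceWatermark(4999L)); // []
        System.out.println(q.advanceWatermark(5999L)); // [s1@5000, s2@5000]
    }
}
```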

Processing-time timer:

package process;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title: KeyedProcessingTimerDemo
 * @Author lizhe
 * @Package process
 * @Date 2024/6/9 12:29
 * @description:
 */
public class KeyedProcessingTimerDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> singleOutputStreamOperator = env.socketTextStream("192.168.132.101", 7777)
                .map(
                        value -> {
                            String[] datas = value.split(",");
                            return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                        }
                );
        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(
                        (element, recordTime) -> {
                            return element.getTs() * 1000L;
                        }
                );

        SingleOutputStreamOperator<WaterSensor> operator = singleOutputStreamOperator.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);
        KeyedStream<WaterSensor, String> keyedStream = operator.keyBy(value -> value.getId());
        keyedStream.process(
                new KeyedProcessFunction<String, WaterSensor, String>() {
                    /**
                     * Called once for every incoming record
                     * @param value
                     * @param ctx
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        //event time extracted from the record; null if none was assigned
                        Long timestamp = ctx.timestamp();
                        //timer service
                        TimerService timerService = ctx.timerService();
                        //register a processing-time timer 5s after the current system time
                        long currentProcessingTime = timerService.currentProcessingTime();
                        timerService.registerProcessingTimeTimer(currentProcessingTime+5000L);
                        System.out.println("current time " + currentProcessingTime + ", registered a timer 5s later, key = " + ctx.getCurrentKey());

                    }

                    /**
                     * Called when time advances to a registered timer's timestamp
                     * @param timestamp timestamp of the fired timer
                     * @param ctx context
                     * @param out collector
                     * @throws Exception
                     */
                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        super.onTimer(timestamp, ctx, out);

                        System.out.println("now " + timestamp + ", timer fired, key = " + ctx.getCurrentKey());
                    }
                }

        )
                .print();
        env.execute();
    }
}

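Summary:

  1. Event-time timers are triggered by the WaterMark: a timer fires once WaterMark >= the registered time.

    Note:

    With bounded out-of-orderness, WaterMark = max event time seen - delay - 1ms, and because of the -1ms the timer fires one record later than you might expect. For example, for a 5s timer with a delay of 3s, a record at 8s gives WaterMark = 8s - 3s - 1ms = 4999ms, which does not trigger the 5s timer. With second-granularity timestamps, the timer fires only when a record at 9s arrives: WaterMark = 9s - 3s - 1ms = 5999ms.

  2. Reading the current WaterMark inside process() returns the previous WaterMark, because the new WaterMark generated from the current record has not yet reached the process operator.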
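The arithmetic in note 1 can be checked with a few lines of plain Java. This sketch assumes the rule stated above (watermark = max event time - delay - 1 ms) and the `>=` firing condition; the class and method names are hypothetical.

```java
// Hypothetical helper reproducing the watermark arithmetic from the summary.
class WatermarkMath {

    // Bounded out-of-orderness: watermark = max event time seen - allowed delay - 1 ms
    static long watermark(long maxEventTimeMs, long delayMs) {
        return maxEventTimeMs - delayMs - 1;
    }

    // An event-time timer fires once the watermark has reached its timestamp
    static boolean timerFires(long timerTs, long watermark) {
        return watermark >= timerTs;
    }

    public static void main(String[] args) {
        long delay = 3000L;
        long wm8 = watermark(8000L, delay); // 4999
        long wm9 = watermark(9000L, delay); // 5999
        System.out.println(wm8 + " fires 5s timer: " + timerFires(5000L, wm8)); // 4999 fires 5s timer: false
        System.out.println(wm9 + " fires 5s timer: " + timerFires(5000L, wm9)); // 5999 fires 5s timer: true
    }
}
```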

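13.3、Application example

Find the water levels that occur most often within a period: count the two most frequent water levels (vc) over the last 10s, updating the result every 5s.

A sliding window can be used to count occurrences per water level.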
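With a 10s window sliding every 5s, every record falls into two windows and is counted in both. The plain-Java sketch below lists the windows a timestamp belongs to; it assumes the usual alignment in which window starts are multiples of the slide (offset 0) and windows are half-open [start, start + size). It is not Flink's window assigner itself, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: which sliding windows [start, end) contain a timestamp (offset 0).
class SlidingWindowSketch {

    static List<long[]> windowsFor(long ts, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start that is <= ts (assumes ts >= 0)
        long lastStart = ts - (ts % slideMs);
        // Step back one slide at a time while the window still covers ts
        for (long start = lastStart; start > ts - sizeMs; start -= slideMs) {
            windows.add(new long[]{start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        for (long[] w : windowsFor(7000L, 10000L, 5000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
        // prints [5000, 15000) then [0, 10000)
    }
}
```

The end of each window (15000 and 10000 here) is the WindowEnd tag that the job below attaches to every count, so results of the same window can later be grouped together.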

Review this carefully later; it may have problems!!!

package process;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.*;

/**
 * @Title:
 * @Author lizhe
 * @Package process
 * @Date 2024/6/9 12:29
 * @description:
 */
public class TopNDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> operator  = env.socketTextStream("192.168.132.101", 7777)
                .map(
                        value -> {
                            String[] datas = value.split(",");
                            return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                        }
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner(
                                        (element, recordTime) -> {
                                            return element.getTs() * 1000L;
                                        }
                                )
                );
        //1. Key by vc and aggregate in windows (incremental count + full-window function that adds a tag)
        //After windowed aggregation the result is an ordinary stream without window info, so tag each result with its WindowEnd
        //Event-time windows, so that the watermark assigned above actually drives the windows
        SingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> aggregate = operator.keyBy(value -> value.getVc())
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new VcCountAgg(), new WindowResult());

        //2. Key by the window tag so that all results of the same window range meet in one place, then sort and take the TopN
        aggregate.keyBy(value -> value.f2)
                .process(new TopN(2))
                .print();
        env.execute();
    }
    public  static class VcCountAgg implements AggregateFunction<WaterSensor,Integer,Integer>{

        @Override
        public Integer createAccumulator() {
            return 0;
        }

        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            return ++accumulator;
        }

        @Override
        public Integer getResult(Integer accumulator) {
            return accumulator;
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            //only used by merging (session) windows; return the combined count instead of null
            return a + b;
        }
    }

    /**
     * Type parameters:
     * 1st: input type = output of the incremental function, the count
     * 2nd: output type = Tuple3(vc, count, WindowEnd), tagged with the window end time
     * 3rd: key type, vc, Integer
     * 4th: window type
     */
    public  static class WindowResult extends ProcessWindowFunction<Integer, Tuple3<Integer,Integer,Long>,Integer, TimeWindow>{

        @Override
        public void process(Integer key, Context context, Iterable<Integer> elements, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception {
            Integer count = elements.iterator().next();
            long windowsEnd = context.window().getEnd();
            out.collect(Tuple3.of(key,count,windowsEnd));
        }
    }
    public  static class TopN extends KeyedProcessFunction<Long,Tuple3<Integer, Integer, Long>,String>{
        //results of different windows: key = windowEnd, value = list of results
        private Map<Long, List< Tuple3<Integer,Integer,Long>>> dataListMap;
        //number of top entries to output
        private int threshold;

        public TopN(int threshold) {
            dataListMap = new HashMap<>();
            this.threshold = threshold;
        }

        @Override
        public void processElement(Tuple3<Integer, Integer, Long> value, Context ctx, Collector<String> out) throws Exception {
            //each call brings a single result; it must be stored for sorting, and different windows are stored separately
            //1. store it in the HashMap
            Long windowEnd = value.f2;
            if (dataListMap.containsKey(windowEnd)){
                //1.1 windowEnd already present: not the first result of this window, append it to the list
                List<Tuple3<Integer, Integer, Long>> tuple3List = dataListMap.get(windowEnd);
                tuple3List.add(value);
            }else {
                //1.2 first result of this window: initialize the list
                List<Tuple3<Integer, Integer, Long>> dataList = new ArrayList<>();
                dataList.add(value);
                dataListMap.put(windowEnd,dataList);
            }
            //2. Register an event-time timer at WindowEnd+1ms (results of one window range are emitted together but arrive here one by one, so a 1ms delay is enough)
            ctx.timerService().registerEventTimeTimer(windowEnd+1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);
            //timer fired: all results of this window range have arrived; sort and take the TopN
            Long windowEnd = ctx.getCurrentKey();
            //1. sort by count, descending
            List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd);
            dataList.sort(
                    new Comparator<Tuple3<Integer, Integer, Long>>() {
                        @Override
                        public int compare(Tuple3<Integer, Integer, Long> o1, Tuple3<Integer, Integer, Long> o2) {
                            return o2.f1-o1.f1;
                        }
                    }
            );
            //2. take the TopN
            StringBuilder outStr = new StringBuilder();
            outStr.append("==========\n");
            //walk the sorted list and take the first threshold entries; if the list is shorter, take dataList.size()
            for (int i = 0; i < Math.min(threshold,dataList.size()); i++) {
                Tuple3<Integer, Integer, Long> vcCount = dataList.get(i);
                outStr.append("Top"+(i+1)+"\n");
                outStr.append("vc="+vcCount.f0+"\n");
                outStr.append("count="+vcCount.f1 + "\n");
                outStr.append("window end="+ vcCount.f2 + "\n");
            }
            //release the finished window's data: drop the whole map entry, not just the list contents
            dataListMap.remove(windowEnd);
            out.collect(outStr.toString());
        }
    }
}

13.4、Side outputs

Use a side output to raise water-level alarms

package process;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package process
 * @Date 2024/6/9 12:29
 * @description:
 */
public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(
                        (element, recordTime) -> {
                            return element.getTs() * 1000L;
                        }
                );
        SingleOutputStreamOperator<WaterSensor> singleOutputStreamOperator = env.socketTextStream("192.168.132.101", 7777)
                .map(
                        value -> {
                            String[] datas = value.split(",");
                            return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                        }
                )
                .assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);
        final OutputTag<String> warnTag = new OutputTag<>("warn", Types.STRING);
        SingleOutputStreamOperator<WaterSensor> process = singleOutputStreamOperator.keyBy(value -> value.getId())
                .process(
                        new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {
                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
                                //raise the alarm through a side output
                                if (value.getVc() > 10) {
                                    ctx.output(warnTag, "current water level=" + value.getVc() + ", above the threshold 10!");
                                }
                                out.collect(value);
                            }
                        }
                );
        process.print();
        process.getSideOutput(warnTag).printToErr("warn");


        env.execute();
    }
}

14、State Management

14.1、State in Flink

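Operators come in two kinds: stateless and stateful.

A stateless operator task only looks at each individual event and converts the current input directly into output, e.g. map, filter, flatMap.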
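A stateful operator task needs other data besides the current record to compute its result; that "other data" is the state. Examples: aggregation operators and window operators.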
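Classification of state:

  1. Managed state and raw state

    Managed state: managed uniformly by Flink; to use it you only call the corresponding interfaces.

    Raw state: user-defined; it is like allocating a block of memory and managing it yourself, including implementing serialization and fault recovery.

    Flink managed state is what is normally used (the focus here).

  2. Operator state and keyed state

    State used after keyBy() is called keyed state; all other state is operator state.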

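14.2、Operator state

For a given parallel subtask, all the data it processes accesses the same state; the state is shared within that task.

Operator state can be used on any operator and behaves somewhat like a local variable.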

14.3、Keyed state

State is maintained and accessed according to the key defined on the input stream, so it is only available after keyBy.
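The difference in scope between operator state and keyed state can be modeled in plain Java for a single parallel subtask: operator state is one value shared by every record the subtask processes, while keyed state behaves like a map from the current key to that key's own value. This is a conceptual sketch only (the class is hypothetical); real Flink state is managed, checkpointed and restored by the framework.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of state scoping inside ONE parallel subtask.
class StateScopeSketch {
    // Operator state: a single counter shared by all records of this subtask
    private long operatorCount = 0;
    // Keyed state: one counter per key; each record only sees its own key's entry
    private final Map<String, Long> keyedCount = new HashMap<>();

    // Processes one record and returns the keyed count for its key
    long process(String key) {
        operatorCount++;
        return keyedCount.merge(key, 1L, Long::sum);
    }

    long operatorCount() {
        return operatorCount;
    }

    public static void main(String[] args) {
        StateScopeSketch subtask = new StateScopeSketch();
        subtask.process("s1");
        subtask.process("s2");
        long s1 = subtask.process("s1");
        System.out.println("operator state = " + subtask.operatorCount() + ", keyed state for s1 = " + s1);
        // operator state = 3, keyed state for s1 = 2
    }
}
```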

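14.3.1、Value state

The state holds a single value.

Raise an alarm when the water level differs from the previous reading by more than 10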

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.time.Duration;

/**
 * @Title: KeyedValueStateDemo
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: raise an alarm when the water level differs from the previous reading by more than 10
 */
public class KeyedValueStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("192.168.132.101", 7777)
                .map(
                        value -> {
                            String[] datas = value.split(",");
                            return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                        }
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner(
                                        (element, recordTime) -> {
                                            return element.getTs() * 1000L;
                                        }
                                )
                );
        sensorDS.keyBy(value -> value.getId())
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {
                            ValueState<Integer> lastVcState;
                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                ValueStateDescriptor<Integer> lastVcDescriptor = new ValueStateDescriptor<>("lastVc", Integer.class);
                                lastVcState=getRuntimeContext().getState(lastVcDescriptor);

                            }

                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                                int lastVc = lastVcState.value() == null ? 0 : lastVcState.value();
                                if (Math.abs(value.getVc()-lastVc)>10) {
                                    out.collect("sensor id="+value.getId()+", current water level="+value.getVc()+", previous water level="+lastVc+", difference exceeds 10");
                                }
                                lastVcState.update(value.getVc());
                            }
                        }
                ).print();
        env.execute();
    }
}

14.3.2、List state

Stores the data to keep as a list.

Output the three highest water levels for each sensor

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;

/**
 * @Title:
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: output the three highest water levels for each sensor
 */
public class KeyedListStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("192.168.132.101", 7777)
                .map(
                        value -> {
                            String[] datas = value.split(",");
                            return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                        }
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner(
                                        (element, recordTime) -> {
                                            return element.getTs() * 1000L;
                                        }
                                )
                );
        sensorDS.keyBy(value -> value.getId())
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {
                            ListState<Integer> vcListState;
                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                 vcListState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcListState", Integer.class));

                            }

                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                                vcListState.add(value.getVc());
                                ArrayList<Integer> arrayList = new ArrayList<>();
                                for (Integer vc : vcListState.get()) {
                                    arrayList.add(vc);
                                }
                                arrayList.sort((o1,o2)->{
                                    return o2-o1;
                                });
                                if (arrayList.size() > 3) {
                                    arrayList.remove(3);
                                }
                                out.collect("sensor id="+value.getId()+", top three water levels="+arrayList.toString());
                                vcListState.update(arrayList);
                            }
                        }
                ).print();
        env.execute();
    }
}

14.3.3、Map state

Stores key-value pairs as state.

Count how many times each water level occurs for each sensor

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;

/**
 * @Title:
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: count how many times each water level occurs for each sensor
 */
public class KeyedMapStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("192.168.132.101", 7777)
                .map(
                        value -> {
                            String[] datas = value.split(",");
                            return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                        }
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner(
                                        (element, recordTime) -> {
                                            return element.getTs() * 1000L;
                                        }
                                )
                );
        sensorDS.keyBy(value -> value.getId())
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {
                            MapState<Integer,Integer> vcCountMapState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                vcCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("vcCountMapState", Integer.class, Integer.class));
                            }

                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                                Integer vc = value.getVc();
                                if (vcCountMapState.contains(vc)){
                                    int count = vcCountMapState.get(vc) ;
                                    vcCountMapState.put(vc,++count);
                                }else {
                                    vcCountMapState.put(vc, 1);
                                }
                                StringBuilder outStr = new StringBuilder();
                                outStr.append("sensor id="+value.getId()+"\n");
                                for (Map.Entry<Integer, Integer> vcCount : vcCountMapState.entries()) {
                                    outStr.append(vcCount.toString()+"\n");
                                }
                                outStr.append("==============");
                                out.collect(outStr.toString());
                            }
                        }
                ).print();
        env.execute();
    }
}

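14.3.4、Reducing state

Reduces every added value into the current value and keeps the reduced result as the state.

Compute the sum of water levels for each sensor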

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: compute the sum of water levels for each sensor
 */
public class KeyedReduceStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("192.168.132.101", 7777)
                .map(
                        value -> {
                            String[] datas = value.split(",");
                            return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                        }
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner(
                                        (element, recordTime) -> {
                                            return element.getTs() * 1000L;
                                        }
                                )
                );
        sensorDS.keyBy(value -> value.getId())
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {
                            ReducingState<Integer> vcSum;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                vcSum = getRuntimeContext().getReducingState(new ReducingStateDescriptor<Integer>("vcSum", new ReduceFunction<Integer>() {
                                    @Override
                                    public Integer reduce(Integer value1, Integer value2) throws Exception {
                                        return value1 + value2;
                                    }
                                }, Integer.class));
                            }

                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                                vcSum.add(value.getVc());
                                out.collect("sensor id=" + value.getId() + ", vc sum=" + vcSum.get());
                            }
                        }

                ).print();
        env.execute();
    }
}

14.3.5 Aggregating State

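Aggregating state is similar to reducing state. The difference is that it keeps an accumulator, defined by an AggregateFunction, instead of a value of the input type. The accumulator type, and the result type returned by get(), can therefore differ from the input type.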
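The plain-Java model below shows how the three types can differ. It is not Flink code, and the class name AvgAggregatingStateModel is hypothetical. The input is an int (vc), the accumulator is a (sum, count) pair stored as long[], and the output is a Double average. The static methods mirror the four methods of Flink's AggregateFunction. merge() is implemented properly here as an assumption about good practice, since a partial accumulator should combine cleanly with another.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of keyed AggregatingState (not Flink code).
// IN = int (vc), ACC = long[]{sum, count}, OUT = Double (average).
class AvgAggregatingStateModel {
    private final Map<String, long[]> accByKey = new HashMap<>();

    // Mirrors AggregateFunction.createAccumulator(): an empty (sum, count) pair.
    static long[] createAccumulator() {
        return new long[]{0L, 0L};
    }

    // Mirrors AggregateFunction.add(value, acc): returns the updated accumulator.
    static long[] add(int value, long[] acc) {
        return new long[]{acc[0] + value, acc[1] + 1};
    }

    // Mirrors AggregateFunction.getResult(acc): the average, or null if nothing was added.
    static Double getResult(long[] acc) {
        return acc[1] == 0 ? null : (double) acc[0] / acc[1];
    }

    // Mirrors AggregateFunction.merge(a, b): combines two partial accumulators.
    static long[] merge(long[] a, long[] b) {
        return new long[]{a[0] + b[0], a[1] + b[1]};
    }

    // Like AggregatingState.add(value) for the given key.
    void update(String key, int value) {
        accByKey.put(key, add(value, accByKey.getOrDefault(key, createAccumulator())));
    }

    // Like AggregatingState.get() for the given key.
    Double result(String key) {
        long[] acc = accByKey.get(key);
        return acc == null ? null : getResult(acc);
    }
}
```

The model uses long[] instead of Tuple2<Integer, Integer> only to stay free of Flink dependencies. A long sum also avoids overflow on long streams.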

Example: compute the average water level of each sensor.

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title:
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: computes the average water level of each sensor
 */
public class KeyedAggregateStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("192.168.132.101", 7777)
                .map(
                        value -> {
                            String[] datas = value.split(",");
                            return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                        }
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner(
                                        (element, recordTime) -> {
                                            return element.getTs() * 1000L;
                                        }
                                )
                );
        sensorDS.keyBy(value -> value.getId())
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {
                            AggregatingState<Integer,Double> vcAggregatingState;

                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                vcAggregatingState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>(
                                        "vcAggregatingState", new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
                                    @Override
                                    public Tuple2<Integer, Integer> createAccumulator() {
                                        return Tuple2.of(0,0);
                                    }

                                    @Override
                                    public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
                                        return Tuple2.of(accumulator.f0+value,accumulator.f1 + 1);
                                    }

                                    @Override
                                    public Double getResult(Tuple2<Integer, Integer> accumulator) {
                                        return accumulator.f0*1D/accumulator.f1;
                                    }

                                    @Override
                                    public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
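                                        // Combine two partial (sum, count) accumulators instead of returning null
                                        return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);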
                                    }
                                }
                                        , Types.TUPLE(Types.INT, Types.INT)));
                            }

                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                                vcAggregatingState.add(value.getVc());
                                Double vcAvg = vcAggregatingState.get();
                                out.collect("sensor id=" + value.getId() + ", average vc=" + vcAvg);
                            }
                        }
                ).print();
        env.execute();
    }
}

14.3.6 State Time-To-Live (TTL)

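When a state entry is created, its expiration time is set to current time + TTL. Flink measures state TTL in processing time. Depending on the configured update type, later writes (or reads) refresh the expiration time. To use TTL, build a StateTtlConfig object and enable it on the state descriptor with enableTimeToLive().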
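The expiration rule can be modeled in plain Java. The sketch below is not Flink code: TtlValueModel is a hypothetical name, and time is passed in explicitly as milliseconds so the behavior is deterministic. It assumes the same settings as the KeyedStateTTLDemo code in this section: UpdateType.OnCreateAndWrite, so only writes refresh the timestamp, and StateVisibility.NeverReturnExpired, so an expired value reads as null.

```java
// Plain-Java model (not Flink code) of a ValueState with a TTL, assuming
// UpdateType.OnCreateAndWrite and StateVisibility.NeverReturnExpired.
// Processing time is passed in explicitly (milliseconds).
class TtlValueModel<T> {
    private final long ttlMillis;
    private T value;
    private long lastWriteMillis;

    TtlValueModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Creating or writing the value refreshes its timestamp.
    void update(T newValue, long nowMillis) {
        value = newValue;
        lastWriteMillis = nowMillis;
    }

    // Reading does not refresh the timestamp under OnCreateAndWrite.
    // Once expired, the value is dropped and never returned (NeverReturnExpired).
    T value(long nowMillis) {
        if (value != null && nowMillis >= lastWriteMillis + ttlMillis) {
            value = null;
        }
        return value;
    }
}
```

Take a 5 s TTL and a write at t = 0. A read at t = 4 s still returns the value, but because that read does not refresh the timestamp, a read at t = 6 s returns null. With UpdateType.OnReadAndWrite, the read at 4 s would have extended the lifetime.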

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

/**
 * @Title: KeyedStateTTLDemo
 * @Author lizhe
 * @Package state
 * @Date 2024/6/11 20:54
 * @description: keyed ValueState with a 5 s TTL; prints the previous vc of each sensor
 */
public class KeyedStateTTLDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS = env.socketTextStream("192.168.132.101", 7777)
                .map(
                        value -> {
                            String[] datas = value.split(",");
                            return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                        }
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner(
                                        (element, recordTime) -> {
                                            return element.getTs() * 1000L;
                                        }
                                )
                );
        sensorDS.keyBy(value -> value.getId())
                .process(
                        new KeyedProcessFunction<String, WaterSensor, String>() {
                            ValueState<Integer> lastVcState;
                            @Override
                            public void open(Configuration parameters) throws Exception {
                                super.open(parameters);
                                StateTtlConfig stateTtlConfig = StateTtlConfig
                                        .newBuilder(Time.seconds(5))// expire after 5 s
                                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)// creating or writing the state refreshes the expiration time
                                        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)// never return an expired value
                                        .build();

                                ValueStateDescriptor<Integer> lastVc = new ValueStateDescriptor<>("lastVc", Integer.class);
                                lastVc.enableTimeToLive(stateTtlConfig);
                                lastVcState=getRuntimeContext().getState(lastVc);

                            }

                            @Override
                            public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                                Integer value1 = lastVcState.value();
                                out.collect("key=" + value.getId() + ", state value=" + value1);
                                lastVcState.update(value.getVc());
                            }
                        }
                ).print();
        env.execute();
    }
}

14.4 Operator State

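Operator state comes in three kinds: list state (ListState), union list state (also a ListState, obtained with getUnionListState()), and broadcast state (BroadcastState).

Operator state is defined on each parallel instance of an operator, and its scope is limited to the current operator task (subtask). All records processed by that subtask share it, regardless of their key.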
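The plain-Java model below shows the pattern used by MyCountMapFunction. It is not Flink code: CountingSubtaskModel and its restore() helper are hypothetical. A local field holds the live count. Taking a snapshot copies the field into a list, and restoring rebuilds the field from whatever list entries the subtask receives. The restore step sums the entries because, after the parallelism is lowered, one subtask can receive entries from several old subtasks.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Flink code) of one map subtask that keeps a local count
// and checkpoints it into operator list state.
class CountingSubtaskModel {
    private long count = 0L;                          // not keyed: counts every record of this subtask
    private final List<Long> listState = new ArrayList<>();

    // Like map(): every record increments the same counter, whatever its key.
    long map(String record) {
        return ++count;
    }

    // Like snapshotState(): overwrite the list state with the current count.
    List<Long> snapshotState() {
        listState.clear();
        listState.add(count);
        return new ArrayList<>(listState);
    }

    // Like initializeState() when isRestored() is true: sum all received entries.
    static CountingSubtaskModel restore(List<Long> restoredEntries) {
        CountingSubtaskModel subtask = new CountingSubtaskModel();
        subtask.listState.addAll(restoredEntries);
        for (Long c : restoredEntries) {
            subtask.count += c;
        }
        return subtask;
    }
}
```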

package state;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Title: OperatorListDemo
 * @Author lizhe
 * @Package state
 * @Date 2024/6/13 21:50
 * @description: counts the records seen by a map operator (operator list state)
 */
public class OperatorListDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
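        env.setParallelism(1);
        // snapshotState() is only called when a checkpoint is taken, so enable checkpointing (every 5 s here)
        env.enableCheckpointing(5000);
        env.socketTextStream("192.168.132.101", 7777)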
                .map(new MyCountMapFunction())
                .print();


        env.execute();
    }
    public  static  class MyCountMapFunction implements MapFunction<String,Long>, CheckpointedFunction{
        private  long count =0L;
        private ListState<Long> state;
        @Override
        public Long map(String value) throws Exception {
            return ++count;
        }

        /**
         * Persist the local variable: copy it into operator state when a checkpoint is taken
         * @param context
         * @throws Exception
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("snapshotState");
            state.clear();
            state.add(count);
        }

        /**
         * Initialize the local variable: on recovery, rebuild it from the state; called once per subtask
         * @param context
         * @throws Exception
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("initializeState");
            state = context.getOperatorStateStore().getListState(new ListStateDescriptor<Long>("state", Long.class));
            if (context.isRestored()){
                for (Long aLong : state.get()) {
                    count+=aLong;
                }
            }

        }
    }
}

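Difference between list state and union list state when state is redistributed (for example, after changing the parallelism):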

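  • List state: the entries of all old subtasks are split evenly (round-robin) among the new subtasks.
  • Union list state: the lists of all old subtasks are merged into one complete list, and every new parallel subtask receives a full copy.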
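The two redistribution modes can be sketched in plain Java. The model below is not Flink code, and OperatorStateRedistribution is a hypothetical name. In both modes the old subtasks' lists are first treated as one concatenated list. For list state, the sketch cuts that list into contiguous chunks whose sizes differ by at most one; the exact assignment order Flink uses is an implementation detail and an assumption here. For union list state, every new subtask gets the whole list.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not Flink code) of how operator state entries reach the
// subtasks after a restore with a new parallelism.
class OperatorStateRedistribution {
    // Logically concatenate the lists of all old subtasks.
    static <T> List<T> concat(List<List<T>> oldStates) {
        List<T> all = new ArrayList<>();
        for (List<T> s : oldStates) {
            all.addAll(s);
        }
        return all;
    }

    // List state: cut the concatenated entries into newParallelism chunks
    // whose sizes differ by at most one (contiguous chunks in this sketch).
    static <T> List<List<T>> evenSplit(List<List<T>> oldStates, int newParallelism) {
        List<T> all = concat(oldStates);
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism;
        int start = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < extra ? 1 : 0);
            result.add(new ArrayList<>(all.subList(start, start + size)));
            start += size;
        }
        return result;
    }

    // Union list state: every new subtask gets the complete concatenated list.
    static <T> List<List<T>> union(List<List<T>> oldStates, int newParallelism) {
        List<T> all = concat(oldStates);
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all));
        }
        return result;
    }
}
```

Suppose three old subtasks held [a, b], [c] and [d, e]. Restoring list state with parallelism 2 gives [a, b, c] and [d, e]. With union list state, each of the new subtasks sees all of [a, b, c, d, e]. The union mode fits state that every subtask needs in full, such as source offsets that must be re-filtered.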

Broadcast state: every parallel subtask of the operator holds the same global state.
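A rough plain-Java picture of broadcast state follows. It is not Flink code, and BroadcastStateModel and its methods are hypothetical. Each parallel subtask keeps its own copy of a map, and each broadcast element is applied to every copy, so all subtasks see the same threshold. Data elements only read their own subtask's copy and fall back to 0 when no configuration has arrived yet, which matches the null check in the example that follows.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (not Flink code) of broadcast state: every parallel subtask
// keeps its own copy of the map, and each broadcast update is applied to all copies.
class BroadcastStateModel {
    private final List<Map<String, Integer>> copies = new ArrayList<>();

    BroadcastStateModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            copies.add(new HashMap<>());
        }
    }

    // Like processBroadcastElement: the same update reaches every subtask.
    void broadcastThreshold(int threshold) {
        for (Map<String, Integer> copy : copies) {
            copy.put("threshold", threshold);
        }
    }

    // Like processElement on one subtask: read-only use of its local copy,
    // defaulting to 0 when no configuration has been broadcast yet.
    boolean exceeds(int subtask, int vc) {
        Integer threshold = copies.get(subtask).getOrDefault("threshold", 0);
        return vc > threshold;
    }
}
```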

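Example: send an alert when the water level exceeds a threshold. The threshold can be changed dynamically through a second (configuration) stream.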

package state;

import bean.WaterSensor;
import org.apache.flink.api.common.state.*;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

/**
 * @Title: OperatorBroadcastStateDemo
 * @Author lizhe
 * @Package state
 * @Date 2024/6/13 21:50
 * @description: send an alert when the water level exceeds a threshold; the threshold can be changed dynamically
 */
public class OperatorBroadcastStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<WaterSensor> sensorDS =  env.socketTextStream("192.168.132.101", 7777)
                .map(
                        value -> {
                            String[] datas = value.split(",");
                            return new WaterSensor(datas[0], Long.parseLong(datas[1]), Integer.parseInt(datas[2]));
                        }
                );
        // configuration stream whose elements are broadcast to every subtask
        DataStreamSource<String> configDS = env.socketTextStream("192.168.132.101", 8888);
        final MapStateDescriptor<String, Integer> descriptor = new MapStateDescriptor<>("configDS", String.class, Integer.class);
        BroadcastStream<String> broadcastStream = configDS.broadcast(descriptor);
        BroadcastConnectedStream<WaterSensor, String> connect = sensorDS.connect(broadcastStream);
        connect.process(
                new BroadcastProcessFunction<WaterSensor, String, String>() {
                    /**
                     * Processing method for the data stream (read-only access to the broadcast state)
                     * @param value
                     * @param ctx
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void processElement(WaterSensor value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(descriptor);
                        Integer integer = broadcastState.get("threshold");
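                        // If a data record arrives before any configuration, the broadcast state is still empty, so check for null
                        integer = integer == null ? 0 : integer;
                        if (value.getVc() > integer) {
                            out.collect("sensor " + value.getId() + " vc=" + value.getVc() + " exceeds threshold=" + integer);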
                        }
                    }

                    /**
                     * Processing method for the broadcast configuration stream (may update the broadcast state)
                     * @param value
                     * @param ctx
                     * @param out
                     * @throws Exception
                     */
                    @Override
                    public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                        BroadcastState<String, Integer> state = ctx.getBroadcastState(descriptor);
                        state.put("threshold",Integer.valueOf(value));
                    }
                }
        ).print();
        env.execute();
    }


}

14.5 State Backends

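How state is stored, accessed, and maintained is determined by a pluggable component called the state backend. It is mainly responsible for managing how and where local state is stored.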

14.5.1 Types of State Backends

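State backends work out of the box and can be configured independently, without changing the program logic. There are two: the hashmap state backend (HashMapStateBackend, the default) and the embedded RocksDB state backend (EmbeddedRocksDBStateBackend).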

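  1. HashMapStateBackend: state is kept in memory as Java objects on the TaskManager's JVM heap, stored as key-value pairs.
  2. EmbeddedRocksDBStateBackend: RocksDB is an embedded key-value store. State is serialized and written to the TaskManager's local disk, so it can grow beyond the available heap.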
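The practical difference between the two backends shows up in how a value is held. The following plain-Java model is not Flink's backends: StateStorageModel, HeapStore and SerializedStore are hypothetical names. One store keeps objects by reference in a map, and the other keeps serialized bytes that are decoded on every read. Mutating an object returned by a read changes the heap copy but not the serialized one. In Flink code, this is one reason to always write a modified value back with update() rather than relying on in-place mutation, which can appear to work on the heap backend and silently have no effect on RocksDB.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

// Plain-Java model (not Flink code) contrasting the two storage styles:
// a heap store keeps object references, a serialized store keeps bytes.
class StateStorageModel {
    static final class Counter {
        int value;
        Counter(int value) { this.value = value; }
    }

    // HashMapStateBackend-style: the object itself lives in the map on the heap.
    static final class HeapStore {
        private final Map<String, Counter> map = new HashMap<>();
        void put(String key, Counter c) { map.put(key, c); }
        Counter get(String key) { return map.get(key); }
    }

    // RocksDB-style: values are serialized on write and deserialized on every read.
    static final class SerializedStore {
        private final Map<String, byte[]> map = new HashMap<>();
        void put(String key, Counter c) { map.put(key, ByteBuffer.allocate(4).putInt(c.value).array()); }
        Counter get(String key) {
            byte[] bytes = map.get(key);
            return bytes == null ? null : new Counter(ByteBuffer.wrap(bytes).getInt());
        }
    }
}
```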

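This article was submitted by an internet user and represents only the author's views, not the position of this site. The site only provides information storage space, does not own the rights, and bears no related legal liability. If reprinting, please credit the source: http://www.coloradmin.cn/o/2119174.html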
