Flink窗口与WaterMark

news2024/9/22 1:16:01

本文目录

  • 窗口的生命周期

  • Window Assigners

  • 窗口函数(Window Functions)

  • Triggers

  • Evictors

  • Allowed Lateness

窗口

窗口(Window)是处理无界流的关键所在。窗口可以将数据流装入大小有限的“桶”中,再对每个“桶”加以处理。本文的重心将放在 Flink 如何进行窗口操作以及开发者如何尽可能地利用 Flink 所提供的功能。

下面展示了 Flink 窗口在 keyed streams 和 non-keyed streams 上使用的基本结构。我们可以看到,这两者唯一的区别仅在于:keyed streams 要调用 keyBy(...)后再调用 window(...) , 而 non-keyed streams 只用直接调用 windowAll(...)。留意这个区别,它能帮我们更好地理解后面的内容。

Keyed Windows

stream
       .keyBy(...)               <-  仅 keyed 窗口需要
       .window(...)              <-  必填项:"assigner"
      [.trigger(...)]            <-  可选项:"trigger" (省略则使用默认 trigger)
      [.evictor(...)]            <-  可选项:"evictor" (省略则不使用 evictor)
      [.allowedLateness(...)]    <-  可选项:"lateness" (省略则为 0)
      [.sideOutputLateData(...)] <-  可选项:"output tag" (省略则不对迟到数据使用 side output)
       .reduce/aggregate/apply()      <-  必填项:"function"
      [.getSideOutput(...)]      <-  可选项:"output tag"

这里有个demo,比如统计百度热搜,百度热搜一般是最近15分钟,为了演示简单起见,这里3s,并且是滚动窗口:

public static void demo() throws Exception {
    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, Integer>> dataStream = senv.socketTextStream("192.168.20.130", 9999)
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    String[] values = value.split(" ");
                    for(String v : values) {
                        out.collect(Tuple2.of(v, 1));
                    }
                }
            });
    dataStream.keyBy(item -> item.f0)
            .window(TumblingProcessingTimeWindows.of(Time.milliseconds(3000)))
            .reduce((a,b) -> Tuple2.of(a.f0, a.f1 + b.f1))
            .print();

    senv.execute("WindowDemo");
}

启动程序,并且nc输入:flink hadoop flink,结果:

7> (flink,2)
8> (hadoop,1)

Non-Keyed Windows

stream
       .windowAll(...)           <-  必填项:"assigner"
      [.trigger(...)]            <-  可选项:"trigger" (else default trigger)
      [.evictor(...)]            <-  可选项:"evictor" (else no evictor)
      [.allowedLateness(...)]    <-  可选项:"lateness" (else zero)
      [.sideOutputLateData(...)] <-  可选项:"output tag" (else no side output for late data)
       .reduce/aggregate/apply()      <-  必填项:"function"
      [.getSideOutput(...)]      <-  可选项:"output tag"

windowAll所有数据进入一个窗口,不需要keyBy()算子:

public static void demo2() throws Exception {
    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, Integer>> dataStream = senv.socketTextStream("192.168.20.130", 9999)
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    String[] values = value.split(" ");
                    for(String v : values) {
                        out.collect(Tuple2.of(v, 1));
                    }
                }
            });
    dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.milliseconds(3000)))
            .reduce((a,b) -> Tuple2.of(a.f0, a.f1 + b.f1))
            .print();

    senv.execute("WindowDemo");
}

启动程序,nc键入数据:flink hadoop flink,结果

4> (flink,3)

首先必须要在定义窗口前确定的是你的 stream 是 keyed 还是 non-keyed。keyBy(...) 会将你的无界 stream 分割为逻辑上的 keyed stream。如果 keyBy(...) 没有被调用,你的 stream 就不是 keyed。

对于 keyed stream,其中数据的任何属性都可以作为 key (详见此处)。使用 keyed stream 允许你的窗口计算由多个 task 并行,因为每个逻辑上的 keyed stream 都可以被单独处理。属于同一个 key 的元素会被发送到同一个 task。

对于 non-keyed stream,原始的 stream 不会被分割为多个逻辑上的 stream, 所以所有的窗口计算会被同一个 task 完成,也就是 parallelism 为 1。

窗口的生命周期

简单来说,一个窗口在第一个属于它的元素到达时就会被创建,然后在时间(event 或 processing time) 超过窗口的“结束时间戳 + 用户定义的 allowed lateness (详见 Allowed Lateness)”时 被完全删除。Flink 仅保证删除基于时间的窗口,其他类型的窗口不做保证, 比如全局窗口(详见 Window Assigners)。例如,对于一个基于 event time 且范围互不重合(滚动)的窗口策略, 如果窗口设置的时长为五分钟、可容忍的迟到时间(allowed lateness)为 1 分钟, 那么第一个元素落入 12:00 至 12:05 这个区间时,Flink 就会为这个区间创建一个新的窗口。当 watermark 越过 12:05 时,这个窗口将被摧毁。

另外,每个窗口会设置自己的 Trigger (详见 Triggers)和 function (ProcessWindowFunction、ReduceFunction、或 AggregateFunction, 详见 Window Functions)。该 function 决定如何计算窗口中的内容, 而 Trigger 决定何时窗口中的数据可以被 function 计算。Trigger 的触发(fire)条件可能是“当窗口中有多于 4 条数据”或“当 watermark 越过窗口的结束时间”等。Trigger 还可以在 window 被创建后、删除前的这段时间内定义何时清理(purge)窗口中的数据。这里的数据仅指窗口内的元素,不包括窗口的 meta data。也就是说,窗口在 purge 后仍然可以加入新的数据。

除此之外,你也可以指定一个 Evictor (详见 Evictors),在 trigger 触发之后,Evictor 可以在窗口函数的前后删除数据。

接下来我们会更详细地介绍上面提到的内容。开头的例子中有必填项和可选项。我们先从必填项开始(详见 Keyed vs Non-Keyed Windows、 Window Assigners、Window Functions)。

Window Assigners

指定了你的 stream 是否为 keyed 之后,下一步就是定义 window assigner。

Window assigner 定义了 stream 中的元素如何被分发到各个窗口。你可以在 window(...)(用于 keyed streams)或 windowAll(...) (用于 non-keyed streams)中指定一个 WindowAssigner。WindowAssigner 负责将 stream 中的每个数据分发到一个或多个窗口中。Flink 为最常用的情况提供了一些定义好的 window assigner,也就是 tumbling windows、 sliding windows、 session windows 和 global windows。你也可以继承 WindowAssigner 类来实现自定义的 window assigner。所有内置的 window assigner(除了 global window)都是基于时间分发数据的,processing time 或 event time 均可。请阅读我们对于 event time 的介绍来了解这两者的区别, 以及 timestamp 和 watermark 是如何产生的。

基于时间的窗口用 start timestamp(包含)和 end timestamp(不包含)描述窗口的大小。在代码中,Flink 处理基于时间的窗口使用的是 TimeWindow, 它有查询开始和结束 timestamp 以及返回窗口所能储存的最大 timestamp 的方法 maxTimestamp()。

接下来我们会说明 Flink 内置的 window assigner 如何工作,以及他们如何用在 DataStream 程序中。下面的图片展示了每种 assigner 如何工作。紫色的圆圈代表 stream 中按 key 划分的元素(本例中是按 user 1、user 2 和 user 3 划分)。x 轴表示时间的进展。

通过一些示例来展示关于这些窗口如何使用,或者如何区分它们:

滚动时间窗口

  • 每分钟页面浏览量

  • TumblingEventTimeWindows.of(Time.minutes(1))

滑动时间窗口

  • 每10秒钟计算前1分钟的页面浏览量

  • SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))

会话窗口

  • 每个会话的网页浏览量,其中会话之间的间隔至少为30分钟

  • EventTimeSessionWindows.withGap(Time.minutes(30))

以下都是一些可以使用的间隔时间 Time.milliseconds(n), Time.seconds(n), Time.minutes(n), Time.hours(n), 和 Time.days(n)。

滚动窗口(Tumbling Windows)

滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。比如说,如果你指定了滚动窗口的大小为 5 分钟,那么每 5 分钟就会有一个窗口被计算,且一个新的窗口被创建(如下图所示)。

5e7e463722056c05b5ec49e3eda552b7.jpeg
tumb-window

下面的代码展示了如何使用滚动窗口。

DataStream<T> input = ...;

// 滚动 event-time 窗口
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// 滚动 processing-time 窗口
input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

// 长度为一天的滚动 event-time 窗口, 偏移量为 -8 小时。
input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

时间间隔可以用 Time.milliseconds(x)、Time.seconds(x)、Time.minutes(x) 等来指定。

如上一个例子所示,滚动窗口的 assigners 也可以传入可选的 offset 参数。这个参数可以用来对齐窗口。比如说,不设置 offset 时,长度为一小时的滚动窗口会与 linux 的 epoch 对齐。你会得到如 1:00:00.000 - 1:59:59.999、2:00:00.000 - 2:59:59.999 等。如果你想改变对齐方式,你可以设置一个 offset。如果设置了 15 分钟的 offset, 你会得到 1:15:00.000 - 2:14:59.999、2:15:00.000 - 3:14:59.999 等。一个重要的 offset 用例是根据 UTC-0 调整窗口的时差。比如说,在中国你可能会设置 offset 为 Time.hours(-8)。

滑动窗口(Sliding Windows)

与滚动窗口类似,滑动窗口的 assigner 分发元素到指定大小的窗口,窗口大小通过 window size 参数设置。滑动窗口需要一个额外的滑动距离(window slide)参数来控制生成新窗口的频率。因此,如果 slide 小于窗口大小,滑动窗口可以允许窗口重叠。这种情况下,一个元素可能会被分发到多个窗口。

比如说,你设置了大小为 10 分钟,滑动距离 5 分钟的窗口,你会在每 5 分钟得到一个新的窗口, 里面包含之前 10 分钟到达的数据(如下图所示)。

2b0b72a55375b6f82692e90d91372914.jpeg
slide-window

下面的代码展示了如何使用滑动窗口。

DataStream<T> input = ...;

// 滑动 event-time 窗口
input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// 滑动 processing-time 窗口
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

// 滑动 processing-time 窗口,偏移量为 -8 小时
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

时间间隔可以使用 Time.milliseconds(x)、Time.seconds(x)、Time.minutes(x) 等来指定。

如上一个例子所示,滚动窗口的 assigners 也可以传入可选的 offset 参数。这个参数可以用来对齐窗口。比如说,不设置 offset 时,长度为一小时、滑动距离为 30 分钟的滑动窗口会与 linux 的 epoch 对齐。你会得到如 1:00:00.000 - 1:59:59.999, 1:30:00.000 - 2:29:59.999 等。如果你想改变对齐方式,你可以设置一个 offset。如果设置了 15 分钟的 offset,你会得到 1:15:00.000 - 2:14:59.999、1:45:00.000 - 2:44:59.999 等。一个重要的 offset 用例是根据 UTC-0 调整窗口的时差。比如说,在中国你可能会设置 offset 为 Time.hours(-8)。

会话窗口(Session Windows)

会话窗口的 assigner 会把数据按活跃的会话分组。与滚动窗口和滑动窗口不同,会话窗口不会相互重叠,且没有固定的开始或结束时间。会话窗口在一段时间没有收到数据之后会关闭,即在一段不活跃的间隔之后。会话窗口的 assigner 可以设置固定的会话间隔(session gap)或 用 session gap extractor 函数来动态地定义多长时间算作不活跃。当超出了不活跃的时间段,当前的会话就会关闭,并且将接下来的数据分发到新的会话窗口。

976a266ea452203aff2a020259177197.jpeg
session-window

下面的代码展示了如何使用会话窗口。

DataStream<T> input = ...;

// 设置了固定间隔的 event-time 会话窗口
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// 设置了动态间隔的 event-time 会话窗口
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
        // 决定并返回会话间隔
    }))
    .<windowed transformation>(<window function>);

// 设置了固定间隔的 processing-time session 窗口
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);
    
// 设置了动态间隔的 processing-time 会话窗口
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
        // 决定并返回会话间隔
    }))
    .<windowed transformation>(<window function>);

这里有个demo:

public static void demo3() throws Exception {
    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, Integer>> dataStream = senv.socketTextStream("192.168.20.130", 9999)
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    String[] values = value.split(" ");
                    for(String v : values) {
                        out.collect(Tuple2.of(v, 1));
                    }
                }
            });
    dataStream.keyBy(item -> item.f0)
            .window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(10 * 1000)))
            .reduce((a,b) -> Tuple2.of(a.f0, a.f1 + b.f1))
            .print();

    senv.execute("WindowDemo");
}
全局窗口(Global Windows)

全局窗口的 assigner 将拥有相同 key 的所有数据分发到一个全局窗口。这样的窗口模式仅在你指定了自定义的 trigger 时有用。否则,计算不会发生,因为全局窗口没有天然的终点去触发其中积累的数据。

86d580c11e1a7b2a9522d2586729f6f9.jpeg
global-window

下面的代码展示了如何使用全局窗口。

DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>);

这里给出简单demo,当有数据来,就计算词频并且打印出来;没有数据来,就不用计算不用打印,来一条数据,就触发trigger计算:

public static void demo4() throws Exception {
    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, Integer>> dataStream = senv.socketTextStream("192.168.20.130", 9999)
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    String[] values = value.split(" ");
                    for(String v : values) {
                        out.collect(Tuple2.of(v, 1));
                    }
                }
            });
    dataStream.keyBy(item -> item.f0)
            .window(GlobalWindows.create())
            .trigger(new Trigger<Tuple2<String, Integer>, GlobalWindow>() {
                @Override
                public TriggerResult onElement(Tuple2<String, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
                    return TriggerResult.FIRE;
                }

                @Override
                public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
                    return null;
                }

                @Override
                public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
                    return null;
                }

                @Override
                public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {

                }
            })
            .reduce((a,b) -> Tuple2.of(a.f0, a.f1 + b.f1))
            .print();

    senv.execute("WindowDemo");
}

窗口函数(Window Functions)

定义了 window assigner 之后,我们需要指定当窗口触发之后,我们如何计算每个窗口中的数据, 这就是 window function 的职责了。关于窗口如何触发,详见 triggers。

窗口函数有三种:ReduceFunction、AggregateFunction 或 ProcessWindowFunction。前两者执行起来更高效(详见 State Size)因为 Flink 可以在每条数据到达窗口后 进行增量聚合(incrementally aggregate)。而 ProcessWindowFunction 会得到能够遍历当前窗口内所有数据的 Iterable,以及关于这个窗口的 meta-information。

使用 ProcessWindowFunction 的窗口转换操作没有其他两种函数高效,因为 Flink 在窗口触发前必须缓存里面的所有数据。ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 合并来提高效率。这样做既可以增量聚合窗口内的数据,又可以从 ProcessWindowFunction 接收窗口的 metadata。我们接下来看看每种函数的例子。

ReduceFunction

ReduceFunction 指定两条输入数据如何合并起来产生一条输出数据,输入和输出数据的类型必须相同。Flink 使用 ReduceFunction 对窗口中的数据进行增量聚合。

ReduceFunction 可以像下面这样定义:

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
    });

上面的例子是对窗口内元组的第二个属性求和,这里有个demo:

public static void reduceFunctionDemo() throws Exception {
    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, Integer>> dataStream = senv.socketTextStream("192.168.20.130", 9999)
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    String[] values = value.split(" ");
                    for(String v : values) {
                        out.collect(Tuple2.of(v, 1));
                    }
                }
            });
    dataStream.keyBy(item -> item.f0)
            .window(TumblingProcessingTimeWindows.of(Time.milliseconds(3000)))
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                    return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                }
            })
            .print();

    senv.execute("reduceFunctionDemo");
    
}
AggregateFunction

ReduceFunction 是 AggregateFunction 的特殊情况。AggregateFunction 接收三个类型:输入数据的类型(IN)、累加器的类型(ACC)和输出数据的类型(OUT)。输入数据的类型是输入流的元素类型,AggregateFunction 接口有如下几个方法:把每一条元素加进累加器、创建初始累加器、合并两个累加器、从累加器中提取输出(OUT 类型)。我们通过下例说明。

与 ReduceFunction 相同,Flink 会在输入数据到达窗口时直接进行增量聚合。

AggregateFunction 可以像下面这样定义:

/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());

上例计算了窗口内所有元素第二个属性的平均值。

public static void aggregateFunctionDemo() throws Exception {
    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, Integer>> dataStream = senv.socketTextStream("192.168.20.130", 9999)
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    String[] values = value.split(" ");
                    for(String v : values) {
                        out.collect(Tuple2.of(v, 1));
                    }
                }
            });
    dataStream.keyBy(item -> item.f0)
            .window(TumblingProcessingTimeWindows.of(Time.milliseconds(3000)))
            .aggregate(new AggregateFunction<Tuple2<String,Integer>, Tuple2<Integer, Integer>, Float>() {
                @Override
                public Tuple2<Integer, Integer> createAccumulator() {
                    return new Tuple2<>(0, 0);
                }

                @Override
                public Tuple2<Integer, Integer> add(Tuple2<String, Integer> value, Tuple2<Integer, Integer> accumulator) {
                    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1);
                }

                @Override
                public Float getResult(Tuple2<Integer, Integer> accumulator) {
                    return (float) accumulator.f0 / accumulator.f1;
                }

                @Override
                public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
                    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
                }
            })
            .print();

    senv.execute("aggregateFunctionDemo");

}
ProcessWindowFunction

ProcessWindowFunction 有能获取包含窗口内所有元素的 Iterable, 以及用来获取时间和状态信息的 Context 对象,比其他窗口函数更加灵活。ProcessWindowFunction 的灵活性是以性能和资源消耗为代价的, 因为窗口中的数据无法被增量聚合,而需要在窗口触发前缓存所有数据。

ProcessWindowFunction 的签名如下:

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param context The context in which the window is being evaluated.
     * @param elements The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public abstract void process(
            KEY key,
            Context context,
            Iterable<IN> elements,
            Collector<OUT> out) throws Exception;

    /**
     * Deletes any state in the {@code Context} when the Window expires (the watermark passes its
     * {@code maxTimestamp} + {@code allowedLateness}).
     *
     * @param context The context to which the window is being evaluated
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public void clear(Context context) throws Exception {}

    /**
     * The context holding window metadata.
     */
    public abstract class Context implements java.io.Serializable {
        /**
         * Returns the window that is being evaluated.
         */
        public abstract W window();

        /** Returns the current processing time. */
        public abstract long currentProcessingTime();

        /** Returns the current event-time watermark. */
        public abstract long currentWatermark();

        /**
         * State accessor for per-key and per-window state.
         *
         * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
         * by implementing {@link ProcessWindowFunction#clear(Context)}.
         */
        public abstract KeyedStateStore windowState();

        /**
         * State accessor for per-key global state.
         */
        public abstract KeyedStateStore globalState();
    }

}

key 参数由 keyBy() 中指定的 KeySelector 选出。如果是给出 key 在 tuple 中的 index 或用属性名的字符串形式指定 key,这个 key 的类型将总是 Tuple, 并且你需要手动将它转换为正确大小的 tuple 才能提取 key。

ProcessWindowFunction 可以像下面这样定义:

DataStream<Tuple2<String, Long>> input = ...;

input
  .keyBy(t -> t.f0)
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new MyProcessWindowFunction());

/* ... */

public class MyProcessWindowFunction 
    extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

  @Override
  public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
    long count = 0;
    for (Tuple2<String, Long> in: input) {
      count++;
    }
    out.collect("Window: " + context.window() + "count: " + count);
  }
}

这里给个demo:

public static void processWindowFunctionDemo() throws Exception {
    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<Tuple2<String, Integer>> dataStream = senv.socketTextStream("192.168.20.130", 9999)
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

                @Override
                public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                    String[] values = value.split(" ");
                    for(String v : values) {
                        out.collect(Tuple2.of(v, 1));
                    }
                }
            });
    dataStream.keyBy(item -> item.f0)
            .window(TumblingProcessingTimeWindows.of(Time.milliseconds(3000)))
            .process(new ProcessWindowFunction<Tuple2<String,Integer>, String, String, TimeWindow>() {
                @Override
                public void process(String key, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<String> out) throws Exception {
                    long count = 0;
                    for (Tuple2<String, Integer> in: elements) {
                        count ++;
                    }
                    out.collect("key: " + key + ", Window: " + context.window() + ", count: " + count);
                }
            })
            .print();

    senv.execute("processWindowFunctionDemo");

}

控制台nc工具键入数据,打印结果:

# nc工具键入数据
[root@hm-001 logs]# nc -lk 9999
flink hadoop flink

# 程序运行结果
7> key: flink, Window: TimeWindow{start=1704352803000, end=1704352806000}, count: 2
8> key: hadoop, Window: TimeWindow{start=1704352803000, end=1704352806000}, count: 1

演示代码[地址] https://gitee.com/ddxygq/BigDataTechnical/blob/main/Flink/src/main/java/window/WindowFunctionDemo.java

上例使用 ProcessWindowFunction 对窗口中的元素计数,并且将窗口本身的信息一同输出。

注意,使用 ProcessWindowFunction 完成简单的聚合任务是非常低效的。下一章会说明如何将 ReduceFunction 或 AggregateFunction 与 ProcessWindowFunction 组合成既能 增量聚合又能获得窗口额外信息的窗口函数。

增量聚合的 ProcessWindowFunction

ProcessWindowFunction 可以与 ReduceFunction 或 AggregateFunction 搭配使用, 使其能够在数据到达窗口的时候进行增量聚合。当窗口关闭时,ProcessWindowFunction 将会得到聚合的结果。这样它就可以增量聚合窗口的元素并且从 ProcessWindowFunction中获得窗口的元数据。

你也可以对过时的 WindowFunction 使用增量聚合。

使用 ReduceFunction 增量聚合

下例展示了如何将 ReduceFunction 与 ProcessWindowFunction 组合,返回窗口中的最小元素和窗口的开始时间。

DataStream<SensorReading> input = ...;

input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .reduce(new MyReduceFunction(), new MyProcessWindowFunction());

// Function definitions

private static class MyReduceFunction implements ReduceFunction<SensorReading> {

  public SensorReading reduce(SensorReading r1, SensorReading r2) {
      return r1.value() > r2.value() ? r2 : r1;
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<SensorReading> minReadings,
                    Collector<Tuple2<Long, SensorReading>> out) {
      SensorReading min = minReadings.iterator().next();
      out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
  }
}
使用 AggregateFunction 增量聚合

下例展示了如何将 AggregateFunction 与 ProcessWindowFunction 组合,计算平均值并与窗口对应的 key 一同输出。

DataStream<Tuple2<String, Long>> input = ...;

input
  .keyBy(<key selector>)
  .window(<window assigner>)
  .aggregate(new AverageAggregate(), new MyProcessWindowFunction());

// Function definitions

/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
  @Override
  public Tuple2<Long, Long> createAccumulator() {
    return new Tuple2<>(0L, 0L);
  }

  @Override
  public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
    return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
  }

  @Override
  public Double getResult(Tuple2<Long, Long> accumulator) {
    return ((double) accumulator.f0) / accumulator.f1;
  }

  @Override
  public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
    return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
  }
}

private static class MyProcessWindowFunction
    extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

  public void process(String key,
                    Context context,
                    Iterable<Double> averages,
                    Collector<Tuple2<String, Double>> out) {
      Double average = averages.iterator().next();
      out.collect(new Tuple2<>(key, average));
  }
}
在 ProcessWindowFunction 中使用 per-window state

除了访问 keyed state (任何富函数都可以),ProcessWindowFunction 还可以使用作用域仅为 “当前正在处理的窗口”的 keyed state。在这种情况下,理解 per-window 中的 window 指的是什么非常重要。总共有以下几种窗口的理解:

在窗口操作中定义的窗口:比如定义了长一小时的滚动窗口或长两小时、滑动一小时的滑动窗口。对应某个 key 的窗口实例:比如 以 user-id xyz 为 key,从 12:00 到 13:00 的时间窗口。具体情况取决于窗口的定义,根据具体的 key 和时间段会产生诸多不同的窗口实例。Per-window state 作用于后者。也就是说,如果我们处理有 1000 种不同 key 的事件, 并且目前所有事件都处于 [12:00, 13:00) 时间窗口内,那么我们将会得到 1000 个窗口实例, 且每个实例都有自己的 keyed per-window state。

process() 接收到的 Context 对象中有两个方法允许我们访问以下两种 state:

globalState(),访问全局的 keyed state windowState(), 访问作用域仅限于当前窗口的 keyed state 如果你可能将一个 window 触发多次(比如当你的迟到数据会再次触发窗口计算, 或你自定义了根据推测提前触发窗口的 trigger),那么这个功能将非常有用。这时你可能需要在 per-window state 中储存关于之前触发的信息或触发的总次数。

当使用窗口状态时,一定记得在删除窗口时清除这些状态。他们应该定义在 clear() 方法中。

Triggers

Trigger 决定了一个窗口(由 window assigner 定义)何时可以被 window function 处理。每个 WindowAssigner 都有一个默认的 Trigger。如果默认 trigger 无法满足你的需要,你可以在 trigger(...) 调用中指定自定义的 trigger。

Trigger 接口提供了五个方法来响应不同的事件:

  • onElement() 方法在每个元素被加入窗口时调用。

  • onEventTime() 方法在注册的 event-time timer 触发时调用。

  • onProcessingTime() 方法在注册的 processing-time timer 触发时调用。

  • onMerge() 方法与有状态的 trigger 相关。该方法会在两个窗口合并时, 将窗口对应 trigger 的状态进行合并,比如使用会话窗口时。

  • 最后,clear() 方法处理在对应窗口被移除时所需的逻辑。

有两点需要注意:

  1. 前三个方法通过返回 TriggerResult 来决定 trigger 如何应对到达窗口的事件。应对方案有以下几种:

  • CONTINUE: 什么也不做

  • FIRE: 触发计算

  • PURGE: 清空窗口内的元素

  • FIRE_AND_PURGE: 触发计算,计算结束后清空窗口内的元素

  1. 上面的任意方法都可以用来注册 processing-time 或 event-time timer。

触发(Fire)与清除(Purge)

当 trigger 认定一个窗口可以被计算时,它就会触发,也就是返回 FIRE 或 FIRE_AND_PURGE。这是让窗口算子发送当前窗口计算结果的信号。如果一个窗口指定了 ProcessWindowFunction,所有的元素都会传给 ProcessWindowFunction。如果是 ReduceFunction 或 AggregateFunction,则直接发送聚合的结果。

当 trigger 触发时,它可以返回 FIRE 或 FIRE_AND_PURGE。FIRE 会保留被触发的窗口中的内容,而 FIRE_AND_PURGE 会删除这些内容。Flink 内置的 trigger 默认使用 FIRE,不会清除窗口的状态。

Purge 只会移除窗口的内容, 不会移除关于窗口的 meta-information 和 trigger 的状态。

WindowAssigner 默认的 Triggers

WindowAssigner 默认的 Trigger 足以应付诸多情况。比如说,所有的 event-time window assigner 都默认使用 EventTimeTrigger。这个 trigger 会在 watermark 越过窗口结束时间后直接触发。

GlobalWindow 的默认 trigger 是永远不会触发的 NeverTrigger。因此,使用 GlobalWindow 时,你必须自己定义一个 trigger。

当你在 trigger() 中指定了一个 trigger 时, 你实际上覆盖了当前 WindowAssigner 默认的 trigger。比如说,如果你指定了一个 CountTrigger 给 TumblingEventTimeWindows,你的窗口将不再根据时间触发, 而是根据元素数量触发。如果你希望即响应时间,又响应数量,就需要自定义 trigger 了。

内置 Triggers 和自定义 Triggers

Flink 包含一些内置 trigger。

  • 之前提到过的 EventTimeTrigger 根据 watermark 测量的 event time 触发。

  • ProcessingTimeTrigger 根据 processing time 触发。

  • CountTrigger 在窗口中的元素超过预设的限制时触发。

  • PurgingTrigger 接收另一个 trigger 并将它转换成一个会清理数据的 trigger。

如果你需要实现自定义的 trigger,你应该看看这个抽象类 Trigger 。请注意,这个 API 仍在发展,所以在之后的 Flink 版本中可能会发生变化。

Evictors

Flink 的窗口模型允许在 WindowAssigner 和 Trigger 之外指定可选的 Evictor。如本文开篇的代码中所示,通过 evictor(...) 方法传入 Evictor。Evictor 可以在 trigger 触发后、调用窗口函数之前或之后从窗口中删除元素。Evictor 接口提供了两个方法实现此功能:

/**
 * Optionally evicts elements. Called before windowing function.
 *
 * @param elements The elements currently in the pane.
 * @param size The current number of elements in the pane.
 * @param window The {@link Window}
 * @param evictorContext The context for the Evictor
 */
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

/**
 * Optionally evicts elements. Called after windowing function.
 *
 * @param elements The elements currently in the pane.
 * @param size The current number of elements in the pane.
 * @param window The {@link Window}
 * @param evictorContext The context for the Evictor
 */
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

evictBefore() 包含在调用窗口函数前的逻辑,而 evictAfter() 包含在窗口函数调用之后的逻辑。在调用窗口函数之前被移除的元素不会被窗口函数计算。

Flink 内置有三个 evictor:

CountEvictor: 仅记录用户指定数量的元素,一旦窗口中的元素超过这个数量,多余的元素会从窗口缓存的开头移除 DeltaEvictor: 接收 DeltaFunction 和 threshold 参数,计算最后一个元素与窗口缓存中所有元素的差值, 并移除差值大于或等于 threshold 的元素。TimeEvictor: 接收 interval 参数,以毫秒表示。它会找到窗口中元素的最大 timestamp max_ts 并移除比 max_ts - interval 小的所有元素。默认情况下,所有内置的 evictor 逻辑都在调用窗口函数前执行。

指定一个 evictor 可以避免预聚合,因为窗口中的所有元素在计算前都必须经过 evictor。

Note: Evictor 在 Python DataStream API 中还不支持. Flink 不对窗口中元素的顺序做任何保证。也就是说,即使 evictor 从窗口缓存的开头移除一个元素,这个元素也不一定是最先或者最后到达窗口的。

Allowed Lateness

在使用 event-time 窗口时,数据可能会迟到,即 Flink 用来追踪 event-time 进展的 watermark 已经 越过了窗口结束的 timestamp 后,数据才到达。对于 Flink 如何处理 event time, event time 和 late elements 有更详细的探讨。

默认情况下,watermark 一旦越过窗口结束的 timestamp,迟到的数据就会被直接丢弃。但是 Flink 允许指定窗口算子最大的 allowed lateness。Allowed lateness 定义了一个元素可以在迟到多长时间的情况下不被丢弃,这个参数默认是 0。在 watermark 超过窗口末端、到达窗口末端加上 allowed lateness 之前的这段时间内到达的元素, 依旧会被加入窗口。取决于窗口的 trigger,一个迟到但没有被丢弃的元素可能会再次触发窗口,比如 EventTimeTrigger。

为了实现这个功能,Flink 会将窗口状态保存到 allowed lateness 超时才会将窗口及其状态删除 (如 Window Lifecycle 所述)。

默认情况下,allowed lateness 被设为 0。即 watermark 之后到达的元素会被丢弃。

你可以像下面这样指定 allowed lateness:

DataStream<T> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>);

使用 GlobalWindows 时,没有数据会被视作迟到,因为全局窗口的结束 timestamp 是 Long.MAX_VALUE。

从旁路输出(side output)获取迟到数据

通过 Flink 的 旁路输出 功能,你可以获得迟到数据的数据流。

首先,你需要在开窗后的 stream 上使用 sideOutputLateData(OutputTag) 表明你需要获取迟到数据。然后,你就可以从窗口操作的结果中获取旁路输出流了。

final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

DataStream<T> input = ...;

SingleOutputStreamOperator<T> result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>);

DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
关于状态大小的考量

窗口可以被定义在很长的时间段上(比如几天、几周或几个月)并且积累下很大的状态。当你估算窗口计算的储存需求时,可以铭记几条规则:

  • Flink 会为一个元素在它所属的每一个窗口中都创建一个副本。因此,一个元素在滚动窗口的设置中只会存在一个副本(一个元素仅属于一个窗口,除非它迟到了)。与之相反,一个元素可能会被拷贝到多个滑动窗口中,就如我们在 Window Assigners 中描述的那样。因此,设置一个大小为一天、滑动距离为一秒的滑动窗口可能不是个好想法。

  • ReduceFunction 和 AggregateFunction 可以极大地减少储存需求,因为他们会就地聚合到达的元素, 且每个窗口仅储存一个值。而使用 ProcessWindowFunction 需要累积窗口中所有的元素。

  • 使用 Evictor 可以避免预聚合, 因为窗口中的所有数据必须先经过 evictor 才能进行计算(详见 Evictors)。

WaterMark

watermark是为了解决数据延迟到达的问题的,在介绍watermark之前,先介绍一下Event Time的概念:

  • Event Time事件时间(event time):事件产生的时间,记录的是设备生产(或者存储)事件的时间

  • 摄取时间(ingestion time):Flink 读取事件时记录的时间

  • 处理时间(processing time):Flink pipeline 中具体算子处理事件的时间

让我们通过一个简单的示例来演示为什么需要 watermarks 及其工作方式。

在此示例中,我们将看到带有混乱时间戳的事件流,如下所示。显示的数字表达的是这些事件实际发生时间的时间戳。到达的第一个事件发生在时间 4,随后发生的事件发生在更早的时间 2,依此类推:

de3298a13e1b28219347a81a94261202.png假设我们要对数据流排序,我们想要达到的目的是:应用程序应该在数据流里的事件到达时就有一个算子(我们暂且称之为排序)开始处理事件,这个算子所输出的流是按照时间戳排序好的。

让我们重新审视这些数据:

(1) 我们的排序器看到的第一个事件的时间戳是 4,但是我们不能立即将其作为已排序的流释放。因为我们并不能确定它是有序的,并且较早的事件有可能并未到达。事实上,如果站在上帝视角,我们知道,必须要等到时间戳为 2 的元素到来时,排序器才可以有事件输出。

需要一些缓冲,需要一些时间,但这都是值得的

(2) 接下来的这一步,如果我们选择的是固执的等待,我们永远不会有结果。首先,我们看到了时间戳为 4 的事件,然后看到了时间戳为 2 的事件。可是,时间戳小于 2 的事件接下来会不会到来呢?可能会,也可能不会。再次站在上帝视角,我们知道,我们永远不会看到时间戳 1。

最终,我们必须勇于承担责任,并发出指令,把带有时间戳 2 的事件作为已排序的事件流的开始

(3)然后,我们需要一种策略,该策略定义:对于任何给定时间戳的事件,Flink 何时停止等待较早事件的到来。

这正是 watermarks 的作用 — 它们定义何时停止等待较早的事件。

Flink 中事件时间的处理取决于 watermark 生成器,后者将带有时间戳的特殊元素插入流中形成 watermarks。事件时间 t 的 watermark 代表 t 之前(很可能)都已经到达。

当 watermark 以 2 或更大的时间戳到达时,事件流的排序器应停止等待,并输出 2 作为已经排序好的流。

(4) 我们可能会思考,如何决定 watermarks 的不同生成策略

每个事件都会延迟一段时间后到达,然而这些延迟有所不同,有些事件可能比其他事件延迟得更多。一种简单的方法是假定这些延迟受某个最大延迟的限制。Flink 将此策略称为 最大无序边界 (bounded-out-of-orderness) watermark。当然,我们可以想像出更好的生成 watermark 的方法,但是对于大多数应用而言,固定延迟策略已经足够了。

使用 Watermarks

如果想要使用基于带有事件时间戳的事件流,Flink 需要知道与每个事件相关的时间戳,而且流必须包含 watermark。但是,在您自己的应用程序中,您将必须自己进行处理,这通常是通过实现一个类来实现的,该类从事件中提取时间戳,并根据需要生成 watermarks。最简单的方法是使用 WatermarkStrategy:

DataStream<Event> stream = ...;

WatermarkStrategy<Event> strategy = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> event.timestamp);

DataStream<Event> withTimestampsAndWatermarks =
    stream.assignTimestampsAndWatermarks(strategy);

这里有个demo,用作演示:

public static void demo() throws Exception {
    StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();

    // 每条数据最多延迟 5s
    long maxOutOfOrderness = 5000;
    // 窗口大小5s
    long windowSize = 5000;

    DataStream<Tuple2<String, Integer>> dataStream = senv.socketTextStream("192.168.20.130", 9999)
            .assignTimestampsAndWatermarks(WatermarkStrategy.<String>forBoundedOutOfOrderness(Duration.ofMillis(maxOutOfOrderness))
                    .withTimestampAssigner((event, timestamp) ->
                            // 获取数据里面的time字段
                        JSON.parseObject(event).getLong("time")
                    )).map(item -> Tuple2.of(JSON.parseObject(item).getString("name"), 1));
    dataStream.keyBy(item -> item.f0)
            .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSize)))
            .sum(1)
            .print();

    senv.execute("reduceFunctionDemo");

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1361191.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

K8S陈述式资源管理(1)

命令行: kubectl命令行工具 优点: 90%以上的场景都可以满足对资源的增&#xff0c;删&#xff0c;查比较方便&#xff0c;对改不是很友好 缺点:命令比较冗长&#xff0c;复杂&#xff0c;难记声明式 声明式&#xff1a;K8S当中的yaml文件来实现资源管理 GUI&#xff1a;图形…

box-shadow参数学习及渲染过程研究

参数定义 CSS 的 box-shadow 属性用于在元素的框架周围添加阴影效果。它可以接受多个由逗号分隔的阴影效果&#xff0c;每个阴影效果由以下几部分组成&#xff1a; h-offset&#xff1a;水平阴影的位置。正值将阴影向右移动&#xff0c;负值将阴影向左移动。v-offset&#xf…

vue-video-player播放hls视频流

需求 最近需要接入海康视频摄像头&#xff0c;然后把视频的画面接入到自己的网站系统中。以前对接过rtsp固定IP的显示视频&#xff0c;这次的不一样&#xff0c;没有了固定IP。海康的解决办法是&#xff0c;摄像头通过配置服务器到萤石云平台&#xff0c;然后购买企业版账号和…

【unity小技巧】实现没有动画的FPS武器摇摆和摆动效果

文章目录 前言开始完结 前言 添加程序摇摆和摆动是为任何FPS游戏添加一些细节的非常简单的方法。但是并不是所以的模型动画都会配有武器摆动动画效果&#xff0c;在本文中&#xff0c;将实现如何使用一些简单的代码实现武器摇摆和摆动效果&#xff0c;这比设置动画来尝试实现类…

调整几行代码,接口吞吐提升 10 倍,性能调优妙啊!

景 分析过程 总结 背景 公司的一个ToB系统,因为客户使用的也不多,没啥并发要求,就一直没有经过压测。这两天来了一个“大客户”,对并发量提出了要求:核心接口与几个重点使用场景单节点吞吐量要满足最低500/s的要求。 当时一想,500/s吞吐量还不简单。Tomcat按照100个线程…

小心JDK20 ZipOutputStream

Oracle 團隊竟然這麽粗心&#xff0c;編譯JDK 20 時ZipOutputStream沒有編譯成功就發佈了。 所以這個20版本不可以使用ZipOutputStream。 GZIPInputStream 只能做最後的壓縮&#xff0c;不能添加多個附件ZipEntry。 下一個版本21不存在這個問題。 try(var zipOut new ZipOu…

C++之STL库简介

目录 一、STL&#xff08;Standard Template Library&#xff0c;标准模板库&#xff09; 二、容器&#xff08;Containers&#xff09; 1.vector&#xff08;动态数组&#xff09; 2.list&#xff08;双向链表&#xff09; 3.deque&#xff08;双端队列&#xff09; 4.st…

怎么做表单二维码来获取用户数据?扫码填表的制作方法

​怎么用二维码来收集其他人的信息&#xff0c;比如用户反馈、信息采集、问卷调查等等&#xff0c;都是现在表单二维码的常见应用方式。那么如果我们想制作一个表单二维码用来采集其他人员的反馈信息&#xff0c;用二维码生成器来制作的步骤有哪些呢&#xff1f;下面来教大家在…

Redis高级特性和应用(慢查询、Pipeline、事务、Lua)

Redis的慢查询 许多存储系统(例如 MySQL)提供慢查询日志帮助开发和运维人员定位系统存在的慢操作。所谓慢查询日志就是系统在命令执行前后计算每条命令的执行时间,当超过预设阀值,就将这条命令的相关信息(例如:发生时间,耗时,命令的详细信息)记录下来,Redis也提供了类似…

高压放大器输出接法及其注意事项

高压放大器应用场景非常广泛&#xff0c;非常适用于半导体高压驱动、TFT产业高压驱动、各种高压工程等应用&#xff1b;也很适用当作音频信号产生器或函数波形产生器的波形放大使用。使用场景广泛&#xff0c;放大器的输出接法也多种&#xff0c;对于不同的放大器也有对应的输出…

Linux vi/vim 教程

文章目录 【 1. vi/vim 的三种模式 】1.1 命令模式1.2 输入模式1.3 底线命令模式 【 2. 实例 】【 3. vim 的其他命令 】 所有的 Unix Like 系统都会内建 vi 文本编辑器&#xff0c;其他的文本编辑器则不一定会存在。目前我们使用比较多的是 vim 编辑器。vim 从 vi 发展出来&am…

Leetcode 剑指 Offer II 060. 前 K 个高频元素

题目难度: 中等 原题链接 今天继续更新 Leetcode 的剑指 Offer&#xff08;专项突击版&#xff09;系列, 大家在公众号 算法精选 里回复 剑指offer2 就能看到该系列当前连载的所有文章了, 记得关注哦~ 题目描述 给定一个整数数组 nums 和一个整数 k &#xff0c;请返回其中出现…

【Java集合篇】接上篇博文--为什么在JDK8中HashMap要转成红黑树

为什么在JDK8中HashMap要转成红黑树 ✔️为什么不继续使用链表✔️为什么是红黑树✔️红黑树的性能优势 ✔️ 拓展知识仓✔️为什么是链表长度达到8的时候转✔️为什么不在冲突的时候立刻转✔️关于为什么长度为8的时候转(源码注释解读)✔️为什么长度为6的时候转回来?✔️双向…

使用jmeter从0开始完成性能测试

使用JMeter从0开始完成性能测试 介绍 在软件开发过程中&#xff0c;性能测试是一项关键任务&#xff0c;它可以帮助我们评估系统在不同负载条件下的性能表现&#xff0c;发现潜在的性能瓶颈。JMeter是一款功能强大且易于使用的性能测试工具&#xff0c;它可以帮助我们完成各种…

iec104和iec61850

iec104和iec61850 IEC104 规约详细解读(一) 协议结构 IEC104 规约详细解读(二)交互流程以及协议解析 61850开发知识总结与分享【1】 Get the necesarry projects next to each other in the same directory; $ git clone https://github.com/robidev/iec61850_open_server.g…

NGUI基础-Widget

目录 Widget是什么 Widget组件包含的属性 Pivot Depth Size snap Aspect Free Based on Width Based on Height Widget是什么 在Unity UI系统中&#xff0c;"Widget"是指UI元素的基类&#xff0c;它为UI元素提供了位置、大小和锚点等基本属性。通过使用&qu…

VINS-MONO拓展2----更快地makeHessian矩阵

1. 目标 完成大作业T2 作业提示&#xff1a; 多线程方法主要包括以下几种(参考博客)&#xff1a; MPI(多主机多线程开发),OpenMP(为单主机多线程开发而设计)SSE(主要增强CPU浮点运算的能力)CUDAStream processing, 之前已经了解过std::thread和pthread&#xff0c;拓展1…

冠军团队!第二届百度搜索创新大赛AI方案

Datawhale干货 作者&#xff1a;李柯辰&#xff0c;Datawhale成员 写在前面 大家好&#xff0c;我们是2023年第二届百度搜索创新大赛 赛道三——AI应用设计赛道的冠军团队——“肝到凌晨”&#xff0c;很高兴能与大家分享我们这次比赛的经验&#xff0c;同时也希望以后有机会可…

【机器学习:欧氏距离 】机器学习中欧氏距离的理解和应用

【机器学习&#xff1a;欧氏距离 】机器学习中欧氏距离的理解和应用 距离公式二维更高的维度点以外的物体属性欧几里得距离的平方概括历史 在数学中&#xff0c;欧氏距离’是指欧氏空间中任意两点之间的直线距离。这种距离可以通过应用勾股定理来计算&#xff0c;利用两点的笛卡…

【userfaultfd 条件竞争】starCTF2019 - hackme

前言 呜呜呜&#xff0c;这题不难&#xff0c;但是差不多一个多月没碰我的女朋友 kernel pwn 了&#xff0c;对我的 root 宝宝也是非常想念&#xff0c;可惜这题没有找到我的 root 宝宝&#xff0c;就偷了她的 flag。 哎有点生疏了&#xff0c;这题没看出来堆溢出&#xff0c…