【基础】Flink -- Time and Window

news2024/11/25 18:37:15

Flink -- Time and Window

  • Flink 时间语义
  • 水位线 Watermark
    • 水位线的概念
      • 有序流中的水位线
      • 乱序流中的水位线
      • 水位线的特性
    • 水位线的基本使用
      • 水位线生成策略
      • 内置水位线生成器
      • 自定义水位线策略
      • 在自定义数据源中发送水位线
  • 窗口 Window
    • 窗口的基本概述
      • 窗口的基本概念
      • 窗口的分类
      • 窗口的 API
    • 窗口的基本使用
      • 窗口分配器
        • 时间窗口
        • 计数窗口
      • 窗口函数
        • 增量聚合函数
        • 全窗口函数
        • 增量聚合和全窗口函数的结合使用
      • 测试水位线与窗口的使用
    • 窗口的其他 API
      • 触发器 Trigger
      • 移除器 Evictor
      • 允许延迟 Allowed Lateness
      • 侧入流 Side Output
    • 迟到数据的处理

Flink 时间语义

在一台机器当中,所谓的时间当然就是指系统时间;但是在一个分布式的系统当中,各个节点彼此独立、互不影响,因此并不存在一个统一的时钟。因此,当我们需要进行数据的聚合计算时,”并不同时被处理“的事件可能会导致计算结果出错。

此外,当流中的数据进行传输时,也会存在传输的延迟,延迟将会导致时间语义的模糊,比如:上游任务在8点59分59秒发出一条数据,到下游要做窗口计算时已经是9点零1秒了,那这条数据到底该不该被收到 8~9 点的窗口呢?

因此,我们需要指定一个统一的时间标准,按照这个标准进行数据的收集和计算。Flink 中为我们提供了两种时间语义,分别为事件时间和处理时间。
在这里插入图片描述

  • 处理时间(Processing Time)

    处理时间就是在执行处理操作时机器的系统时间。这种时间语义简单粗暴,不需要各个节点之间的同步协调,因此处理时间是最简单的语义。

  • 事件时间(Event Time)

    事件时间指的是数据生成的时间,这个时间伴随着数据的生成而生成,并且是不会变化的,因此我们可以将这个时间作为一个属性嵌入数据当中,并以该时间作为指标进行数据的收集和计算。

在 Flink 中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从 1.12 版本开始,Flink 已经将事件时间作为了默认的时间语义。

水位线 Watermark

水位线的概念

当使用事件时间作为时间语义时,就相当于每条数据都携带了一个时钟。因此,在数据处理过程中的时钟则根据数据的到来而不停的驱动。即何时产生的数据到达处理系统时,系统的时间就会推进到当前数据产生的时间,从而实现数据的时间标准化。

但是在分布式系统中,上述的时间驱动方式会存在以下问题:

  1. 若数据在处理转换的过程当中存在窗口聚合操作,那么会导致下游的数据量减少,因此对于时间控制的精细度便会下降;

  2. 在一般情况下,数据向下游的传递只会传递给一个子任务,此时其他并行子任务的时钟便无法推进;

为解决上述问题,我们在进行数据传递时,需要把时钟也以数据的形式进行传递,这个时间的标志不会因窗口聚合运算而停滞,同时可以直接广播到下游。

在 Flink 中,这种用来衡量事件时间 Event Time 进展的标记,就被称作”水位线“ Watermark 。水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后,这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。
在这里插入图片描述

如上图所示,当某个数据到来之后,就可以依据这条数据的事件时间,在其后插入一个水位线,并伴随着数据一起向下游流动,以推进时间进度。

有序流中的水位线

在理想状态下,数据会按照生成的顺序,排队进入流中进行处理。这就可以保证时间的推进都是从小到大的,即插入的水位线是不断增长的。

在实际应用中,若当前的数据量非常大,就可能会存在很多具有相同时间戳的数据,此时若针对每条数据都插入水位线,就会产生很多相同的无意义的水位线。因此对于有序流,一般采取周期性的水位线。如下图所示。
在这里插入图片描述

注意,这里的周期也是一个时间的概念,周期是以系统时间为基准的。

乱序流中的水位线

这里所说的乱序是指数据到达的顺序是混乱的,由于网络延迟的存在,先产生的数据可能之后才会到达。如果还采取之前的策略,那么有可能会产生“时间倒流”的问题。比如第9秒的数据到达之后,流中的时间水位推移到第 9 秒,随后第7秒数据到达,此时再插入水位线将导致时间水位变回第 7 秒。

针对这个问题,Flink 采取判断 + 周期生成的办法。假设我们的周期设置为 5s,那么 Flink 会记录五秒内数据携带的时间戳的最大值,并每隔五秒将水位线添加进流中传递。采用这种方法可以有效避免“时间倒流”的问题。如下图所示。
在这里插入图片描述

此外,乱序流还存在另外一个头疼的问题,即当我们需要对数据按照时间窗口进行聚合计算时,“迟到”的数据将会导致汇总数据结果不准确。比如我们需要每 10 秒计算一下某电站的发电量,若第8秒的发电量在第10秒之后达到,那么在第10秒的数据到达之后,该时间窗口就已经关闭进行计算并输出结果,因此第8秒的数据就不会被统计,这将导致错误的汇总结果。

为解决“迟到”数据的问题,Flink 允许我们采取“等待策略”。在 Flink 当中,水位线就是时钟,若我们在确定水位线时故意延迟几秒,这样就可以达到等待“迟到”数据的效果。如下图所示。我们对“迟到”数据的容忍度设置为 2 秒。因此,在第一次插入水位线时,即使该周期内的最大时间为第9秒的数据,但我们插入的水位线时间为 7,这意味着第7秒前的数据已经全部到达,即使第8秒的数据随后才抵达,那也不会影响聚合计算的结果,因为水位线就是时钟。
在这里插入图片描述

该方法通过牺牲数据的实时性而保证了数据计算的准确定。值得注意的一点是,如果数据"迟到"的太久,那么我们只能通过加长等待时间来处理。

水位线的特性

  • 水位线是插入数据流中的一个标记,可以看作一个特殊的数据;

  • 水位线主要内容就是一个时间戳,用于推进时钟;

  • 水位线是基于数据的时间戳而生成的;

  • 水位线的时间戳必须单调递增,以保证任务时钟一直向前推进;

  • 水位线可以通过设置延迟的方式来保证乱序数据的正确处理;

  • 若水位线到达某个时间 t,则代表第 t 秒之前的数据已经全部到达(包含 t),在之后的数据中不会有时间戳小于等于 t 的数据出现;

水位线的基本使用

水位线生成策略

Flink 的 DataStream API 中提供了一个用于生成水位线的方法.assignTimestampsAndWatermarks(),该方法可以为流中的数据分配时间戳,并生成水位线来指示时钟。

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks
(WatermarkStrategy<T> watermarkStrategy)

该方法需要传入一个WatermarkStrategy类对象作为参数,该类中包含一个时间戳分配器方法createTimestampAssigner()以及生成水位线生成器方法createWatermarkGenerator()

下述代码中使用的实体类与源算子的代码分别如下:

实体类 Event

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Event {

    public String user;
    public String url;
    public Long timestamp;

}

源算子 EventSource

public class EventSource implements SourceFunction<Event> {

    private Boolean flag = true;

    String[] users = {"曹操", "刘备", "孙权", "诸葛亮"};
    String[] urls = {"/home", "/test?id=1", "/test?id=2", "/play/football", "/play/basketball"};

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        Random random = new Random();
        while (flag) {
            sourceContext.collect(new Event(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis()
            ));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }
}

内置水位线生成器

Flink 提供了内置的水位线生成器(WatermarkGenerator),不仅开箱即用简化了编程,
而且也为我们自定义水位线策略提供了模板。这两个生成器可以通过调用WatermarkStrategy的静态辅助方法来创建。它们都是周期性生成水位线的,分别对应着处理有序流和乱序流的场景。

示例代码如下,该代码段中提取实体类 Event 的 Timestamp 字段作为时间戳,并设置 5s 的延迟时间用于处理乱序流的迟到数据。

public class WaterMarkDemo {

    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        environment.setParallelism(1);
        // 2. 加载数据源
        environment.addSource(new EventSource())
                // 3. 设置水位线逻辑
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // 3.1 针对乱序流插入水位线,延迟时间设置为 5s
                        // 针对有序流周期性生成水位线即可:.<Event>forMonotonousTimestamps();
                        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        // 3.2 抽取时间戳的逻辑
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        }))
                // 4. 打印输出
                .print();
        // 5. 执行程序
        environment.execute();
    }

}

这里需要注意的是,乱序流中生成的水位线真正的时间戳,其实是当前最大时间戳 – 延 迟时间 – 1,位是毫秒。

为什么要减 1 毫秒呢?回想一下水位线的特点:时间戳为 t 的水位线,表示时间戳小于等于 t 的数据全部到齐,不会再来了。如果考虑有序流,也就是延迟时间为 0 的情况,那么时间戳为 7 秒的数据到来时,之后其实是还有可能继续来 7 秒的数据的;所以生成的水位线不是 7 秒,而是 6 秒 999 毫秒,7 秒的数据还可以继续来。这一点可以在BoundedOutOfOrdernessWatermarks的源码中明显地看到:

public void onPeriodicEmit(WatermarkOutput output) {
    output.emitWatermark(new Watermark(this.maxTimestamp - this.outOfOrdernessMillis - 1L));
}

自定义水位线策略

自定义水位线策略时,需要我们创建一个类并实现WatermarkStrategy接口,重写接口中的时间戳分配器方法createTimestampAssigner与水位线生成方法createWatermarkGenerator即可。

  • 时间戳分配器:接收数据的实体类,提取某个字段作为时间戳即可;

  • 水位线生成器:需要返回一个WatermarkGenerator接口类,实现该接口需要实现两个方法

    • onEvent():每来一条数据便调用一次,用于观察判断输入的事件;

    • onPeriodicEmit():由框架周期性的进行调用,生成水位线;

示例代码如下:

public class CustomWaterMarkDemo {

    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        environment.setParallelism(1);
        // 2. 加载数据源
        environment.addSource(new EventSource())
                // 3. 设置水位线逻辑
                .assignTimestampsAndWatermarks(new CustomWatermarkStrategy())
                // 4. 打印输出
                .print();
        // 5. 执行程序
        environment.execute();
    }

}

自定义水位线策略 CustomWatermarkStrategy:

public class CustomWatermarkStrategy implements WatermarkStrategy<Event> {

    @Override
    public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event event, long l) {
                return event.timestamp;
            }
        };
    }

    @Override
    public WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new CustomPeriodicGenerator();
//        return new CustomPunctuatedGenerator();
    }

    /**
     * 周期性水位线生成器(Periodic Generator)
     */
    public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> {
        private Long delayTime = 5000L;
        private Long maxTs = Long.MIN_VALUE + delayTime + 1L;

        /**
         * 每来一条数据就调用一次
         */
        @Override
        public void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {
            // 更新最大时间戳
            maxTs = Math.max(event.timestamp, maxTs);
        }

        /**
         * 发射水位线,默认 200ms 调用一次
         * 可以调用环境配置的.setAutoWatermarkInterval()方法来设置
         */
        @Override
        public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
            watermarkOutput.emitWatermark(new Watermark(maxTs - delayTime - 1L));
        }
    }

    /**
     * 断点式水位线生成器(Punctuated Generator)
     */
    public static class CustomPunctuatedGenerator implements WatermarkGenerator<Event> {
        /**
         * 遇到特定的事件时发出水位线
         */
        @Override
        public void onEvent(Event event, long l, WatermarkOutput watermarkOutput) {
            if ("曹操".equals(event.user)) {
                watermarkOutput.emitWatermark(new Watermark(event.timestamp - 1));
            }
        }

        /**
         * 发射水位线
         */
        @Override
        public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
            // 不需要做任何处理,在 onEvent() 中发射水位线
        }
    }
}

在自定义数据源中发送水位线

也可以在自定义的源算子中定义发送水位线的逻辑,采用该源算子就不需要在代码中使用assignTimestampsAndWatermarks重复设置水位线了。

示例代码如下:

public class ClickSourceWithWatermark implements SourceFunction<Event> {

    private Boolean flag = true;

    String[] users = {"曹操", "刘备", "孙权", "诸葛亮"};
    String[] urls = {"/home", "/test?id=1", "/test?id=2", "/play/football", "/play/basketball"};

    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        Random random = new Random();
        while (flag) {
            Event event = new Event(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(users.length)],
                    Calendar.getInstance().getTimeInMillis()
            );
            // 使用 collectWithTimestamp 方法将数据发送出去,并指明数据中的时间戳的字段
            sourceContext.collectWithTimestamp(event, event.timestamp);
            // 发送水位线
            sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        flag = false;
    }

}

窗口 Window

窗口的基本概述

窗口的基本概念

Flink 是一种流式计算的引擎,主要用于处理无界数据流。但是对于无界的数据,有时也需要进行一些“批处理”的操作,即将无限的数据按照一定的规则切割成有限的数据块进行处理,这就是所谓的窗口。

在 Flink 中,窗口实际上就是一个数据的存储桶,如下图所示。窗口可以把流切割成有限大小的多个“存储桶”,每个数据都会分发到对应的桶中。当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理。
在这里插入图片描述

窗口的分类

Flink 中的窗口可以按照不同的角度进行分类:

  • 按照驱动类型分类

    • 时间窗口Time Window:可以定义窗口的开始和结束时间,到达结束时间时,出发计算并输出结果,同时窗口关闭销毁。时间窗口是一个左闭右开的区间;

    • 计数窗口Count Window:基于元素的个数来截取数据,到达固定的个数时出发计算并关闭窗口;

  • 按照窗口分配数据的规则分类

    • 滚动窗口Tumbling Windows:滚动窗口拥有固定的大小,是一种对数据均匀划分的切割方式,窗口之间没有间隔、没有重叠,是一种“首尾相接”的状态;

    • 滑动窗口Sliding Windows:滑动窗口同样拥有固定的大小,与滚动窗口不同的是,滑动窗口是存在重叠部分的。其存在一个滑动步长,每间隔该步长,则向前滑动一个步长的距离;

    • 会话窗口Session Windows:类似 web 应用中 session 的概念,需要制定一个超时时间参数,超过该时间没有数据到来则关闭窗口,因此绘画窗口只能基于时间定义;

    • 全局窗口Global Windows:该窗口没有切割数据流,而是将所有相同 key 的数据分配到同一个窗口,数据依旧是无界流。若希望对数据进行计算处理,还需要自定义“触发器”实现;

窗口的 API

在定义窗口操作之前,首先需要确定是基于按键分区的数据流KeyedStream来开窗还是在没有按键分区的DataStream上开窗。

  • 按键分区窗口:相同 key 的数据会被发送至同一个并行子任务,故窗口计算会被分配到多个并行子任务上执行,即每一组 key 都定义了一组窗口,各自独立统计;

    stream.keyBy(...)
          .window(...)
    
  • 非按键分区:并行度为 1,一般不使用

    stream.windowAll(...)
    

窗口的基本使用

窗口分配器

定义窗口分配器是构建窗口算子的第一步,定义数据应该被分到哪个窗口,即窗口分配器就是在指定窗口的类型。

时间窗口

  • 滚动处理时间窗口

    • 窗口分配器类:TumblingProcessingTimeWindows

    • 方法:of

    • 参数

      • 参数一:传入Time类型的参数 size,表示滚动窗口的大小;

      • 参数二:可以传入Time类型的参数 offset,表示窗口开始的时间偏移量;

  • 滑动处理时间窗口

    • 窗口分配器类:SlidingProcessingTimeWindows

    • 方法:of

    • 参数:

      • 参数一:传入Time类型的参数 size,表示滚动窗口的大小;

      • 参数二:传入Time类型的参数 slide,表示滑动窗口的滑动步长;

  • 处理时间会话窗口

    • 窗口分配器类:ProcessingTimeSessionWindows

    • 方法:withGap()withDynamicGap()

    • 参数:

      • withGap()传入一个Time类型的参数 size,表示会话的超时时间;

      • withDynamicGap()传入一个SessionWindowTimeGapExtractor,定义动态提取超时时间的逻辑;

  • 滚动事件时间窗口

    • 窗口分配器类:TumblingEventTimeWindows

    • 使用方法同“滚动处理时间窗口”;

  • 滑动事件时间窗口

    • 窗口分配器类:SlidingEventTimeWindows

    • 使用方法同“滑动处理时间窗口”;

  • 事件时间会话窗口

    • 窗口分配器类:EventTimeSessionWindows

    • 使用方法同“处理时间会话窗口”;

计数窗口

  • 滚动计数窗口

    • 直接调用countWindow()方法,传入长整型的参数 size,表示窗口的大小;
  • 滑动计数窗口

    • 直接调用countWindow()方法,传入长整型的参数 size 和 slide,表示窗口的大小以及窗口的滑动步长;
  • 全局窗口

    • 直接调用window(),分配器由GlobalWindows类提供;

窗口函数

窗口分配器定义了数据的收集逻辑,窗口函数则定义了收集起来的数据的计算逻辑。

经窗口分配器处理之后,数据被分配到对应的窗口中,而数据流经过转换得到的数据类型是 WindowedStream。这个类型并不是DataStream,因此并不能直接进行其他转换,必须
进一步调用窗口函数,对收集的数据进行处理计算之后,才能最终再次得到DataStream
在这里插入图片描述

增量聚合函数

规约函数 ReduceFunction

窗口的归约就是将窗口中收集到的数据两两进行归约。当我们进行流处理时,就是要保存一个状态;每来一个新的数据,就和之前的聚合状态做归约,这样就实现了增量式的聚合。

示例代码如下,该代码段统计了在 5s 的时间窗口内各个 url 的访问量。

public class WindowReduceDemo {

    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        environment.setParallelism(1);
        // 2. 加载数据源并设置水位线
        SingleOutputStreamOperator<Event> stream = environment
                // 2.1 加载数据源
                .addSource(new EventSource())
                // 2.2 获取时间戳、设置水位线
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((SerializableTimestampAssigner<Event>) (event, l) -> event.timestamp));
        // 3. 数据处理及输出
        stream
                // 3.1 将数据转换为 tuple 二元组,方便计算
                .map(new MapFunction<Event, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Event event) throws Exception {
                        return Tuple2.of(event.getUrl(), 1L);
                    }
                })
                // 3.2 按键进行分区
                .keyBy(tuple -> tuple.f0)
                // 3.3 设置滚动事件时间窗口,窗口大小为 5s
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // 3.4 定义规约规则,窗口闭合时向下游发送结果
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                })
                // 3.5 输出结果
                .print();
        // 4. 执行程序
        environment.execute();
    }

}

聚合函数 AggregateFunction

增量聚合函数可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。这就要求我们必须在聚合前,先将数据转换成预期结果类型。

Flink 的 Window API 中的aggregate则提供了更加灵活的操作,该方法需要传入一个AggregateFunction的实现类作为参数。该接口存在四个方法,在下列代码示例中有功能解释。该代码段计算了在 10s 的时间窗口内,所有 url 的人均重复访问量(总访问量 / 访问用户数),每 2s 统计一次。

public class WindowAggregateDemo {

    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        environment.setParallelism(1);
        // 2. 加载数据源并设置水位线
        SingleOutputStreamOperator<Event> stream = environment
                // 2.1 加载数据源
                .addSource(new EventSource())
                // 2.2 获取时间戳、设置水位线
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((SerializableTimestampAssigner<Event>) (event, l) -> event.timestamp));
        // 3. 数据处理及输出
        stream
                // 3.1 分区,将所有数据发送到一个分区进行统计
                .keyBy(item -> true)
                // 3.2 设置滑动事件事件窗口,窗口大小为 10s,滑动步长为 2s
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(2)))
                // 3.3 定义聚合规则
                .aggregate(new CustomAggregate())
                // 3.4 输出结果
                .print();
        // 4. 执行程序
        environment.execute();
    }

    public static class CustomAggregate implements AggregateFunction<Event, Tuple2<HashSet<String>, Long>, Double> {
        /**
         * 创建累加器
         */
        @Override
        public Tuple2<HashSet<String>, Long> createAccumulator() {
            return Tuple2.of(new HashSet<String>(), 0L);
        }

        /**
         * 累加逻辑设计
         */
        @Override
        public Tuple2<HashSet<String>, Long> add(Event event, Tuple2<HashSet<String>, Long> accumulator) {
            // 每当一条属于该窗口的数据到来,就进行累加并返回累加器
            accumulator.f0.add(event.user);
            return Tuple2.of(accumulator.f0, accumulator.f1 + 1L);
        }

        /**
         * 窗口闭合时执行计算
         */
        @Override
        public Double getResult(Tuple2<HashSet<String>, Long> accumulator) {
            // 计算结果并发送到下游
            return (double) accumulator.f1 / accumulator.f0.size();
        }

        /**
         * 合并两个累加器
         */
        @Override
        public Tuple2<HashSet<String>, Long> merge(Tuple2<HashSet<String>, Long> hashSetLongTuple2, Tuple2<HashSet<String>, Long> acc1) {
            return null;
        }
    }

}

全窗口函数

ProcessWindowFunction是 Window API 中最底层的通用窗口函数接口。之所以说它“最底
层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction还可以获取到一个“上下文对象”Context。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。

ProcessWindowFunction的优势是以牺牲性能和资源为代价的,作为一个全窗口函数其需要将所有的数据缓存下来,等到窗口触发计算时再使用。

示例代码如下,该代码段计算了在 5s 的时间窗口内所有 url 的独立访客数量。

public class ProcessWindowDemo {

    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        environment.setParallelism(1);
        // 2. 加载数据源并设置水位线
        SingleOutputStreamOperator<Event> stream = environment
                // 2.1 加载数据源
                .addSource(new EventSource())
                // 2.2 获取时间戳、设置水位线
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((SerializableTimestampAssigner<Event>) (event, l) -> event.timestamp));
        // 3. 数据处理及输出
        stream
                // 3.1 分区,将所有数据发送到一个分区进行统计
                .keyBy(item -> true)
                // 3.2 设置滚动事件时间窗口,窗口大小为 10s
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // 3.3 定义窗口函数处理规则
                .process(new CustomProcessWindow())
                // 3.4 输出结果
                .print();
        // 4. 执行程序
        environment.execute();
    }

    public static class CustomProcessWindow extends ProcessWindowFunction<Event, String, Boolean, TimeWindow> {
        /**
         * 窗口函数处理规则,窗口关闭时执行处理
         */
        @Override
        public void process(Boolean aBoolean, ProcessWindowFunction<Event, String, Boolean, TimeWindow>.Context context,
                            Iterable<Event> iterable, Collector<String> collector) {
            // 创建用户统计Set
            HashSet<String> userSet = new HashSet<>();
            for (Event event: iterable) {
                userSet.add(event.user);
            }
            long start = context.window().getStart();
            long end = context.window().getEnd();
            // 定制输出内容
            collector.collect("窗口【" + new TimeStamp(start) + "~" + new TimeStamp(end)
                    + "】的独立访客数量为>>>" + userSet.size());
        }
    }

}

增量聚合和全窗口函数的结合使用

通过上面对增量聚合以及全窗口函数的了解,不难发现增量聚合的计算更加高效,其将计算过程分摊到窗口收集数据的过程当中,而全窗口函数则是将数据全部收集起来,等到窗口要闭合时再统一处理,因此增量聚合函数效率更高。

但是全窗口函数的优势在于其可以提供更多的信息,其只负责收集数据并提供信息,这让我们可以对数据执行更加灵活的操作。因此,在实际使用中,常把增量聚合以及全窗口函数结合使用。

通过结合使用,只需要在增量聚合中维护一个状态。当窗口触发需要执行计算时,全窗口函数直接获取增量函数的结果即可。

示例代码如下,该代码段计算了在 10s 的时间窗口内各个 url 的人均重复访问量(url 访问量 / url 访问用户数),并以 EventUrlCount 对象的形式输出结果,每 5s 计算一次。

public class CombinedUseDemo {

    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        environment.setParallelism(1);
        // 2. 加载数据源并设置水位线
        SingleOutputStreamOperator<Event> stream = environment
                // 2.1 加载数据源
                .addSource(new EventSource())
                // 2.2 获取时间戳、设置水位线
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((SerializableTimestampAssigner<Event>) (event, l) -> event.timestamp));
        // 3. 数据处理及输出
        stream
                // 3.1 分区,将所有数据发送到一个分区进行统计
                .keyBy(item -> item.url)
                // 3.2 设置滑动事件事件窗口,窗口大小为 10s,滑动步长为 5s
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                // 3.3 结合使用增量聚合函数和全窗口函数
                .aggregate(new UrlCountAgg(), new UrlCountRes())
                // 3.4 输出结果
                .print();
        // 4. 执行程序
        environment.execute();
    }

    /**
     * 增量聚合函数定义
     */
    public static class UrlCountAgg implements AggregateFunction<Event, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event event, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long aLong, Long acc1) {
            return null;
        }
    }

    /**
     * 全窗口函数定义
     */
    public static class UrlCountRes extends ProcessWindowFunction<Long, EventUrlCount, String, TimeWindow> {
        @Override
        public void process(String url, ProcessWindowFunction<Long, EventUrlCount, String, TimeWindow>.Context context,
                            Iterable<Long> iterable, Collector<EventUrlCount> collector) {
            collector.collect(new EventUrlCount(
                    url,
                    iterable.iterator().next(),
                    context.window().getStart(),
                    context.window().getEnd()
            ));
        }
    }

}

实体类 EventUrlCount

@Data
@NoArgsConstructor
@AllArgsConstructor
public class EventUrlCount {

    public String url;
    public Long count;
    public Long windowStart;
    public Long windowEnd;


}

测试水位线与窗口的使用

在之前的介绍中,水位线就是程序时钟的指示,而时间窗口也是根据时钟对数据进行分割的,在了解完水位线和窗口的知识后,可以将两者结合起来,通过下列测试程序简单的感受一下水位线对窗口的控制。

测试代码如下:此处采用 socket 数据源算子,详情参考Flink – DataStream API

public class WaterMarkWithWindowDemo {

    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        environment.setParallelism(1);
        // 2. 加载数据源进行数据处理
        environment
                // 2.1 加载 socket 数据源
                .socketTextStream("XXX.XX.XX.XXX", 8080)
                // 2.2 对数据源进行简单处理,封装成对象
                .map(new MapFunction<String, Event>() {
                    @Override
                    public Event map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new Event(
                                split[0].trim(),
                                split[1].trim(),
                                Long.valueOf(split[2].trim())
                        );
                    }
                })
                // 2.3 设置水位线,时延5s,timestamp 字段作为时间戳
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long recordTimestamp) {
                                return event.timestamp;
                            }
                        }))
                // 2.4 按照 user 字段进行分区
                .keyBy(event -> event.user)
                // 2.5 配置滚动时间窗口,窗口大小为 10s
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // 2.6 配置全窗口处理函数
                .process(new WaterMarkResult())
                // 2.7 输出结果
                .print();
        // 3. 执行处理
        environment.execute();
    }

    public static class WaterMarkResult extends ProcessWindowFunction<Event, String, String, TimeWindow> {
        /**
         * 全局窗口处理函数逻辑
         */
        @Override
        public void process(String s, ProcessWindowFunction<Event, String, String, TimeWindow>.Context context,
                            Iterable<Event> iterable, Collector<String> collector) throws Exception {
            long start = context.window().getStart();
            long end = context.window().getEnd();
            long currentWatermark = context.currentWatermark();
            long count = iterable.spliterator().getExactSizeIfKnown();
            collector.collect("窗口[" + start + "~" + end + "]中共有" + count + "个元素|窗口闭合计算时,水位线处于>>>" + currentWatermark);
        }
    }

}

执行测试,观察结果:

注意:窗口左闭右开,如 10000ms 的数据,应属于【10000-20000】窗口
在这里插入图片描述

窗口的其他 API

对于一个窗口算子,窗口分配器与窗口函数是必不可少的。除此之外,Flink 还提供了一些可选的 API,让我们可以更加灵活的控制窗口行为。

触发器 Trigger

触发器主要用于控制窗口何时进行触发计算,其是窗口算子的内部属性。对于 Flink 内置的窗口类型,其内部触发器都已经做了相应的实现,因此一般情况下不需要我们自定义触发器执行。

若业务逻辑确实需求自定义触发器,则直接调用 WindowedStream 的trigger()方法并传入自定义的触发器即可。如下所示。

stream.keyBy(...)
      .window(...)
      .trigger(new CustomTrigger())

自定义触发器需要继承Trigger抽象类,并实现下列四个方法:

  • onElement():窗口中每到来一个元素,都会调用该方法;

  • onEventTime():当注册的事件时间定时器触发时,将调用该方法;

  • onProcessingTime:当注册的处理时间定时器触发时,将调用该方法;

  • clear():当窗口关闭销毁时,将调用该方法,一般用于清除某些自定义的状态;

上述的方法中,处最后一个方法,另外三个方法的返回值均为枚举类型TriggerResult,其中定义了对窗口进行操作的四种类型,用于响应事件:

  • CONTINUE:继续,什么都不做;

  • FIRE:触发,触发计算,输出结果;

  • PURGE:清除,清空窗口中的所有数据,销毁窗口;

  • FIRE_AND_PURGE:触发并清除,触发计算输出结果,并清除窗口;

移除器 Evictor

移除器用于定义移除某些数据的逻辑,不同的窗口类型都有各自预实现的移除器。

若业务逻辑确实需求自定义移除器,则直接调用 WindowedStream 的evictor()方法并传入自定义的移除器即可。如下所示。

stream.keyBy(...)
      .window(...)
      .evictor(new CustomEvictor())

自定义移除器需要实现 Evictor 接口并实现下列两个方法:

  • evictBefore():定义执行窗口函数之前的移除数据操作;

  • evictAfter():定义执行窗口函数之后的一处数据操作;

允许延迟 Allowed Lateness

在事件时间语义当中,窗口中会出现迟到数据。若这部分数据在窗口触发计算并输出结果之后才到达,此时窗口一般已经销毁,因此迟到数据一般就会直接丢弃,这将导致统计结果不准确。

为解决迟到数据问题,除了之前提到过的设置水位线延迟的方法,Flink 还为开发者提供了一个特殊的接口。该接口可以为窗口算子提供一个”允许最大延迟“,在这段时间内,窗口不会被销毁,迟到的数据仍然可以被收集到相应的窗口并触发计算,直到时钟被推进到”水位线 + 允许最大延迟“时才会真正的关闭窗口。

基于 WindowedStream 调用allowedLateness()方法并传入一个Time类型的延迟时间即可完成允许延迟的设置,如下所示。

stream.keyBy(...)
      .window(TumblingEventTimeWindows.of(Time.hours(1)))
      .allowedLateness(Time.minutes(5))

侧入流 Side Output

在某些情况下,即便设置了”水位线延迟“+”允许延迟时间“,很多数据还是会被丢弃。Flink 提供了侧输出流 side output 的方式来处理迟到的数据。该输出流相当于是数据流的一个分支,专门用于存放迟到过久的、本应该丢弃的数据。

基于 WindowedStream 调用sideOutputLateData()方法并传入一个OutputTag类型的侧入流输出标签即可实现该功能,OutputTag中的类型应与数据流中的类型相同,如下所示。

OutputTag<Event> outputTag = new OutputTag<Event>("late") {};
stream.keyBy(...)
      .window(TumblingEventTimeWindows.of(Time.hours(1)))
      .sideOutputLateData(outputTag)

基于窗口处理完成之后的 DataStream,调用getSideOutput()方法并传入对应的输出标签,就可以获取到迟到数据所在的流,如下所示。

这里需要注意,getSideOutput()是 SingleOutputStreamOperator 的方法,获取到的侧输出流数据类型应该和OutputTag指定的类型一致,与窗口聚合之后流中的数据类型可以不同。

SingleOutputStreamOperator<AggResult> winAggStream = stream
     .keyBy(...)
     .window(TumblingEventTimeWindows.of(Time.hours(1)))
     .sideOutputLateData(outputTag)
     .aggregate(new MyAggregateFunction())
DataStream<Event> lateStream = winAggStream.getSideOutput(outputTag);

迟到数据的处理

通过对水位线以及窗口相关知识的了解,在此总结一下对于迟到数据的处理方式。在 Flink 中,对迟到数据的处理主要有下述三种方式:

  1. 设置水位线延迟时间;

  2. 设置允许延时,允许窗口处理迟到数据;

  3. 将迟到数据放入侧输出流;

一般来说,对于可控的较小延迟,通过 1、2 所述的方法就可以解决。但某些情况下,即便有前面的双重保证,但窗口也无法一直等待下去,总归是要关闭的。此时只能用 3 所属方法来兜底,保证数据不会丢失。后续开发者可以从侧输出流中重新获取数据并判断其所属的窗口,对数据进行手动的汇总更新。

下述代码在之前测试增量聚合函数与全窗口函数结合使用的代码上加以改进,增加了对迟到数据的处理,可以作为简单示例感受一下整个处理过程,代码如下:

public class LateDataDemo {

    public static void main(String[] args) throws Exception {
        // 1. 环境准备
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        environment.setParallelism(1);
        // 2. 加载数据源并设置水位线
        SingleOutputStreamOperator<Event> stream = environment
                // 2.1 加载数据源
                .socketTextStream("47.92.146.85", 8080)
                // 2.2 对数据源进行简单处理,封装成对象
                .map(new MapFunction<String, Event>() {
                    @Override
                    public Event map(String s) throws Exception {
                        String[] split = s.split(",");
                        return new Event(
                                split[0].trim(),
                                split[1].trim(),
                                Long.valueOf(split[2].trim())
                        );
                    }
                })
                // 2.3 获取时间戳、设置水位线
                .assignTimestampsAndWatermarks(WatermarkStrategy
                        // 迟到数据处理方式一:设置水位线延时
                        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2L))
                        .withTimestampAssigner((SerializableTimestampAssigner<Event>) (event, l) -> event.timestamp));

        // 定义侧输出流标签
        OutputTag<Event> outputTag = new OutputTag<Event>("late"){};

        // 3. 数据处理及输出
        SingleOutputStreamOperator<EventUrlCount> result = stream
                // 3.1 分区,将所有数据发送到一个分区进行统计
                .keyBy(event -> event.url)
                // 3.2 设置滚动窗口,窗口大小为 10s
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                // 3.3 设置窗口允许延迟时间为 1min
                // 迟到数据处理方式二:设置窗口允许延时时间
                .allowedLateness(Time.minutes(1L))
                // 3.4 将最终的迟到数据输出到侧输出流
                // 迟到数据处理方式三:迟到数据入侧输出流
                .sideOutputLateData(outputTag)
                // 3.5 结合使用增量聚合函数和全窗口函数
                .aggregate(new UrlCountAgg(), new UrlCountRes());

        // 4. 输出结果
        // 4.1 正常数据输出
        result.print("result");
        // 4.2 迟到数据输出
        result.getSideOutput(outputTag).print("late");

        // 5. 执行程序
        environment.execute();
    }

    /**
     * 增量聚合函数定义
     */
    public static class UrlCountAgg implements AggregateFunction<Event, Long, Long> {
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        @Override
        public Long add(Event event, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long aLong, Long acc1) {
            return null;
        }
    }

    /**
     * 全窗口函数定义
     */
    public static class UrlCountRes extends ProcessWindowFunction<Long, EventUrlCount, String, TimeWindow> {
        @Override
        public void process(String url, ProcessWindowFunction<Long, EventUrlCount, String, TimeWindow>.Context context,
                            Iterable<Long> iterable, Collector<EventUrlCount> collector) {
            collector.collect(new EventUrlCount(
                    url,
                    iterable.iterator().next(),
                    context.window().getStart(),
                    context.window().getEnd()
            ));
        }
    }

}

测试结果如下,数据的关键时间点用红色方框标出,箭头指示的是聚合结果中包含的数据
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/346088.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ccc-Backpropagation-李宏毅(7)

文章目录NotationBackpropagationForward passBackward passSummaryNotation 神经网络求解最优化Loss function时参数非常多&#xff0c;反向传播使用链式求导的方式提升计算梯度向量时的效率&#xff0c;链式法则如下&#xff1a; Backpropagation 损失函数计算为所有样本…

Pulsar

一、简介Apache Pulsar是Apache软件基金会顶级项目&#xff0c;是下一代云原生分布式消息流平台&#xff0c;集消息、存储、轻量化函数式计算为一体&#xff0c;采用计算与存储分离架构设计&#xff0c;支持多租户、持久化存储、多机房跨区域数据复制&#xff0c;具有强一致性、…

常见的函数式编程操作

1、柯里化&#xff1a; (概念 & 应用 & 好处) 柯里化的概念&#xff1a; 柯里化&#xff08;Currying&#xff09;是把接受多个参数的函数变成接受单一参数的函数&#xff0c;并且返回一个用于接受剩余参数的新函数&#xff0c;当参 数都传递完成后&#xff0c; 则立…

Linux 文件权限讲解

目录 文件的一般权限 一般权限有哪些 使用ls -l查看文件/目录权限 配置一般权限和文件所属信息 chmod 修改文件权限 chown 修改文件所属信息&#xff08;所有者和所属组&#xff09; 文件特殊权限 SUID 针对所有者的特殊权限 SGID SBID 配置特殊权限 文件的隐藏权限…

智能网联汽车ASIL安全等级如何划分

目录一、功能安全标准二、功能安全等级定义三、危险事件的确定四、ASIL安全等级五、危险分析和风险评定六、功能安全目标的分解一、功能安全标准 ISO 26262《道路车辆功能安全》脱胎于IEC 61508《电气/电子/可编程电子安全系统的功能安全》&#xff0c;主要定位在汽车行业&…

qt 配置open3d

一、配置前要先编程open3d二、开始配置新建txt 把txt 修改为 pri在pro 文件中添加 include(F:/xuwanlu/control.pri)重新构建项目然后回多出来pri在pri中添加open3d目录INCLUDEPATH F:\open3d\include\Open3D\3rdparty\GLFW\include \F:\open3d\include\Open3D\3rdparty\gle…

Golang 给视频添加背景音乐 | Golang工具

目录 前言 环境依赖 代码 总结 前言 本文提供给视频添加背景音乐&#xff0c;一如既往的实用主义。 主要也是学习一下golang使用ffmpeg工具的方式。 环境依赖 ffmpeg环境安装&#xff0c;可以参考我的另一篇文章&#xff1a;windows ffmpeg安装部署_阿良的博客-CSDN博客 …

当你按下方向键,电视是如何寻找下一个焦点的

我工作的第一家公司主要做的是一个在智能电视上面运行的APP&#xff0c;其实就是一个安卓APP&#xff0c;也是混合开发的应用&#xff0c;里面很多页面是H5开发的。 电视我们都知道&#xff0c;是通过遥控器来操作的&#xff0c;没有鼠标也不能触屏&#xff0c;所以“点击”的…

ChatGPT已应用到跨境电商领域,规模化运营指日可待

最近各大平台都卷起了一股“ChatGPT”的热潮&#xff0c;论坛、贴吧、微博甚至短视频都对这个新兴的东西津津乐道&#xff0c;在这些评论区里我们可以发现&#xff0c;不管说什么职业&#xff0c;不管年龄性别&#xff0c;ChatGPT都开始被许多人关注。那么ChatGPT到底是个什么东…

大数据框架之Hadoop:HDFS(五)NameNode和SecondaryNameNode(面试开发重点)

5.1NN和2NN工作机制 5.1.1思考&#xff1a;NameNode中的元数据是存储在哪里的&#xff1f; 首先&#xff0c;我们做个假设&#xff0c;如果存储在NameNode节点的磁盘中&#xff0c;因为经常需要进行随机访问&#xff0c;还有响应客户请求&#xff0c;必然是效率过低。因此&am…

FPGA入门系列17--task

文章简介 本系列文章主要针对FPGA初学者编写&#xff0c;包括FPGA的模块书写、基础语法、状态机、RAM、UART、SPI、VGA、以及功能验证等。将每一个知识点作为一个章节进行讲解&#xff0c;旨在更快速的提升初学者在FPGA开发方面的能力&#xff0c;每一个章节中都有针对性的代码…

如何选择传感器输出模式——电流输出还是电压输出?

一 背景及挑战 传感器在汽车测试系统中发挥着信息的采集和传输作用&#xff0c;可以称为汽车的“神经元”。 按照功能可以将传感器分为压力传感器、流量传感器、温湿度传感器和电流传感器等。传感器的主要指标是精度、测量范围和响应时间等。在满足指标的情况下&#xff0c;通…

御黑行动来袭--助力三月重保,构筑安全防线!

三月重保在即&#xff0c;重要网站及业务系统“零风险 零事故”是终极目标&#xff0c;作为业界网络安全实战派“老兵”--知道创宇将一如既往&#xff0c;为您提供重保期间“万无一失”的重要网站及业务系统防护。 值此三月重保的重要备战期&#xff0c;知道创宇推出由主力产品…

高灵敏度压电传感器频率温度特性测量中的TEC型精密温控系统

摘要&#xff1a;为解决石英晶体微量天平这类压电传感器频率温度特性全自动测量中存在的温度控制精度差和测试效率低的问题&#xff0c;本文在TEC半导体制冷技术基础上&#xff0c;提出了小尺寸、高精度和全自动程序温控的解决方案&#xff0c;给出了温控装置的详细结构和实现高…

计算机网络常见面试题总结

网络分层结构 计算机网络体系大致分为三种&#xff0c;OSI七层模型、TCP/IP四层模型和五层模型。一般面试的时候考察比较多的是五层模型。 TCP/IP五层模型&#xff1a;应用层、传输层、网络层、数据链路层、物理层。 应用层&#xff1a;为应用程序提供交互服务。在互联网中的…

一、产品经理——【岗位和能力要求】【项目流程】【产品体验报告】

0. 产品经理课程路线图 产品基础阶段&#xff1a;核心目的是了解行业、掌握技能 1. 认识互联网行业 1.1. 传统行业 vs 互联网行业 1.2. 互联网行业概念 1.3. 小结 2. 认识产品经理 2.1. 不同场景下的产品经理的职责差异 公司团队、领导对产品经理的期望不同&#xff0c;做的…

交叉编译的概念及交叉编译工具的安装

目录 一.什么是交叉编译 二.为什么要交叉编译&#xff1f; 三.交叉编译链的安装 四.相关使用方法 五.软连接 一.什么是交叉编译 交叉编译是指将一种编程语言编写的程序编译成另一种编程语言的程序&#xff0c;通常是在不同的操作系统或硬件环境中使用的。这种编译过程会产…

【手写 Vuex 源码】第十一篇 - Vuex 插件的开发

一&#xff0c;前言 上一篇&#xff0c;主要介绍了 Vuex-namespaced 命名空间的实现&#xff0c;主要涉及以下几个点&#xff1a; 命名空间的介绍和使用&#xff1b;命名空间的逻辑分析与代码实现&#xff1b;命名空间核心流程梳理&#xff1b; 本篇&#xff0c;继续介绍 Vu…

GWAS:mtag (Multi-Trait Analysis of GWAS) 分析

mtag (Multi-Trait Analysis of GWAS)作用&#xff1a;通过对多个表型相似的GWAS summary结果进行联合分析&#xff0c;发现更多的表型相关基因座。 以抑郁症状、神经质和主观幸福感这三个表型为例&#xff0c;分别对他们进行GWAS分析&#xff0c;鉴定得到32、9 和 13个基因座与…

前端食堂技术周刊第 70 期:Volar 的新开端、Lighthouse 10、良好的组件设计、React 纪录片、2022 大前端总结

美味值&#xff1a;&#x1f31f;&#x1f31f;&#x1f31f;&#x1f31f;&#x1f31f; 口味&#xff1a;黑巧克力 食堂技术周刊仓库地址&#xff1a;https://github.com/Geekhyt/weekly 本期摘要 Volar 的新开端Chrome 110 的新功能Lighthouse 10Nuxt v3.2.0加速 JavaSc…