Flink系列之Flink之Time和WaterMark深入剖析

news2024/11/14 20:43:35

title: Flink系列


一、Flink Window 常见需求背景

1.0 理论描述

需求描述:

每隔 5 秒,计算最近 10 秒单词出现的次数。 滑动窗口

每隔 5 秒,计算最近 5 秒单词出现的次数。 滚动窗口

在这里插入图片描述

第一个: 关于 TimeCharacteristic

	ProcessingTime
	IngestionTime
	EventTime

TimeCharacteristic在源码中的位置:

路径: org.apache.flink.streaming.api.TimeCharacteristic

在这里插入图片描述

第二个:SlidingProcessingTimeWindows 可以拆分为:Sliding + ProcessingTime + TimeWindows,是 WindowAssigner 的子类

常见的是下面四类:

	SlidingProcessingTimeWindows
	SlidingEventTimeWindows
	TumblingEventTimeWindows
	TumblingProcessingTimeWindows

SlidingProcessingTimeWindows 在源码中的位置:

路径:org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows

在这里插入图片描述

WindowAssigner的一些子类。

在这里插入图片描述

1.1 TimeWindow 实现

代码如下:

package com.aa.flinkjava.window;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @Author AA
 * @Project bigdatapre
 * @Package com.aa.flinkjava.window
 * 时间窗口第一个案例,入门版
 * 时间窗口版本  WordCount
 * 需求: 每隔5秒,统计最近10秒的数据
 */
public class TimeWindow01_TimeWindow {
    public static void main(String[] args) throws Exception {
        //1、获取环境对象
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //使用 老的API timeWindow(Time.seconds(10),Time.seconds(5)) 。需要设置如下参数:
        //executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        //2、输入源
        DataStreamSource<String> dataStreamSource = executionEnvironment.socketTextStream("hadoop12", 9999);

        //3、逻辑处理
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(new Tuple2(word, 1));
                }
            }
        })
                .keyBy(tuple -> tuple.f0)
                //每隔5秒,统计最近10秒的数据
                //下面timeWindow(Time.seconds(10),Time.seconds(5)) 是老的API。
                // 老的API,如果想要使用,需要在前面设置:executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
                //.timeWindow(Time.seconds(10),Time.seconds(5))
                //下面是新的版本的使用方式 window
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .sum(1);

        //4、输出
        result.print();

        //5、启动执行
        executionEnvironment.execute();
    }
}

1.2 ProcessWindowFunction

代码如下:

package com.aa.flinkjava.window;

import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * @Author AA
 * @Project bigdatapre
 * @Package com.aa.flinkjava.window
 * 时间窗口第二个案例  就是为了看一下内部的实现细节。
 * 需求: 每隔5秒,统计最近10秒的数据
 */
public class TimeWindow02_ProcessWindowFunction {
    public static void main(String[] args) throws Exception {
        //1、获取环境对象
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //使用 老的API timeWindow(Time.seconds(10),Time.seconds(5)) 。需要设置如下参数:
        //executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        //2、输入源
        DataStreamSource<String> dataStreamSource = executionEnvironment.socketTextStream("hadoop12", 9999);

        //3、逻辑处理
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(new Tuple2(word, 1));
                }
            }
        })
                .keyBy(tuple -> tuple.f0)
                //每隔5秒,统计最近10秒的数据
                //下面timeWindow(Time.seconds(10),Time.seconds(5)) 是老的API。
                // 老的API,如果想要使用,需要在前面设置:executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
                //.timeWindow(Time.seconds(10),Time.seconds(5))
                //下面是新的版本的使用方式 window
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                //.sum(1);
                .process(new MySumProcessFunction());

        //4、输出
        result.print();

        //5、启动执行
        executionEnvironment.execute();
    }

    /**
     * ProcessWindowFunction[IN, OUT, KEY, W <: Window]
     * Type parameters:
     * IN – The type of the input value.
     * OUT – The type of the output value.
     * KEY – The type of the key.
     * W – The type of the window
     */

    //注意不要导入错包了。这个ProcessWindowFunction 是java包下面的,不是scala包下面的,scala包下面也有这个。注意。
    static class MySumProcessFunction extends ProcessWindowFunction<Tuple2<String, Integer>,Tuple2<String, Integer>,
            String,TimeWindow>{

        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");

        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {

            System.out.println("==========出发窗口的分界线=========");

            System.out.println("当前系统时间: " + df.format(System.currentTimeMillis()));
            System.out.println("当前窗口处理时间: " + df.format(context.currentProcessingTime()));

            System.out.println("当前窗口开始时间: " + df.format(context.window().getStart()));
            System.out.println("当前窗口结束时间: " + df.format(context.window().getEnd()));


            int count = 0;
            for (Tuple2<String, Integer> element : elements) {
                count++;
            }
            out.collect(Tuple2.of(key,count));
        }
    }

}

1.3 Flink Time 种类

Flink 的数据流处理定义了三种 Time,分别是:

  1. Event Time:事件产生的时间,它通常由事件中的时间戳描述。
  2. Ingestion time:事件进入 Flink 的时间(一般不用)
  3. Processing Time:事件被处理时当前系统的时间

官网的一张图,如下:

在这里插入图片描述

举例解释:现有一条生产日志样例,如下

2022-11-11 19:00:01 134 INFO executor.Executor: Finished task in state 0.0

这条数据进入 Flink 的时间是 2022-11-11 20:00:00 102,到达 window 处理的时间为 2022-11-11 20:00:01 100

则对应的 三个 Time 分别是:

Event time:2022-11-11 19:00:01 134
Ingestion time:2022-11-11 20:00:00 102
Processing time:2022-11-11 20:00:01 100

在企业生产环境中,一般使用 EventTime 来进行计算,会更加符合业务需求。比如下述需求:

统计每分钟内接口调用失败的错误日志个数。
统计每分钟每种类型商品的成交单数。

接下来后续的知识点,就是在告诉大家,如果事件是无序的。所有的事件按照 event time 是乱序到达的

假设数据有序,基于 Process Time Window 做处理有问题么? 没问题
假设数据无序,基于 Process Time Window 做处理有问题么? 有问题
解决方案:基于 eventTime  从去执行处理,会纠正部分结果,不会把所有计算都算正确
解决方案:基于 Flink 提供的 watermark 实现这个需求

最终的结论:Flink 基于 Window + EventTime + Watermark 联合起来完成乱序数据的处理

// 如果基于 evnetTime 和 water 去实现乱序数据的处理
.assignTimestampsAndWatermarks(
	// 指定 watermark 的规则
	WatermarkStrategy.forGenerator((ctx) -> new PeriodicWatermarkGenerator())
		// 指定 eventTime 的定义
		.withTimestampAssigner((ctx) -> new TimeStampExtractor())) //指定时间字段

二、Process Time Window(有序)

2.1 需求(理论)

需求:每隔 5秒 计算最近 10秒 的单词出现的次数(类似于需求:接口调用出错的次数)

会产生的窗口有:

20:55:00 - 20:55:10
20:55:05 - 20:55:15
20:55:10 - 20:55:20
....

自定义source,模拟:第 13 秒的时候连续发送 2 个事件,第 16 秒的时候再发送 1 个事件

在这里插入图片描述

2.2 代码

代码如下:

package com.aa.flinkjava.window;

import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

/**
 * @Author AA
 * @Project bigdatapre
 * @Package com.aa.flinkjava.window
 *
 * 自定义Source方式进行 单词统计
 * 单词过来的顺序 是 正序 的
 */
public class TimeWindow03_WithMySource1 {
    public static void main(String[] args) throws Exception {
        //1、获取环境对象
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //使用 老的API timeWindow(Time.seconds(10),Time.seconds(5)) 。需要设置如下参数:
        //executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        //2、输入源
        DataStreamSource<String> dataStreamSource = executionEnvironment.addSource(new MySource());

        //3、逻辑处理
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(new Tuple2(word, 1));
                }
            }
        })
                .keyBy(tuple -> tuple.f0)
                //每隔5秒,统计最近10秒的数据
                //下面timeWindow(Time.seconds(10),Time.seconds(5)) 是老的API。
                // 老的API,如果想要使用,需要在前面设置:executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
                //.timeWindow(Time.seconds(10),Time.seconds(5))
                //下面是新的版本的使用方式 window
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                //.sum(1);
                .process(new MySumProcessFunction());

        //4、输出
        result.print();

        //5、启动执行
        executionEnvironment.execute();
    }

    //注意不要导入错包了。这个ProcessWindowFunction 是java包下面的,不是scala包下面的,scala包下面也有这个。注意。
    static class MySumProcessFunction extends ProcessWindowFunction<Tuple2<String, Integer>,Tuple2<String, Integer>,
            String, TimeWindow> {

        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");

        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {

            System.out.println("==========出发窗口的分界线=========");

            System.out.println("当前系统时间: " + df.format(System.currentTimeMillis()));
            System.out.println("当前窗口处理时间: " + df.format(context.currentProcessingTime()));

            System.out.println("当前窗口开始时间: " + df.format(context.window().getStart()));
            System.out.println("当前窗口结束时间: " + df.format(context.window().getEnd()));


            int count = 0;
            for (Tuple2<String, Integer> element : elements) {
                count++;
            }
            out.collect(Tuple2.of(key,count));
        }
    }

    static class MySource implements SourceFunction<String>{

        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            String currentTime = String.valueOf(System.currentTimeMillis());
            System.out.println("判断条件之前的currentTime : " + currentTime);

            //下面这个判断的操作是为了保证是 10 s 的倍数
            while (Integer.valueOf(currentTime.substring(currentTime.length() - 4)) > 100) {
                currentTime = String.valueOf(System.currentTimeMillis());
                //System.out.println("while里面的currentTime : " + currentTime);
                continue;
            }
            System.out.println("判断条件之后的currentTime : " + currentTime);

            System.out.println("当前时间:" + df.format(System.currentTimeMillis()));
            /**
             * 当前时间:15:46:30
             * ==========出发窗口的分界线=========
             * 当前系统时间: 15:46:45
             * 当前窗口处理时间: 15:46:45
             * 当前窗口开始时间: 15:46:35
             * 当前窗口结束时间: 15:46:45
             * 7> (flink,2)
             * ==========出发窗口的分界线=========
             * 当前系统时间: 15:46:50
             * 当前窗口处理时间: 15:46:50
             * 当前窗口开始时间: 15:46:40
             * 当前窗口结束时间: 15:46:50
             * 7> (flink,3)
             * ==========出发窗口的分界线=========
             * 当前系统时间: 15:46:55
             * 当前窗口处理时间: 15:46:55
             * 当前窗口开始时间: 15:46:45
             * 当前窗口结束时间: 15:46:55
             * 7> (flink,1)
             */
            //开始之后第12秒的时候放进去两个单词数据
            TimeUnit.SECONDS.sleep(12);
            ctx.collect("flink");
            ctx.collect("flink");

            //开始之后第12+4秒的时候放进去一个单词数据
            TimeUnit.SECONDS.sleep(4);
            ctx.collect("flink");

            TimeUnit.SECONDS.sleep(3600);
        }

        @Override
        public void cancel() {

        }
    }

}

2.3 图解

在这里插入图片描述

三、Process Time Window(无序)

3.1 需求(理论)

自定义 Source,模拟

1、正常情况下第 13 秒的时候连续发送 2 个事件
2、但是有一个事件在第 13 秒的发送出去成功了,另外一个事件数据在 19 秒的时候才发送出去
3、在第 16 秒的时候再发送 1 个事件

3.2 代码

代码如下:

package com.aa.flinkjava.window;

import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.util.concurrent.TimeUnit;

/**
 * @Author AA
 * @Project bigdatapre
 * @Package com.aa.flinkjava.window
 *
 * 自定义Source方式进行 单词统计
 * 单词过来的顺序 是 乱序 的
 *
 * 如果数据乱序到达的,基于ProcessingTime进行处理会有什么现象?
 */
public class TimeWindow04_WithMySource2 {
    public static void main(String[] args) throws Exception {
        //1、获取环境对象
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        //使用 老的API timeWindow(Time.seconds(10),Time.seconds(5)) 。需要设置如下参数:
        //executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        //2、输入源
        DataStreamSource<String> dataStreamSource = executionEnvironment.addSource(new MySource());

        //3、逻辑处理
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(new Tuple2(word, 1));
                }
            }
        })
                .keyBy(tuple -> tuple.f0)
                //每隔5秒,统计最近10秒的数据
                //下面timeWindow(Time.seconds(10),Time.seconds(5)) 是老的API。
                // 老的API,如果想要使用,需要在前面设置:executionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
                //.timeWindow(Time.seconds(10),Time.seconds(5))
                //下面是新的版本的使用方式 window
                .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                //.sum(1);
                .process(new MySumProcessFunction());

        //4、输出
        result.print();

        //5、启动执行
        executionEnvironment.execute();
    }


    //注意不要导入错包了。这个ProcessWindowFunction 是java包下面的,不是scala包下面的,scala包下面也有这个。注意。
    static class MySumProcessFunction extends ProcessWindowFunction<Tuple2<String, Integer>,Tuple2<String, Integer>,
            String, TimeWindow> {

        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");

        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Tuple2<String, Integer>> out) throws Exception {

            System.out.println("==========触发窗口的分界线=========");

            System.out.println("当前系统时间: " + df.format(System.currentTimeMillis()));
            System.out.println("当前窗口处理时间: " + df.format(context.currentProcessingTime()));

            System.out.println("当前窗口开始时间: " + df.format(context.window().getStart()));
            System.out.println("当前窗口结束时间: " + df.format(context.window().getEnd()));


            int count = 0;
            for (Tuple2<String, Integer> element : elements) {
                count++;
            }
            out.collect(Tuple2.of(key,count));
        }
    }

    static class MySource implements SourceFunction<String> {

        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            String currentTime = String.valueOf(System.currentTimeMillis());

            //下面这个判断的操作是为了保证是 10 s 的倍数
            while (Integer.valueOf(currentTime.substring(currentTime.length() - 4)) > 100) {
                currentTime = String.valueOf(System.currentTimeMillis());
                continue;
            }

            System.out.println("当前时间:" + df.format(System.currentTimeMillis()));

            //开始之后第12秒的时候放进去两个单词数据
            TimeUnit.SECONDS.sleep(12);
            String log = "flink";
            ctx.collect(log);

            //开始之后第12+4秒的时候放进去一个单词数据
            TimeUnit.SECONDS.sleep(4);
            ctx.collect("flink");

            //数据出现了延迟,本来这条数据是应该第12秒的时候处理的,现在拖到了第19秒才处理输出。
            TimeUnit.SECONDS.sleep(3);
            ctx.collect(log);

            TimeUnit.SECONDS.sleep(3600);
        }

        @Override
        public void cancel() {

        }
    }
}

3.3 图解

在这里插入图片描述

四、使用 EventTime 处理无序

4.1 需求

由于在 三 中的上述程序执行得到的结果,并不是需求实现,所以需要改进,我们通过 Flink 提供的 EventTime 来改进

4.2 代码

代码如下:

package com.aa.flinkjava.window;

import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;

/**
 * @Author AA
 * @Project bigdatapre
 * @Package com.aa.flinkjava.window
 * 使用EventTime初步处理乱序数据
 */
public class TimeWindow05_WithMySource3_ByEventTime {
    public static void main(String[] args) throws Exception {
        //1、获取环境对象
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        //2、输入源
        DataStreamSource<String> dataStreamSource = executionEnvironment.addSource(new MySource());

        //3、逻辑处理
        SingleOutputStreamOperator<Tuple2<String, Long>> result = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
                String[] words = value.split(",");
                //给传输过来的字段进行拆分。  给数据和时间戳分开
                //第一个元素 是 数据 flink ,第二个是时间戳
                out.collect(Tuple2.of(words[0],Long.valueOf(words[1])));
            }
        })
                //指定日志当中的时间戳字段当做这个事件的EventTime。需要通过下面的assignTimestampsAndWatermarks 来指定。
                //withTimestampAssigner 就是用来指定时间戳定义的。
                //Watermark 在这个案例中先不管,后面案例专门讲解。
                .assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(context -> new MyWaterGenerator())
                .withTimestampAssigner(context -> new MyTimestampAssigner()))
                .keyBy(tuple -> tuple.f0)
                //每隔5秒,统计最近10秒的数据
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                //.sum(1);
                .process(new MySumProcessFunction());

        //4、输出
        result.print();

        //5、启动执行
        executionEnvironment.execute("TimeWindow05_WithMySource3_ByEventTime");
    }

    //注意不要导入错包了。这个ProcessWindowFunction 是java包下面的,不是scala包下面的,scala包下面也有这个。注意。
    static class MySumProcessFunction extends ProcessWindowFunction<Tuple2<String, Long>,Tuple2<String, Long>,
            String, TimeWindow> {

        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");

        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<Tuple2<String, Long>> out) throws Exception {

            System.out.println("==========触发窗口的分界线=========");

            System.out.println("当前系统时间: " + df.format(System.currentTimeMillis()));
            System.out.println("当前窗口处理时间: " + df.format(context.currentProcessingTime()));

            System.out.println("当前窗口开始时间: " + df.format(context.window().getStart()));
            System.out.println("当前窗口结束时间: " + df.format(context.window().getEnd()));

            Long count = 0L;
            for (Tuple2<String, Long> element : elements) {
                count++;
            }
            out.collect(Tuple2.of(key,count));
        }
    }

    static class MySource implements SourceFunction<String> {

        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            String currentTime = String.valueOf(System.currentTimeMillis());

            //下面这个判断的操作是为了保证是 10 s 的倍数
            while (Integer.valueOf(currentTime.substring(currentTime.length() - 4)) > 100) {
                currentTime = String.valueOf(System.currentTimeMillis());
                continue;
            }

            System.out.println("当前时间:" + df.format(System.currentTimeMillis()));

            //开始之后第13秒的时候放进去两个单词数据
            TimeUnit.SECONDS.sleep(13);
            //给日志打上一个时间戳的标记
            String log = "flink," + System.currentTimeMillis();
            String log1 = log;
            ctx.collect(log);

            //开始之后第13+3秒的时候放进去一个单词数据
            TimeUnit.SECONDS.sleep(3);
            ctx.collect("flink," + System.currentTimeMillis());

            //数据出现了延迟,本来这条数据是应该第13秒的时候处理的,现在拖到了第19秒才处理输出。
            TimeUnit.SECONDS.sleep(3);
            ctx.collect(log1);

            TimeUnit.SECONDS.sleep(3600);
        }

        @Override
        public void cancel() {

        }
    }

    /**
     * 指定时间字段
     */
    static class MyWaterGenerator implements WatermarkGenerator<Tuple2<String,Long>>, Serializable{
        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
        @Override
        public void onEvent(Tuple2<String, Long> event, long eventTimestamp, WatermarkOutput output) {
            System.out.println("eventTimestamp: " + eventTimestamp);
            System.out.println("eventTimestamp: " + df.format(eventTimestamp));
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new Watermark(System.currentTimeMillis()));
        }
    }

    /**
     * 指定eventTime的字段
     */
    static class MyTimestampAssigner implements TimestampAssigner<Tuple2<String,Long>>{

        @Override
        public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
            return element.f1;
        }
    }

}

4.3 图解

在这里插入图片描述

五、使用 WaterMark 机制解决无序

5.1 需求

根据上述的测试发现:

纠正了第三个窗口的计算结果
第一个窗口的计算结果依然是错的

解决方案:需要让第一个窗口延迟一段时间再执行计算,也就是等待 第三条数据接收到的时候,再执行计算,就能得到正确结果。Flink 提供了 Watermark 机制来帮我们解决这个问题。

5.2 代码

代码如下:

package com.aa.flinkjava.window;

import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;

/**
 * @Author AA
 * @Project bigdatapre
 * @Package com.aa.flinkjava.window
 */
public class TimeWindow06_ByWaterMark01 {
    public static void main(String[] args) throws Exception {
        //1、获取环境对象
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);

        //2、输入源
        DataStreamSource<String> dataStreamSource = executionEnvironment.addSource(new MySource());

        //3、逻辑处理
        SingleOutputStreamOperator<Tuple2<String, Long>> result = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
                String[] words = value.split(",");
                //给传输过来的字段进行拆分。  给数据和时间戳分开
                out.collect(Tuple2.of(words[0],Long.valueOf(words[1])));
            }
        })
                // 基于eventTime 和water 去实现乱序数据的处理
                .assignTimestampsAndWatermarks(
                        //指定watermark的规则
                        WatermarkStrategy.forGenerator((context1) -> new MyWaterGenerator())
                                //指定eventTime的定义
                        .withTimestampAssigner((context1) -> new MyTimestampAssigner()))
                .keyBy(tuple -> tuple.f0)
                //每隔5秒,统计最近10秒的数据
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                //.sum(1);
                .process(new MySumProcessFunction());

        //4、输出
        result.print();

        //5、启动执行
        executionEnvironment.execute("TimeWindow06_ByWaterMark01");
    }

    //注意不要导入错包了。这个ProcessWindowFunction 是java包下面的,不是scala包下面的,scala包下面也有这个。注意。
    static class MySumProcessFunction extends ProcessWindowFunction<Tuple2<String, Long>,Tuple2<String, Long>,
            String, TimeWindow> {

        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");

        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<Tuple2<String, Long>> out) throws Exception {

            System.out.println("==========触发窗口的分界线=========");

            System.out.println("当前process系统时间: " + df.format(System.currentTimeMillis()));
            System.out.println("当前窗口处理时间: " + df.format(context.currentProcessingTime()));

            System.out.println("当前窗口开始时间: " + df.format(context.window().getStart()));
            System.out.println("当前窗口结束时间: " + df.format(context.window().getEnd()));

            Long count = 0L;
            for (Tuple2<String, Long> element : elements) {
                count++;
            }
            out.collect(Tuple2.of(key,count));
        }
    }

    /**
     * 自定义数据源
     */
    static class MySource implements SourceFunction<String> {

        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");

        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            String currentTime = String.valueOf(System.currentTimeMillis());

            //下面这个判断的操作是为了保证是 10 s 的倍数
            while (Integer.valueOf(currentTime.substring(currentTime.length() - 4)) > 100) {
                currentTime = String.valueOf(System.currentTimeMillis());
                continue;
            }

            System.out.println("当前时间:" + df.format(System.currentTimeMillis()));

            //开始之后第13秒的时候放进去两个单词数据
            TimeUnit.SECONDS.sleep(13);
            String log = "flink," + System.currentTimeMillis();
            String log1 = log;
            ctx.collect(log);

            //开始之后第13+3秒的时候放进去一个单词数据
            TimeUnit.SECONDS.sleep(3);
            ctx.collect("flink," + System.currentTimeMillis());

            //数据出现了延迟,本来这条数据是应该第13秒的时候处理的,现在拖到了第19秒才处理输出。
            TimeUnit.SECONDS.sleep(3);
            ctx.collect(log1);

            TimeUnit.SECONDS.sleep(3000);
        }

        @Override
        public void cancel() {

        }
    }

    /**
     * 指定时间字段
     *
     * WatermarkGenerator: watermark 生成器。 一个接口。
     * 用的时候自己写个实现这个接口就行了。
     */
    static class MyWaterGenerator implements WatermarkGenerator<Tuple2<String,Long>>, Serializable {
        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");
        //每次接受到一条数据,其实就执行了一次处理。
        @Override
        public void onEvent(Tuple2<String, Long> event, long eventTimestamp, WatermarkOutput output) {
            System.out.println("eventTimestamp: " + eventTimestamp);
            System.out.println("eventTimestamp: " + df.format(eventTimestamp));
        }

        /**
         * 定期发送 watermark
         * @param output
         */
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            //在这个里面指定延迟5秒钟处理。
            output.emitWatermark(new Watermark(System.currentTimeMillis() - 5000));
        }
    }

    /**
     * 指定eventTime的字段
     */
    static class MyTimestampAssigner implements TimestampAssigner<Tuple2<String,Long>> {
        @Override
        public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
            return element.f1;
        }
    }
}

5.3 图解

在这里插入图片描述

六、Flink Watermark 机制定义

​ Flink 使用 EventTime 的时候如何处理乱序数据?

​ 我们知道,流处理从事件产生,到流经 Source,再到 Operator,中间是有一个过程和时间的。虽然大部分情况下,流到 Operator 的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络延迟等原因,导致乱序的产生,特别是使用 kafka 的话,多个分区的数据无法保证有序。所以在进行 window 计算的时候,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发 window 去进行计算了。这个特别的机制,就是 Watermark ,Watermark 是用于处理乱序事件的。Watermark 可以翻译为水位线

	WaterMark 是一种度量 event Time 进度机制,watermark 作为数据流中的一部分,在 Stream 中流动,并携带 time stamp,一个 WaterMark(t) 表明在流中处理的 EventTime 已经到达了 t,那么在流中就不会再有 Event Time 小于 t 的时间产生了 。

有序的流的 Watermarks

在这里插入图片描述

无序的流的 Watermarks

在这里插入图片描述

多并行度的流的 Watermarks

在这里插入图片描述

七、深入理解 Flink Watermark

7.1 理论

需求:得到并打印每隔 3 秒钟统计相同的 key 的所有的事件(string),相当于就是单词计数,每 3s 统计一次

简单总结一下:每隔 3s 做一次单词统计,这是一个滚动窗口的计算需求。

背景:里面的数据可能就是乱序

解决方案:通过 Flink Window 和 Watermark 来解决

当前知识点的重点: 观测:window 是什么时候触发

在这里插入图片描述

7.2 代码

代码如下:

package com.aa.flinkjava.window;

import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.io.Serializable;
import java.util.ArrayList;

/**
 * @Author AA
 * @Project bigdatapre
 * @Package com.aa.flinkjava.window
 *
 *
 * 测试性数据:
 * 数据, eventTime
 * flink,1641756862000
 * flink,1641756866000
 * flink,1641756872000
 * flink,1641756873000
 * flink,1641756874000
 * flink,1641756876000
 * flink,1641756877000
 */
public class TimeWindow06_ByWaterMark02 {
    public static void main(String[] args) throws Exception {
        //1、获取环境对象
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(1);

        //2、输入源
        DataStreamSource<String> dataStreamSource = executionEnvironment.socketTextStream("hadoop12",9999);

        //3、逻辑处理
        SingleOutputStreamOperator<String> result = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Long>> out) throws Exception {
                String[] words = value.split(",");
                //给传输过来的字段进行拆分。  给数据和时间戳分开
                out.collect(Tuple2.of(words[0],Long.valueOf(words[1])));
            }
        })
                // 基于eventTime 和water 去实现乱序数据的处理
                .assignTimestampsAndWatermarks(
                        //指定watermark的规则
                        WatermarkStrategy.forGenerator((context1) -> new MyWaterGenerator())
                                //指定eventTime的定义
                                .withTimestampAssigner((context1) -> new MyTimestampAssigner()))
                .keyBy(tuple -> tuple.f0)
                //每隔5秒,统计最近10秒的数据
                //注意下面是 SlidingEventTimeWindows  。不是  SlidingProcessingTimeWindows 。
                .window(TumblingEventTimeWindows.of(Time.seconds(3)))
                //.sum(1);
                .process(new MySumProcessFunction());

        //4、输出
        result.print();

        //5、启动执行
        executionEnvironment.execute("TimeWindow06_ByWaterMark02");
    }

    //注意不要导入错包了。这个ProcessWindowFunction 是java包下面的,不是scala包下面的,scala包下面也有这个。注意。
    static class MySumProcessFunction extends ProcessWindowFunction<Tuple2<String, Long>,String,
            String, TimeWindow> {

        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");

        @Override
        public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {

            System.out.println("==========触发窗口的分界线=========");

            //System.out.println("当前process系统时间: " + df.format(System.currentTimeMillis()));
            System.out.println("处理时间: " + df.format(context.currentProcessingTime()));

            System.out.println("当前窗口开始时间: " + df.format(context.window().getStart()));

            ArrayList<String> list = new ArrayList<>();
            for (Tuple2<String, Long> element : elements) {
                list.add(element.toString() + "|" + df.format(element.f1));
            }
            out.collect(list.toString());
            System.out.println("当前窗口结束时间: " + df.format(context.window().getEnd()));
        }
    }


    /**
     * 指定时间字段
     *
     * WatermarkGenerator: watermark 生成器。 一个接口。
     * 用的时候自己写个实现这个接口就行了。
     */
    static class MyWaterGenerator implements WatermarkGenerator<Tuple2<String,Long>>, Serializable {
        FastDateFormat df = FastDateFormat.getInstance("HH:mm:ss");

        //当前的窗口里面的最大的事件时间
        private long currentMaxEventTime = 0L;

        //设置最大的允许乱序的时间 , 假如 是 10秒  。
        // 延迟时间
        private long maxOutOfOrderTime = 10000;

        //每次接受到一条数据,其实就执行了一次处理。
        @Override
        public void onEvent(Tuple2<String, Long> event, long eventTimestamp, WatermarkOutput output) {
            //更新记录窗口中的最大的 EventTime
            long currentElementEventTime = event.f1;
            currentMaxEventTime = Math.max(currentMaxEventTime,currentElementEventTime);

            System.out.println("event = " + event
                    // Event Time  事件时间
                    + " | " + df.format(event.f1)
                    // Max Event Time  最大事件时间
                    + " | " + df.format(currentMaxEventTime)
                    // Current Watermark  watermark时间
                    + " | " + df.format(currentMaxEventTime - maxOutOfOrderTime));
        }

        /**
         * 定期发送 watermark
         * @param output
         */
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            //在这个里面指定延迟 maxOutOfOrderTime 秒钟处理。
            output.emitWatermark(new Watermark(currentMaxEventTime - maxOutOfOrderTime));
        }
    }

    /**
     * 指定eventTime的字段
     */
    static class MyTimestampAssigner implements TimestampAssigner<Tuple2<String,Long>> {
        @Override
        public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
            return element.f1;
        }
    }
}

执行结果日志:

event = (flink,1641756862000) | 03:34:22 | 03:34:22 | 03:34:12
event = (flink,1641756866000) | 03:34:26 | 03:34:26 | 03:34:16
event = (flink,1641756872000) | 03:34:32 | 03:34:32 | 03:34:22
event = (flink,1641756873000) | 03:34:33 | 03:34:33 | 03:34:23
event = (flink,1641756874000) | 03:34:34 | 03:34:34 | 03:34:24
==========触发窗口的分界线=========
处理时间: 17:31:46
当前窗口开始时间: 03:34:21
[(flink,1641756862000)|03:34:22]
当前窗口结束时间: 03:34:24

7.3 计算 window 的触发时间模拟

keyEvent TimecurrentMaxTimestampcurrentWatermark
00119:34:2219:34:2219:34:12
keyEvent TimecurrentMaxTimestampcurrentWatermark
00119:34:2219:34:2219:34:12
00119:34:2619:34:2619:34:16
keyEvent TimecurrentMaxTimestampcurrentWatermark
00119:34:2219:34:2219:34:12
00119:34:2619:34:2619:34:16
00119:34:3219:34:3219:34:22
keyEvent TimecurrentMaxTimestampcurrentWatermark
00119:34:2219:34:2219:34:12
00119:34:2619:34:2619:34:16
00119:34:3219:34:3219:34:22
00119:34:3319:34:3319:34:23
keyEvent TimecurrentMaxTimestampcurrentWatermarkwindow_start_timewindow_end_time
00119:34:2219:34:2219:34:12
00119:34:2619:34:2619:34:16
00119:34:3219:34:3219:34:22
00119:34:3319:34:3319:34:23
00119:34:3419:34:3419:34:24[19:34:2119:34:24)
keyEvent TimecurrentMaxTimestampcurrentWatermarkwindow_start_timewindow_end_time
00119:34:2219:34:2219:34:12
00119:34:2619:34:2619:34:16
00119:34:3219:34:3219:34:22
00119:34:3319:34:3319:34:23
00119:34:3419:34:3419:34:24[19:34:2119:34:24)
00119:34:3619:34:3619:34:26
keyEvent TimecurrentMaxTimestampcurrentWatermarkwindow_start_timewindow_end_time
00119:34:2219:34:2219:34:12
00119:34:2619:34:2619:34:16
00119:34:3219:34:3219:34:22
00119:34:3319:34:3319:34:23
00119:34:3419:34:3419:34:24[19:34:2119:34:24)
00119:34:3619:34:3619:34:26
00119:34:3719:34:3719:34:27[19:34:2419:34:27)

总结:Flink Window 触发的时间:

1、watermark 时间 >= window_end_time
2、在 [window_start_time, window_end_time) 区间中有数据存在,注意是左闭右开的区间,而且是以 event time 来计算的

7.4 WaterMark+Window 处理乱序时间

输入数据如下:

之前的数据:

flink,1641756862000
flink,1641756866000
flink,1641756872000
flink,1641756873000
flink,1641756874000
flink,1641756876000
flink,1641756877000

下面是新补的乱序数据:

flink,1641756879000
flink,1641756871000
flink,1641756883000

下面是模拟的过程

keyEvent TimecurrentMaxTimestampcurrentWatermarkwindow_start_timewindow_end_time
00119:34:2219:34:2219:34:12
00119:34:2619:34:2619:34:16
00119:34:3219:34:3219:34:22
00119:34:3319:34:3319:34:23
00119:34:3419:34:3419:34:24[19:34:2119:34:24)
00119:34:3619:34:3619:34:26
00119:34:3719:34:3719:34:27[19:34:2419:34:27)
00119:34:3919:34:3919:34:29
00119:34:3119:34:3919:34:29
keyEvent TimecurrentMaxTimestampcurrentWatermarkwindow_start_timewindow_end_time
00119:34:2219:34:2219:34:12
00119:34:2619:34:2619:34:16
00119:34:3219:34:3219:34:22
00119:34:3319:34:3319:34:23
00119:34:3419:34:3419:34:24[19:34:2119:34:24)
00119:34:3619:34:3619:34:26
00119:34:3719:34:3719:34:27[19:34:2419:34:27)
00119:34:3919:34:3919:34:29
00119:34:3119:34:3919:34:29
00119:34:4319:34:4319:34:33[19:34:3019:34:33)

八、Flink 处理太过延迟数据

下图中黄颜色的数据 10 就属于迟到太多的数据,该数据本该在 W(11) 之前执行计算

在这里插入图片描述

一般有三种处理方式:

1、延迟太多的数据,直接丢弃,这个方式是 Flink 的默认方式
2、allowedLateness 指定允许数据延迟的时间(不推荐使用)
3、sideOutputLateData 收集迟到的数据,这是大多数企业里面使用的情况,推荐使用

8.1 Flink 丢弃延迟太多的数据

默认的方式。

根据代码执行结果可知,当数据延迟太多,就会直接丢弃。

8.2 Flink 指定允许再次迟到的时间

代码

总结:

当我们设置允许迟到 2 秒的事件,第一次 window 触发的条件是 watermark >= window_end_time
第二次(或者多次)触发的条件是 watermark < window_end_time + allowedLateness

8.3 Flink 收集迟到的数据单独处理

代码

九、Flink 多并行度 Watermark

一个 window 可能会接受到多个 waterMark,我们以最小的为准。

在这里插入图片描述

代码后面放到仓库中



声明:
        文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除。感谢。转载请注明出处,感谢。


By luoyepiaoxue2014

B站: https://space.bilibili.com/1523287361 点击打开链接
微博地址: http://weibo.com/luoyepiaoxue2014 点击打开链接

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/67336.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[附源码]Python计算机毕业设计Django医疗纠纷处理系统

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

[附源码]Python计算机毕业设计Django校园一卡通服务平台

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

HarmonyOS/OpenHarmony应用开发-FA卡片开发体验

卡片概述 卡片是一种界面展示形式&#xff0c;可以将应用的重要信息或操作前置到卡片&#xff0c;以达到服务直达&#xff0c;减少体验层级的目的。 卡片常用于嵌入到其他应用&#xff08;当前只支持系统应用&#xff09;中作为其界面的一部分显示&#xff0c;并支持拉起页面…

[附源码]Python计算机毕业设计Django校园疫情管理系统

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

优品汇系统开发机制介绍

优品汇系统通过通过消费增值模式&#xff0c;促进商品流通&#xff0c;打造中g最大的供应链。作为对政策的回应&#xff0c;绿点刺激实体经济。前期通过科学合理的业务体系&#xff0c;将大部分利润分配给客户和朋友&#xff0c;从而快速创造人气和粉丝数据。中期将逐步完善产品…

计算机网络学习笔记(Ⅲ):数据链路层

目录 1 数据链路层概述 1.1 基本概念 1.2 主要功能 2 封装成帧和透明传输 2.1 封装成帧 2.2 透明传输 1.字符计数法 2.字符填充法 3.零比特填充法 4.违规编码法 3 差错控制 3.1 差错 3.1 检错编码 1.奇偶校验码 2.CRC循环冗余码 3.2 纠错编码 1.确定校验码位数…

基于逆滤波算法的无约束图像超分辨重构研究-附Matlab代码

⭕⭕ 目 录 ⭕⭕✳️ 一、引言✳️ 二、逆滤波复原理论✳️ 三、实验验证✳️ 四、Matlab程序获取与验证✳️ 一、引言 图像复原( Image Restoration)&#xff0c;也称图像恢复&#xff0c;是图像处理的一个重要方面。其目的就是尽可能地减少或去除在获取数字图像过程中发生的…

redis高级

redis持久化的两种方式&#xff1a;&#xff08;重点&#xff09; RDB: 全量打包-----------将内存中的所有数据存储在磁盘 执行一个bgsave ----1. 关机redis 2. 缺点&#xff1a;1.大量的数据重复打包和覆盖耗费时间和性能 2.配置save 不能把所有情况考虑在内&#xff0c;red…

微信公众号小程序怎么做?【公众号小程序搭建】

现在我们使用小程序的频率非常高&#xff0c;而且小程序也给我们带来很多便捷的体验&#xff0c;不少企业公司商家都有自己的微信公众号小程序。那么微信公众号小程序怎么做呢&#xff0c;下面跟大家简单说说。 1、注册小程序账号 做微信公众号小程序之前要有小程序账号&…

快速上手Django(七) -Django之登录cookie和session

文章目录快速上手Django(七) -Django之登录cookie和session一、cookie、session基础cookiesession二、Django SessionDjango启用SessionDjango Session存储方式三、Django中自定义用户模型需求背景自定义用户模型整体实现思路自定义User模型示例代码【非必须】自定义一个管理器…

Kafka 设计原理

文章目录1、Kafka 使用场景2、Kafka 架构2.1、工作流程2.2、副本机制2.3、生产者2.3.1、生产方式2.3.2、分区策略Round-robinRandomnessKey-ordering2.4、消费者2.4.1、消息队列模型2.4.2、消费方式2.4.3、分区策略RangeRoundRobin2.5、消息可靠性1、Kafka 使用场景 Kafka 是采…

图书馆座位预约管理系统毕业设计,图书馆座位管理系统设计与实现,图书馆座位预约系统毕业论文毕设作品参考

项目背景和意义 目的&#xff1a;本课题主要目标是设计并能够实现一个基于web网页的教室图书馆座位预约系统&#xff0c;整个网站项目使用了B/S架构&#xff0c;基于python的Django框架下开发&#xff1b;管理员通过后台录入信息、管理信息&#xff0c;设置网站信息&#xff0c…

CUDA入门和网络加速学习(一)

0. 简介 最近作者希望系统性的去学习一下CUDA加速的相关知识&#xff0c;正好看到深蓝学院有这一门课程。所以这里作者以此课程来作为主线来进行记录分享&#xff0c;方便能给CUDA网络加速学习的萌新们去提供一定的帮助。 1. GPU与CPU区别 处理器指标一般主要分为两大类&…

异构体之间通信

一、WIFI通信 1、网络配置 IP地址 网络地址 主机地址 假设IP地址是192.168.100.1&#xff0c;这个地址中包含了很多含义。如下所示&#xff1a; 网络地址(相当于街道地址)&#xff1a;192.168.100.0 主机地址(相当于各户的门号)&#xff1a;0.0.0.1 IP地址(相当于住户地址…

【Java进阶篇】第七章 多线程

文章目录一、多线程概述1、进程与线程2、进程与线程的关系二、多线程并发的实现1、线程的实现方式一2、线程的实现方式二三、线程的生命周期1、线程的五个生命周期2、常用方法3、线程的sleep4、终止线程的睡眠状态5、强行终止线程的执行6、合理终止一个线程的执行四、线程的调度…

apache html调用bash脚本案例

首先安装apache服务,采用yum的方式即可&#xff0c;因为用到的都是apache的基本功能&#xff0c;不需要编译安装 yum -y install httpd 然后准备html页面&#xff0c;这个页面其实就是调用bash脚本的页面&#xff0c;提供页面操作然后调用服务器上的脚步文件 网页布局建议用…

【嵌入式UI框架:LVGL】使用恩智浦GUI设计工具,像Qt一样开发MCU界面

LVGL是一个免费的开源嵌入式图形库&#xff0c;它提供创建嵌入式GUI所需的功能&#xff0c;具有易于使用的图形元素、精美的视觉效果和低内存占用。完整的图形框架包括供您在创建GUI时所用的各种小部件&#xff0c;并支持更高级的功能&#xff0c;例如动画和抗锯齿。 一、工具&…

springcloud入门

微服务架构介绍 微服务架构&#xff0c; 简单的说就是将单体应用进一步拆分&#xff0c;拆分成更小的服务&#xff0c;每个服务都是一个可以独 立运行的项目。 微服务架构的常见问题 一旦采用微服务系统架构&#xff0c;就势必会遇到这样几个问题&#xff1a; 这么多小服务…

MYSQL——毫秒值和日期类型数据的转换,DATE_SUB的用法

MYSQL——毫秒值和日期类型数据的转换&#xff0c;DATE_SUB的用法一、毫秒值转换成日期数据类型二、日期数据类型转换成毫秒值三、DATE_SUB的用法一、毫秒值转换成日期数据类型 语法&#xff1a;FROM_UNIXTIME(毫秒值字段,‘%Y-%m-%d %h:%i:%s’) 举例&#xff1a; select id…

spring-boot-starter-aop及其使用场景说明

如今&#xff0c;AOP&#xff08;Aspect Oriented Programming&#xff09;已经不是什么崭新的概念了&#xff0c;在经历了代码生成、动态代理、字节码增强甚至静态编译等不同时代的洗礼之后&#xff0c;Java 平台上的 AOP 方案基本上已经以 SpringAOP 结合 AspectJ 的方式稳固…