Flink 中双流 Join 的深度解析与实战

news2024/12/26 21:57:17

目录

一、Join 算子

一)语义与特性

二)通用用法

三)不同窗口类型表现

滚动窗口 Join

滑动窗口 Join

二、CoGroup 算子

一)功能特点

二)通用用法与连接类型实现

内连接(InnerJoin)

左连接(LeftJoin)

右连接(RightJoin)

三、IntervalJoin 算子

一)独特之处

二)使用示例

四、对比

五、总结


        在大数据实时处理领域,Apache Flink 凭借其强大的流处理能力备受青睐。当面临多流数据关联分析场景时,双流 Join 操作至关重要。Flink DataStream API 贴心地提供了joincoGroupintervalJoin三个算子助力我们达成双流 Join,接下来将深入探究它们的原理、使用方式及差异。

一、Join 算子

一)语义与特性

        join算子秉持 “Window join” 语义,依指定字段与(滚动 / 滑动 / 会话)窗口开展内连接(InnerJoin)操作,仅会关联有相同Key且处于同一窗口内两条流的元素,同时支持处理时间和事件时间两种时间特征。

二)通用用法

stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)

        Join 语义类似与离线 Hive 的 InnnerJoin (内连接),这意味着如果一个流中的元素在另一个流中没有相对应的元素,则不会输出该元素。

下面我们看一下 Join 算子在不同类型窗口上的具体表现。

三)不同窗口类型表现

滚动窗口 Join

        

        当在滚动窗口上进行 Join 时,所有有相同 Key 并且位于同一滚动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 或 FlatJoinFunction 进行处理。

        如上图所示,我们定义了一个大小为 2 秒的滚动窗口,最终产生 [0,1],[2,3],… 这种形式的数据。上图显示了每个窗口中橘色流和绿色流的所有元素成对组合。需要注意的是,在滚动窗口 [6,7] 中,由于绿色流中不存在要与橘色流中元素 6、7 相关联的元素,因此该窗口不会输出任何内容。

下面我们一起看一下如何实现上图所示的滚动窗口 Join:

可以通过两个socket流,将数据合并为一个三元组,key,value1,value2

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;


public class _ShuangLiuJoinDemo {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 并行度不为1 ,效果很难出来,因为本地的并行度是16,只有16个并行度都触发才能看到效果
        env.setParallelism(1);

        //2. source-加载数据   key,0,2021-03-26 12:09:00
        DataStream<Tuple3<String, Integer, String>> greenStream = env.socketTextStream("localhost", 8888)
                .map(new MapFunction<String, Tuple3<String, Integer, String>>() {
                    @Override
                    public Tuple3<String, Integer, String> map(String line) throws Exception {
                        String[] arr = line.split(",");
                        System.out.println("绿色:"+ Arrays.toString(arr));
                        return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);
                    }
                })
                // 因为用到了EventTime 所以势必用到水印,否则报错
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {
                                        Long timeStamp = 0L;

                                            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                        Date date = null;
                                        try {
                                            date = simpleDateFormat.parse(element.f2);
                                        } catch (ParseException e) {
                                            throw new RuntimeException(e);
                                        }
                                        timeStamp = date.getTime();
                                        System.out.println("绿色的时间:"+timeStamp);
                                        System.out.println(element.f0);
                                        return timeStamp;
                                    }
                                })
                );
                ;
        // 以后这个9999少用,因为kafka占用这个端口  key,0,2021-03-26 12:09:00
        DataStream<Tuple3<String, Integer, String>> orangeStream = env.socketTextStream("localhost", 7777)
                .map(new MapFunction<String, Tuple3<String,Integer,String>>() {
                    @Override
                    public Tuple3<String, Integer, String> map(String line) throws Exception {
                        String[] arr = line.split(",");
                        System.out.println("橘色:"+ Arrays.toString(arr));

                        return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);
                    }
                })
                // 因为用到了EventTime 所以势必用到水印,否则报错
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {
                                        Long timeStamp = 0L;

                                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                        Date date = null;
                                        try {
                                            date = simpleDateFormat.parse(element.f2);
                                        } catch (ParseException e) {
                                            throw new RuntimeException(e);
                                        }
                                        timeStamp = date.getTime();
                                        System.out.println("橘色的时间:"+timeStamp);

                                        return timeStamp;
                                    }
                                })
                );

        //3. transformation-数据处理转换
        DataStream resultStream = greenStream.join(orangeStream)
                .where(tup3 -> tup3.f0)
                .equalTo(tup3 -> tup3.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {

                    @Override
                    public Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> t1, Tuple3<String, Integer, String> t2) throws Exception {
                        System.out.println(t1.f2);
                        System.out.println(t2.f2);
                        return Tuple3.of(t1.f0, t1.f1, t2.f1);
                    }
                });

        //4. sink-数据输出
        resultStream.print();

        //5. execute-执行
        env.execute();
    }
}

1) 要想测试这个效果,需要将并行度设置为1

2)窗口中数据的打印是需要触发的,没有触发的数据,窗口内是不会进行计算的,所以记得输入触发的数据。

假如使用了EventTime 作为时间语义,不管是窗口开始和结束时间还是触发的条件,都跟系统时间没有关系,而跟输入的数据有关系,举例:

假如你的第一条数据是:key,0,2021-03-26 12:09:01 窗口的大小是5s,水印是3秒 ,窗口的开始时间为:

2021-03-26 12:09:00 结束时间是 2021-03-26 12:09:05 ,触发时间是2021-03-26 12:09:08

为什么呢? 水印时间 >= 结束时间

水印时间是:2021-03-26 12:09:08 - 3 = 2021-03-26 12:09:05 >=2021-03-26 12:09:05

        如上代码所示为绿色流和橘色流指定 BoundedOutOfOrdernessWatermarks Watermark 策略,设置100毫秒的最大可容忍的延迟时间,同时也会为流分配事件时间戳。假设输入流为 格式,两条流输入元素如下所示:

绿色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,4,2021-03-26 12:09:04
key,5,2021-03-26 12:09:05
key,8,2021-03-26 12:09:08
key,9,2021-03-26 12:09:09
key,11,2021-03-26 12:09:11

橘色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,6,2021-03-26 12:09:06
key,7,2021-03-26 12:09:07
key,11,2021-03-26 12:09:11

滑动窗口 Join

        

        当在滑动窗口上进行 Join 时,所有有相同 Key 并且位于同一滑动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 进行处理。

        如上图所示,我们定义了一个窗口大小为 2 秒、滑动步长为 1 秒的滑动窗口。需要注意的是,一个元素可能会落在不同的窗口中,因此会在不同窗口中发生关联,例如,绿色流中的0元素。当滑动窗口中一个流的元素在另一个流中没有相对应的元素,则不会输出该元素。

下面我们一起看一下如何实现上图所示的滑动窗口 Join:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;


public class Demo02Join {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 将并行度设置为1,否则很难看到现象
        env.setParallelism(1);
        // 创建一个绿色的流
        DataStreamSource<String> greenSource = env.socketTextStream("localhost", 8899);
        // key,0,2021-03-26 12:09:00 将它变为三元组
        SingleOutputStreamOperator<Tuple3<String, Integer, String>> greenDataStream = greenSource.map(new MapFunction<String, Tuple3<String, Integer, String>>() {
            @Override
            public Tuple3<String, Integer, String> map(String value) throws Exception {
                String[] arr = value.split(",");
                return new Tuple3<>(arr[0], Integer.valueOf(arr[1]), arr[2]);
            }
        }).assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        // 为什么这个地方的代码比之前要长,原因是以前获取的数据都是long类型,并且都是毫秒值
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {
                                // 指定你的数据中哪一个是时间戳,并且时间戳必须是long类型,必须是毫秒为单位的。
                                String time = element.f2; //2021-03-26 12:09:00
                                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                try {
                                    Date date = sdf.parse(time);
                                    return date.getTime();
                                } catch (ParseException e) {
                                    throw new RuntimeException(e);
                                }
                            }
                        })
        );
        // 创建一个橘色的流
        DataStreamSource<String> orangeSource = env.socketTextStream("localhost", 9988);
        // key,0,2021-03-26 12:09:00 将它变为三元组
        SingleOutputStreamOperator<Tuple3<String, Integer, String>> orangeDataStream = orangeSource.map(new MapFunction<String, Tuple3<String, Integer, String>>() {
            @Override
            public Tuple3<String, Integer, String> map(String value) throws Exception {
                String[] arr = value.split(",");
                return new Tuple3<>(arr[0], Integer.valueOf(arr[1]), arr[2]);
            }
        }).assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        // 为什么这个地方的代码比之前要长,原因是以前获取的数据都是long类型,并且都是毫秒值
                        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {
                                // 指定你的数据中哪一个是时间戳,并且时间戳必须是long类型,必须是毫秒为单位的。
                                String time = element.f2; //2021-03-26 12:09:00
                                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                try {
                                    Date date = sdf.parse(time);
                                    return date.getTime();
                                } catch (ParseException e) {
                                    throw new RuntimeException(e);
                                }
                            }
                        })
        );

        //2. source-加载数据
        //3. transformation-数据处理转换
        DataStream<Tuple3<String, Integer, Integer>> resultStream = greenDataStream.join(orangeDataStream)
                .where(tuple3 -> tuple3.f0)
                .equalTo(tuple3 -> tuple3.f0)
                .window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(1)))
                .apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {
                    @Override
                    public Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> first, Tuple3<String, Integer, String> second) throws Exception {
                        return Tuple3.of(first.f0, first.f1, second.f1);
                    }
                });
        //4. sink-数据输出
        greenDataStream.print("绿色的流:");
        orangeDataStream.print("橘色的流:");

        resultStream.print("最终的结果:");


        //5. execute-执行
        env.execute();
    }
}

假设输入流为 格式,两条流输入元素如下所示:

绿色流:
key,0,2021-03-26 12:09:00
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,9,2021-03-26 12:09:09

橘色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,9,2021-03-26 12:09:09

二、CoGroup 算子

一)功能特点

        CoGroup 算子是将两条数据流按照 Key 进行分组,然后将相同 Key 的数据进行处理。要实现 CoGroup 功能需要为两个输入流分别指定 KeySelector 和 WindowAssigner。它的调用方式类似于 Join 算子,但是 CoGroupFunction 比 JoinFunction 更加灵活,可以按照用户指定的逻辑匹配左流或者右流的数据,基于此我们可以实现内连接(InnerJoin)、左连接(LeftJoin)以及右连接(RightJoin)。

        目前,这些分组中的数据是在内存中保存的,因此需要确保保存的数据量不能太大,否则,JVM 可能会崩溃

二)通用用法与连接类型实现

通用调用形式如下:

stream.coGroup(otherStream)
      .where(<KeySelector>)
      .equalTo(<KeySelector>)
      .window(<WindowAssigner>)
      .apply(<CoGroupFunction>);

        下面我们看一下如何使用 CoGroup 算子实现内连接(InnerJoin)、左连接(LeftJoin)以及右连接(RightJoin)。

最大的优势是可以实现内连接,左连接,右连接,但是缺点是内存压力大,而上面的join只能实现内连接。

CoGroup 从写法上,是coGroup 和 join的区别,而且apply 里面的函数也是不一样的,一定要注意观察。

内连接(InnerJoin)

下面我们看一下如何使用 CoGroup 实现内连接:

如上图所示,我们定义了一个大小为 2 秒的滚动窗口。InnerJoin 只有在两个流对应窗口中都存在元素时,才会输出。

我们以滚动窗口为例来实现 InnerJoin

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.CoGroupedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;

public class _ShuangLiuCoGroupDemo {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        env.setParallelism(1);

        //2. source-加载数据   key,0,2021-03-26 12:09:00
        DataStream<Tuple3<String, Integer, String>> greenStream = env.socketTextStream("localhost", 8888)
                .map(new MapFunction<String, Tuple3<String, Integer, String>>() {
                    @Override
                    public Tuple3<String, Integer, String> map(String line) throws Exception {
                        String[] arr = line.split(",");
                        System.out.println("绿色:"+ Arrays.toString(arr));
                        return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);
                    }
                })
                // 因为用到了EventTime 所以势必用到水印,否则报错
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {
                                        Long timeStamp = 0L;

                                            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                        Date date = null;
                                        try {
                                            date = simpleDateFormat.parse(element.f2);
                                        } catch (ParseException e) {
                                            throw new RuntimeException(e);
                                        }
                                        timeStamp = date.getTime();
                                        System.out.println("绿色的时间:"+timeStamp);
                                        System.out.println(element.f0);
                                        return timeStamp;
                                    }
                                })
                );
                ;
        // 以后这个9999少用,因为kafka占用这个端口  key,0,2021-03-26 12:09:00
        DataStream<Tuple3<String, Integer, String>> orangeStream = env.socketTextStream("localhost", 7777)
                .map(new MapFunction<String, Tuple3<String,Integer,String>>() {
                    @Override
                    public Tuple3<String, Integer, String> map(String line) throws Exception {
                        String[] arr = line.split(",");
                        System.out.println("橘色:"+ Arrays.toString(arr));

                        return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);
                    }
                })
                // 因为用到了EventTime 所以势必用到水印,否则报错
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {
                                        Long timeStamp = 0L;

                                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                        Date date = null;
                                        try {
                                            date = simpleDateFormat.parse(element.f2);
                                        } catch (ParseException e) {
                                            throw new RuntimeException(e);
                                        }
                                        timeStamp = date.getTime();
                                        System.out.println("橘色的时间:"+timeStamp);

                                        return timeStamp;
                                    }
                                })
                );


        //3. transformation-数据处理转换
        CoGroupedStreams<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>> coGroup = greenStream.coGroup(orangeStream);
        coGroup.where(tup3 -> tup3.f0)
                .equalTo(tup3 -> tup3.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple3<String, Integer, String>> i1, Iterable<Tuple3<String, Integer, String>> i2, Collector<String> collector) throws Exception {

                        // 凭借这两个迭代器实现内连接,左右连接
                        // 内连接  外面这个循环和里面的循环必须都有数据才会进行输出,典型的内连接
                        for (Tuple3<String, Integer, String> t1 : i1) {
                            for (Tuple3<String, Integer, String> t2 : i2) {
                                collector.collect("key="+t1.f0+",t1.value="+t1.f1+",t2.value="+t2.f1);
                            }
                        }

                    }
                }).print();


        //5. execute-执行
        env.execute();
    }

}

        如上代码所示,我们实现了 CoGroupFunction 接口,重写 coGroup 方法。一个流中有相同 Key 并且位于同一窗口的元素都会保存在同一个迭代器(Iterable),本示例中绿色流为 greenIterable,橘色流为 orangeIterable,如果要实现 InnerJoin ,只需要两个迭代器中的元素两两组合即可。两条流输入元素如下所示:

绿色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,4,2021-03-26 12:09:04
key,5,2021-03-26 12:09:05
key,8,2021-03-26 12:09:08
key,9,2021-03-26 12:09:09
key,11,2021-03-26 12:09:11

橘色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,6,2021-03-26 12:09:06
key,7,2021-03-26 12:09:07
key,11,2021-03-26 12:09:11


左连接(LeftJoin)

下面我们看一下如何使用 CoGroup 实现左连接:

如上图所示,我们定义了一个大小为 2 秒的滚动窗口。LeftJoin 只要绿色流窗口中有元素时,就会输出。即使在橘色流对应窗口中没有相对应的元素。

我们以滚动窗口为例来实现 LeftJoin

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.CoGroupedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;


public class _ShuangLiuCoGroupLeftDemo {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        //2. source-加载数据   key,0,2021-03-26 12:09:00
        DataStream<Tuple3<String, Integer, String>> greenStream = env.socketTextStream("localhost", 8888)
                .map(new MapFunction<String, Tuple3<String, Integer, String>>() {
                    @Override
                    public Tuple3<String, Integer, String> map(String line) throws Exception {
                        String[] arr = line.split(",");
                        System.out.println("绿色:"+ Arrays.toString(arr));
                        return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);
                    }
                })
                // 因为用到了EventTime 所以势必用到水印,否则报错
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {
                                        Long timeStamp = 0L;

                                            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                        Date date = null;
                                        try {
                                            date = simpleDateFormat.parse(element.f2);
                                        } catch (ParseException e) {
                                            throw new RuntimeException(e);
                                        }
                                        timeStamp = date.getTime();
                                        System.out.println("绿色的时间:"+timeStamp);
                                        System.out.println(element.f0);
                                        return timeStamp;
                                    }
                                })
                );
                ;
        // 以后这个9999少用,因为kafka占用这个端口  key,0,2021-03-26 12:09:00
        DataStream<Tuple3<String, Integer, String>> orangeStream = env.socketTextStream("localhost", 7777)
                .map(new MapFunction<String, Tuple3<String,Integer,String>>() {
                    @Override
                    public Tuple3<String, Integer, String> map(String line) throws Exception {
                        String[] arr = line.split(",");
                        System.out.println("橘色:"+ Arrays.toString(arr));

                        return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);
                    }
                })
                // 因为用到了EventTime 所以势必用到水印,否则报错
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {
                                        Long timeStamp = 0L;

                                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                        Date date = null;
                                        try {
                                            date = simpleDateFormat.parse(element.f2);
                                        } catch (ParseException e) {
                                            throw new RuntimeException(e);
                                        }
                                        timeStamp = date.getTime();
                                        System.out.println("橘色的时间:"+timeStamp);

                                        return timeStamp;
                                    }
                                })
                );


        //3. transformation-数据处理转换
        CoGroupedStreams<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>> coGroup = greenStream.coGroup(orangeStream);
        coGroup.where(tup3 -> tup3.f0)
                .equalTo(tup3 -> tup3.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple3<String, Integer, String>> i1, Iterable<Tuple3<String, Integer, String>> i2, Collector<String> collector) throws Exception {

                        // 凭借这两个迭代器实现内连接,左右连接
                        // 内连接

                        for (Tuple3<String, Integer, String> t1 : i1) {
                            boolean noEelement = true;

                            for (Tuple3<String, Integer, String> t2 : i2) {
                                noEelement = false;
                                collector.collect("key="+t1.f0+",t1.value="+t1.f1+",t2.value="+t2.f1);
                            }
                            if(noEelement){
                                collector.collect("key="+t1.f0+",t1.value="+t1.f1+",t2.value="+null);
                            }
                        }


                    }
                }).print();


        //5. execute-执行
        env.execute();
    }

}

        如上代码所示,我们实现了 CoGroupFunction 接口,重写 coGroup 方法。一个流中有相同 Key 并且位于同一窗口的元素都会保存在同一个迭代器(Iterable),本示例中绿色流为 green Iterable,橘色流为 orange Iterable,如果要实现 LeftJoin ,需要保证 orange Iterable 中没有元素,green Iterable 中的元素也能输出。因此我们定义了一个 noElements 变量来判断 orange Iterable 是否有元素,如果 orange Iterable 中没有元素,单独输出 greenIterable 中的元素即可。

右连接(RightJoin)

下面我们看一下如何使用 CoGroup 实现右连接:

        如上图所示,我们定义了一个大小为 2 秒的滚动窗口。LeftJoin 只要橘色流窗口中有元素时,就会输出。即使在绿色流对应窗口中没有相对应的元素。

我们以滚动窗口为例来实现 RightJoin

// Join流
CoGroupedStreams coGroupStream = greenStream.coGroup(orangeStream);
DataStream<String> result = coGroupStream
// 绿色流
.where(new KeySelector<Tuple3<String, String, String>, String>() {
    @Override
    public String getKey(Tuple3<String, String, String> tuple3) throws Exception {
        return tuple3.f0;
    }
})
// 橘色流
.equalTo(new KeySelector<Tuple3<String, String, String>, String>() {
    @Override
    public String getKey(Tuple3<String, String, String> tuple3) throws Exception {
        return tuple3.f0;
    }
})
// 滚动窗口
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.apply(new RightJoinFunction());

// 右连接
private static class RightJoinFunction implements CoGroupFunction<Tuple3<String, String, String>, Tuple3<String, String, String>, String> {
    @Override
    public void coGroup(Iterable<Tuple3<String, String, String>> greenIterable, Iterable<Tuple3<String, String, String>> orangeIterable, Collector<String> collector) throws Exception {
        for (Tuple3<String, String, String> orangeTuple : orangeIterable) {
            boolean noElements = true;
            for (Tuple3<String, String, String> greenTuple : greenIterable) {
                noElements = false;
                LOG.info("[Join流] Key : {}, Value: {}, EventTime: {}",
                         greenTuple.f0, greenTuple.f1 + ", " + orangeTuple.f1, greenTuple.f2 + ", " + orangeTuple.f2
                        );
                collector.collect(greenTuple.f1 + ", " + orangeTuple.f1);
            }
            if (noElements) {
                LOG.info("[Join流] Key : {}, Value: {}, EventTime: {}",
                         orangeTuple.f0, "null, " + orangeTuple.f1, "null, " + orangeTuple.f2
                        );
                collector.collect("null, " + orangeTuple.f2);
            }
        }
    }
}

        如上代码所示,我们实现了 CoGroupFunction 接口,重写 coGroup 方法。一个流中有相同 Key 并且位于同一窗口的元素都会保存在同一个迭代器(Iterable),本示例中绿色流为 greenIterable,橘色流为 orangeIterable,如果要实现 RightJoin,实现原理跟 LeftJoin 一样,需要保证 greenIterable 中没有元素,orangeIterable 中的元素也能输出。因此我们定义了一个 noElements 变量来判断 greenIterable 是否有元素,如果 greenIterable 中没有元素,单独输出 orangeIterable 中的元素即可。

三、IntervalJoin 算子

一)独特之处

        Interval Join 不同于 Join以及CoGroup 原因是 Join和CoGroup 他们是窗口Join ,必须给定窗口的 ,Interval Join不需要给窗口。Interval Join 必须先分组才能使用。

        Flink 中基于 DataStream 的 Join,只能实现在同一个窗口的两个数据流进行 Join,但是在实际中常常会存在数据乱序或者延时的情况,导致两个流的数据进度不一致,就会出现数据跨窗口的情况,那么数据就无法在同一个窗口内 Join。Flink 基于 KeyedStream 提供的 Interval Join 机制可以对两个keyedStream 进行 Join, 按照相同的 key 在一个相对数据时间的时间段内进行 Join。按照指定字段以及右流相对左流偏移的时间区间进行关联:

b.timestamp ∈ [a.timestamp + lowerBound, a.timestamp + upperBound]

或者

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound


        其中a和b分别是上图中绿色流和橘色流中的元素,并且有相同的 key。只需要保证 lowerBound 永远小于等于 upperBound 即可,均可以为正数或者负数。
        从上面可以看出绿色流可以晚到 lowerBound(lowerBound为负的话)时间,也可以早到 upperBound(upperBound为正的话)时间。也可以理解为橘色流中的每个元素可以和绿色流中指定区间的元素进行 Join。需要注意的是 Interval Join 当前仅支持事件时间(EventTime):

public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {
    if (timeBehaviour != TimeBehaviour.EventTime) {
        throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
    }
}

二)使用示例

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;

public class _ShuangLiuIntervalJoinDemo {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 并行度不为1 ,效果很难出来,因为本地的并行度是16,只有16个并行度都触发才能看到效果
        env.setParallelism(1);

        //2. source-加载数据   key,0,2021-03-26 12:09:00
        DataStream<Tuple3<String, Integer, String>> greenStream = env.socketTextStream("localhost", 8888)
                .map(new MapFunction<String, Tuple3<String, Integer, String>>() {
                    @Override
                    public Tuple3<String, Integer, String> map(String line) throws Exception {
                        String[] arr = line.split(",");
                        System.out.println("绿色:"+ Arrays.toString(arr));
                        return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);
                    }
                })
                // 因为用到了EventTime 所以势必用到水印,否则报错
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {
                                        Long timeStamp = 0L;

                                            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                        Date date = null;
                                        try {
                                            date = simpleDateFormat.parse(element.f2);
                                        } catch (ParseException e) {
                                            throw new RuntimeException(e);
                                        }
                                        timeStamp = date.getTime();
                                        System.out.println("绿色的时间:"+timeStamp);
                                        System.out.println(element.f0);
                                        return timeStamp;
                                    }
                                })
                );
                ;
        // 以后这个9999少用,因为kafka占用这个端口  key,0,2021-03-26 12:09:00
        DataStream<Tuple3<String, Integer, String>> orangeStream = env.socketTextStream("localhost", 7777)
                .map(new MapFunction<String, Tuple3<String,Integer,String>>() {
                    @Override
                    public Tuple3<String, Integer, String> map(String line) throws Exception {
                        String[] arr = line.split(",");
                        System.out.println("橘色:"+ Arrays.toString(arr));

                        return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);
                    }
                })
                // 因为用到了EventTime 所以势必用到水印,否则报错
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {
                                        Long timeStamp = 0L;

                                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                        Date date = null;
                                        try {
                                            date = simpleDateFormat.parse(element.f2);
                                        } catch (ParseException e) {
                                            throw new RuntimeException(e);
                                        }
                                        timeStamp = date.getTime();
                                        System.out.println("橘色的时间:"+timeStamp);

                                        return timeStamp;
                                    }
                                })
                );

        //3. transformation-数据处理转换
        DataStream resultStream = greenStream.keyBy(tup -> tup.f0).intervalJoin(orangeStream.keyBy(tup -> tup.f0))
                .between(Time.seconds(-2),Time.seconds(1))
                        .process(new ProcessJoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>() {
                            @Override
                            public void processElement(Tuple3<String, Integer, String> left, Tuple3<String, Integer, String> right, ProcessJoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, String>.Context ctx, Collector<String> out) throws Exception {

                                out.collect("left中的key:"+left.f0+",value="+left.f1+",time="+left.f2+",right中的key:"+right.f0+",value="+right.f1+",time="+right.f2);
                            }
                        });

        //4. sink-数据输出
        resultStream.print();

        //5. execute-执行
        env.execute();
    }
}

        需要注意的是 Interval Join 当前仅支持事件时间(EventTime),所以需要为流指定事件时间戳(毫秒值)。

两条流输入元素如下所示:

绿色流:
c,0,2021-03-23 12:09:00
c,1,2021-03-23 12:09:01
c,6,2021-03-23 12:09:06
c,7,2021-03-23 12:09:07

橘色流:
c,0,2021-03-23 12:09:00
c,2,2021-03-23 12:09:02
c,3,2021-03-23 12:09:03
c,4,2021-03-23 12:09:04
c,5,2021-03-23 12:09:05
c,7,2021-03-23 12:09:07

四、对比

join算子

        专注窗口内的内连接,语法简洁,适用于确定窗口内精准匹配关联场景,像电商场景按固定时段统计同品类商品订单与库存关联。

coGroup算子

        内存消耗大,但连接灵活性高,应对需多种连接(内、左、右)复杂业务逻辑时优势凸显,比如统计用户行为与商品推荐关联,依用户有无历史行为选择不同连接策略。

intervalJoin算子

        突破窗口限制,应对数据乱序、延时致跨窗口问题,依据时间区间关联,常用于对时间先后有要求且窗口难界定的数据关联,如实时监控系统中事件与后续处理动作关联,事件可能延迟到达。

五、总结

        在 Flink 实时流处理双流 Join 实战中,依据数据特性、业务逻辑、时间要求等因素合理抉择算子,方能高效、精准达成数据分析与处理目标,深挖其原理与实践要点,为构建强大实时数据处理应用筑牢根基。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2254527.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OpenStack-Glance组件

Glance Glance使用磁盘格式和容器格式基础配置镜像转换 Glance 是 OpenStack 的镜像服务&#xff0c;负责存储、发现和管理虚拟机镜像。它允许用户创建和共享镜像&#xff0c;用于启动虚拟机实例。 Glance 的主要功能 &#xff08;1&#xff09;虚拟机镜像的管理 支持镜像的上…

基于神经网络的弹弹堂类游戏弹道快速预测

目录 一、 目的... 1 1.1 输入与输出.... 1 1.2 隐网络架构设计.... 1 1.3 激活函数与损失函数.... 1 二、 训练... 2 2.1 数据加载与预处理.... 2 2.2 训练过程.... 2 2.3 训练参数与设置.... 2 三、 测试与分析... 2 3.1 性能对比.... 2 3.2 训练过程差异.... 3 四、…

Linux入门攻坚——40、Linux集群系统入门-lvs(1)

Cluster&#xff0c;集群&#xff0c;为了解决某个特定问题将多台计算机组合起来形成的单个系统。 这个单个集群系统可以扩展&#xff0c;系统扩展的方式&#xff1a;scale up&#xff0c;向上扩展&#xff0c;更换更好的主机&#xff1b;scale out&#xff0c;向外扩展&…

威联通-001 手机相册备份

文章目录 前言1.Qfile Pro2.Qsync Pro总结 前言 威联通有两种数据备份手段&#xff1a;1.Qfile Pro和2.Qsync Pro&#xff0c;实践使用中存在一些区别&#xff0c;针对不同备份环境选择是不同。 1.Qfile Pro 用来备份制定目录内容的。 2.Qsync Pro 主要用来查看和操作文…

Docker单机网络:解锁本地开发环境的无限潜能

作者简介&#xff1a;我是团团儿&#xff0c;是一名专注于云计算领域的专业创作者&#xff0c;感谢大家的关注 座右铭&#xff1a; 云端筑梦&#xff0c;数据为翼&#xff0c;探索无限可能&#xff0c;引领云计算新纪元 个人主页&#xff1a;团儿.-CSDN博客 目录 前言&#…

【Linux操作系统】多线程控制(创建,等待,终止、分离)

目录 一、线程与轻量级进程的关系二、进程创建1.线程创建线程创建函数&#xff08;pthread&#xff09;查看和理解线程id主线程与其他线程之间的关系 三、线程等待&#xff08;回收&#xff09;四、线程退出线程退出情况线程退出方法 五、线程分离线程的优点线程的缺点 一、线程…

解决IDEA的easycode插件生成的mapper.xml文件字段之间逗号丢失

问题 easycode插件生成的mapper.xml文件字段之间逗号丢失&#xff0c;如图 解决办法 将easycode(在settings里面的othersettings)设置里面的Template的mapper.xml.vm和Global Config的mybatisSupport.vm的所有$velocityHasNext换成$foreach.hasNext Template的mapper.xml.vm(…

Android 实现中英文切换

在开发海外项目的时候&#xff0c;需要实现app内部的中英文切换功能&#xff0c;所有的英文都是内置的&#xff0c;整体思路为&#xff1a; 创建一个sp对象&#xff0c;存储当前系统的语言类型&#xff0c;然后在BaseActivity中对语言进行判断&#xff1b; //公共Activitypubl…

11月 | Apache DolphinScheduler月度进展总结

各位热爱 Apache DolphinScheduler 的小伙伴们&#xff0c;社区10月份月报更新啦&#xff01;这里将记录 DolphinScheduler 社区每月的重要更新&#xff0c;欢迎关注&#xff01; 月度Merge之星 感谢以下小伙伴11月份为 Apache DolphinScheduler 所做的精彩贡献&#xff08;排…

[软件开发幼稚指数评比]《软件方法》自测题解析010

第1章自测题 Part2 **9 [**单选题] 以下说法和其他三个最不类似的是: A)如果允许一次走两步&#xff0c;新手也能击败象棋大师 B)百米短跑比赛才10秒钟&#xff0c;不可能为每一秒做周密计划&#xff0c;凭感觉跑就是 C)即使是最好的足球队&#xff0c;也不能保证每…

【JavaWeb后端学习笔记】使用IDEA连接MySQL数据库

IDEA连接MySQL IDEA中集成了DataGrip&#xff0c;因此可以直接使用IDEA操作MySQL数据库。 1.创建一个新的空工程。点击右侧的数据库标志。 2.选择要连接的数据库。第一步&#xff1a;点击“”&#xff1b;第二步&#xff1a;点击 Data Source&#xff1b;第三步&#xff1a;选…

大模型分类2—按训练方式

版权声明 本文原创作者:谷哥的小弟作者博客地址:http://blog.csdn.net/lfdfhl根据训练方式,大模型可分为监督学习、无监督学习、自监督学习和强化学习大模型。 1. 监督学习大模型 1.1 定义与原理 监督学习大模型是一种机器学习范式,它依赖于标记数据集进行训练。这些数据…

鸿蒙特色实战2

服务卡片开发 创建服务卡片 创建一个新的工程后&#xff0c;可以通过如下方法进行创建服务卡片&#xff1a; 创建服务卡片包括如下两种方式&#xff1a; 选择模块&#xff08;如entry模块&#xff09;下的任意文件&#xff0c;单击菜单栏File > New > Service Widget创…

LCD1602液晶显示屏指令详解

文章目录 LCD1602液晶显示屏1.简介2. 液晶引脚说明3. 指令介绍3.1 清屏指令3.2 光标归位指令3.3 进入模式设置指令3.4 显示开关设置指令3.5 设定显示或光标移动方向指令3.6 功能设定指令3.7 设定CGRAM地址指令3.8 设定DDRAM地址指令3.9 读取忙或AC地址指令3.10 总图3.11 DDRAM …

Python毕业设计选题:基于大数据的旅游景区推荐系统_django

开发语言&#xff1a;Python框架&#xff1a;djangoPython版本&#xff1a;python3.7.7数据库&#xff1a;mysql 5.7数据库工具&#xff1a;Navicat11开发软件&#xff1a;PyCharm 系统展示 系统首页界面 用户注册界面 用户登录界面 景点信息界面 景点资讯界面 个人中心界面 …

引领素养教育行业,猿辅导素养课斩获“2024影响力教育品牌”奖项

近日&#xff0c;由教育界网、校长邦联合主办&#xff0c;鲸媒体、职教共创会协办的“第9届榜样教育年度盛典”评奖结果揭晓。据了解&#xff0c;此次评选共有近500家企业提交参评资料进行奖项角逐&#xff0c;历经教育界权威专家、资深教育从业者以及专业评审团队的多轮严格筛…

十七、监控与度量-Prometheus/Grafana/Actuator

文章目录 前言一、Spring Boot Actuator1. 简介2. 添加依赖2. 开启端点3. 暴露端点4. 总结 二、Prometheus1. 简介2. Prometheus客户端3. Prometheus服务端4. 总结 三、Grafana1. 简介2. Grafana安装3. Grafana配置 前言 系统监控‌ 在企业级的应用中&#xff0c;系统监控至关…

PHP语法学习(第六天)

&#x1f4a1;依照惯例&#xff0c;回顾一下昨天讲的内容 PHP语法学习(第五天)主要讲了PHP中的常量和运算符的运用。 &#x1f525; 想要学习更多PHP语法相关内容点击“PHP专栏” 今天给大家讲课的角色是&#x1f34d;菠萝吹雪&#xff0c;“我菠萝吹雪吹的不是雪&#xff0c;而…

关于遥感图像镶嵌后出现斑点情况的解决方案

把几张GF1的影像镶嵌在一起后&#xff0c;结果在Arcgis里出现了明显的斑点情况&#xff08;在ENVI里显示则不会出现&#xff09;&#xff0c;个人觉得可能是斑点噪声问题&#xff0c;遂用Arcgis的滤波工具进行滤波处理&#xff0c;但由于该工具本身没有直接设置对多波段处理方式…

【嵌套查询】.NET开源 ORM 框架 SqlSugar 系列

.NET开源 ORM 框架 SqlSugar 系列 【开篇】.NET开源 ORM 框架 SqlSugar 系列【入门必看】.NET开源 ORM 框架 SqlSugar 系列【实体配置】.NET开源 ORM 框架 SqlSugar 系列【Db First】.NET开源 ORM 框架 SqlSugar 系列【Code First】.NET开源 ORM 框架 SqlSugar 系列【数据事务…