DataStream API的Joining操作

news2024/11/23 20:32:41

目录

Window Join

Tumbling Window Join

Sliding Window Join

Session Window Join

Interval Join

Window CoGroup


 Window Join

        窗口连接(window join)将两个流的元素连接在一起,这两个流共享一个公共键,并且位于同一窗口。这些窗口可以通过使用窗口分配器来定义,并对来自两个流的元素进行计算。

        然后将来自两边的元素传递给用户定义的JoinFunction或FlatJoinFunction,用户可以在其中发出满足连接条件的结果。

        一般用法可以概括如下:

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>);

两个流stream和otherStream通过join连接在一起,where方法中的KeySelector指定stream流的键,equalTo方法中的KeySelector指定otherStream流的键,window方法为两个流分配公共窗口,最后apply方法通过传入JoinFunction实现join结果的具体逻辑。这非常类似如下SQL:

select

    a.id,a.age,b.name

from t_age as a join t_name as b on a.id = b.id
where a.time between 0 and 1 and b.time between 0 and 1

(1)stream.join(otherStream)等价于SQL中的t_age as a join t_name as b的部分,

(2).where(<KeySelector>).equalTo(<KeySelector>)等价于a.id = b.id的部分,

(3)apply(<JoinFunction>)等价于select a.id,a.age,b.name的部分,显示最终join的结果,

(4)window方法等价于a.time between 0 and 1 and b.time between 0 and 1。

        关于语义的一些注意事项:
(1)创建两个流的元素成对组合的行为类似于内部连接,这意味着如果一个流中的元素没有来自另一个流的相应元素要连接,则不会发出这些元素。
(2)那些连接后的元素有它们自己的时间戳,将是各自窗口中仍然存在的最大时间戳。例如,以[5,10)为边界的窗口将导致连接后的元素的时间戳为9。

       在下一节中,我们将使用一些示例场景概述不同类型的窗口连接的行为。

Tumbling Window Join

        在执行滚动窗口连接时,所有具有公共键和公共滚动窗口的元素都以成对组合的形式连接起来,并传递给JoinFunction或FlatJoinFunction。因为它的行为类似于内部连接,所以一个流的元素如果在其滚动窗口中没有来自另一个流的元素,则不会发出。

        如图所示,我们定义了一个大小为2毫秒的滚动窗口,其窗体为[0,2),[2,4),....该图像显示了每个窗口中所有元素的成对组合,这些元素将传递给JoinFunction。请注意,在滚动窗口[6,8)中没有发射任何东西,因为绿色流中不存在与橙色元素⑥和⑦连接的元素。 

package com.leboop.joining;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.JoinedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Arrays;

/**
 * Description TODO.
 * Date 2024/8/13 16:16
 *
 * @author leb
 * @version 2.0
 */
public class TumblingWindowJoinDemo {
    public static void main(String[] args) throws Exception {
        // 初始化环境.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从集合中读取数据. Tuple2的第二个分量为事件时间.
        DataStreamSource<Tuple2<Integer, Long>> greenStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(0, 0L),
                new Tuple2<>(1, 1000L),
                new Tuple2<>(3, 3000L),
                new Tuple2<>(4, 4000L)
        ));

        // 为流添加水位线,并指定时间戳抽取方法.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> greenWatermarks =
                greenStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));

        // 从集合中读取数据. Tuple2的第二个分量为事件时间.
        DataStreamSource<Tuple2<Integer, Long>> orangeStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(0, 0L),
                new Tuple2<>(1, 1000L),
                new Tuple2<>(2, 2000L),
                new Tuple2<>(3, 3000L),
                new Tuple2<>(4, 4000L),
                new Tuple2<>(5, 5000L),
                new Tuple2<>(6, 6000L),
                new Tuple2<>(7, 7000L)
        ));

        // 为流添加水位线,并指定时间戳抽取方法.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> orangeWatermarks =
                orangeStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));

        // 读取的两个流做join.
        JoinedStreams<Tuple2<Integer, Long>, Tuple2<Integer, Long>> joinedStreams =
                orangeWatermarks.join(greenWatermarks);

        // 两个流指定公共key和公共窗口.
        DataStream<String> apply = joinedStreams
                .where(new MyKeySelector())
                .equalTo(new MyKeySelector())
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .apply(new MyJoinFunction());

        apply.print();

        env.execute();
    }

    static class MyKeySelector implements KeySelector<Tuple2<Integer, Long>, Integer> {
        @Override
        public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
            return 1;
        }
    }

    static class MyJoinFunction implements JoinFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>, String> {
        @Override
        public String join(Tuple2<Integer, Long> first, Tuple2<Integer, Long> second) throws Exception {
            return first.f0 + "," + second.f0;
        }
    }

    static class MySerializableTimestampAssigner implements SerializableTimestampAssigner<Tuple2<Integer, Long>> {
        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element, long recordTimestamp) {
            return element.f1;
        }
    }

}

 代码运行结果:

6> 0,0
6> 0,1
6> 1,0
6> 1,1
6> 2,3
6> 3,3
6> 4,4
6> 5,4

结果与上图一致。 

Sliding Window Join

       在执行滑动窗口连接时,所有具有公共键和公共滑动窗口的元素都以成对组合的形式连接起来,并传递给JoinFunction或FlatJoinFunction。在当前滑动窗口中,如果一个流中的元素没有另一个流中的元素,则不会发出!请注意,有些元素可能会在一个滑动窗口中连接,而不是在另一个滑动窗口中连接!

        在这个例子中,我们使用大小为2毫秒的滑动窗口,并将它们滑动1毫秒,从而产生滑动窗口[- 1,1),[0,2),[1,3),[2,4),....x轴下方的连接元素是传递给每个滑动窗口的JoinFunction的元素。在这里,你也可以看到,例如,橙色②与窗口[2,4)中的绿色③是如何结合在一起的,但没有与窗口[1,2]中的任何东西结合在一起。

package com.leboop.joining;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.JoinedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Arrays;

/**
 * Description TODO.
 * Date 2024/8/13 16:16
 *
 * @author leb
 * @version 2.0
 */
public class SlidingWindowJoinDemo {
    public static void main(String[] args) throws Exception {
        // 初始化环境.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从集合中读取数据. Tuple2的第二个分量为事件时间.
        DataStreamSource<Tuple2<Integer, Long>> greenStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(0, 0L),
//                new Tuple2<>(1, 1000L),
                new Tuple2<>(3, 3000L),
                new Tuple2<>(4, 4000L)
        ));

        // 为流添加水位线,并指定时间戳抽取方法.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> greenWatermarks =
                greenStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));

        // 从集合中读取数据. Tuple2的第二个分量为事件时间.
        DataStreamSource<Tuple2<Integer, Long>> orangeStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(0, 0L),
                new Tuple2<>(1, 1000L),
                new Tuple2<>(2, 2000L),
                new Tuple2<>(3, 3000L),
                new Tuple2<>(4, 4000L),
                new Tuple2<>(5, 5000L),
                new Tuple2<>(6, 6000L),
                new Tuple2<>(7, 7000L)
        ));

        // 为流添加水位线,并指定时间戳抽取方法.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> orangeWatermarks =
                orangeStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));

        // 读取的两个流做join.
        JoinedStreams<Tuple2<Integer, Long>, Tuple2<Integer, Long>> joinedStreams =
                orangeWatermarks.join(greenWatermarks);

        // 两个流指定公共key和公共窗口.
        DataStream<String> apply = joinedStreams
                .where(new MyKeySelector())
                .equalTo(new MyKeySelector())
                .window(SlidingEventTimeWindows.of(Time.seconds(2), Time.seconds(1)))
                .apply(new MyJoinFunction());

        apply.print();

        env.execute();
    }

    static class MyKeySelector implements KeySelector<Tuple2<Integer, Long>, Integer> {
        @Override
        public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
            return 1;
        }
    }

    static class MyJoinFunction implements JoinFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>, String> {
        @Override
        public String join(Tuple2<Integer, Long> first, Tuple2<Integer, Long> second) throws Exception {
            return first.f0 + "," + second.f0;
        }
    }

    static class MySerializableTimestampAssigner implements SerializableTimestampAssigner<Tuple2<Integer, Long>> {
        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element, long recordTimestamp) {
            return element.f1;
        }
    }

}

程序运行结果如下:

6> 0,0
6> 0,0
6> 1,0
6> 2,3
6> 3,3
6> 3,3
6> 3,4
6> 4,3
6> 4,4
6> 4,4
6> 5,4

 运行结果和图中所示结果一致。

Session Window Join

        当执行会话窗口连接时,“组合”时满足会话准则的具有相同键的所有元素以成对组合的方式连接并传递给JoinFunction或FlatJoinFunction。这再次执行内部连接,所以如果有一个会话窗口只包含来自一个流的元素,则不会发出任何输出!

        这里我们定义了一个会话窗口连接,其中每个会话至少间隔1ms。有三个会话,在前两个会话中,来自两个流的连接元素被传递给JoinFunction。在第三次会话中,绿色流中没有元素,所以⑧和⑨没有连接!

package com.leboop.joining;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.JoinedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.util.Arrays;

/**
 * Description TODO.
 * Date 2024/8/13 16:16
 *
 * @author leb
 * @version 2.0
 */
public class SessionWindowJoinDemo {
    public static void main(String[] args) throws Exception {
        // 初始化环境.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从集合中读取数据. Tuple2的第二个分量为事件时间.
        DataStreamSource<Tuple2<Integer, Long>> greenStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(0, 0L),
                new Tuple2<>(4, 4000L),
                new Tuple2<>(5, 5000L)
        ));

        // 为流添加水位线,并指定时间戳抽取方法.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> greenWatermarks =
                greenStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));

        // 从集合中读取数据. Tuple2的第二个分量为事件时间.
        DataStreamSource<Tuple2<Integer, Long>> orangeStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(1, 1000L),
                new Tuple2<>(2, 2000L),
                new Tuple2<>(5, 5000L),
                new Tuple2<>(6, 6000L),
                new Tuple2<>(8, 8000L),
                new Tuple2<>(9, 9000L)
        ));

        // 为流添加水位线,并指定时间戳抽取方法.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> orangeWatermarks =
                orangeStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));

        // 读取的两个流做join.
        JoinedStreams<Tuple2<Integer, Long>, Tuple2<Integer, Long>> joinedStreams =
                orangeWatermarks.join(greenWatermarks);

        // 两个流指定公共key和公共窗口.
        DataStream<String> apply = joinedStreams
                .where(new MyKeySelector())
                .equalTo(new MyKeySelector())
                .window(EventTimeSessionWindows.withGap(Time.seconds(1)))
                .apply(new MyJoinFunction());

        apply.print();

        env.execute();
    }

    static class MyKeySelector implements KeySelector<Tuple2<Integer, Long>, Integer> {
        @Override
        public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
            return 1;
        }
    }

    static class MyJoinFunction implements JoinFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>, String> {
        @Override
        public String join(Tuple2<Integer, Long> first, Tuple2<Integer, Long> second) throws Exception {
            return first.f0 + "," + second.f0;
        }
    }

    static class MySerializableTimestampAssigner implements SerializableTimestampAssigner<Tuple2<Integer, Long>> {
        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element, long recordTimestamp) {
            return element.f1;
        }
    }

}

程序运行结果如下:

6> 1,0
6> 2,0
6> 5,4
6> 5,5
6> 6,4
6> 6,5

 这与图中所示一致。


Interval Join

        区间连接用一个共同的键连接两个流的元素(我们现在称它们为A&B),其中流B的元素具有与流A中元素的时间戳处于相对时间区间的时间戳。下图中橙色流是A流,绿色流是B流。

        这也可以更正式地表示为:

b.timestamp∈[a.timestamp + lowerBound,a.timestamp + upperBound]

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

其中a和b是A和B中共享一个公共键的元素。下界lowerBound和上界lowerBound都可以是正负,只要下界总是小于或等于上界。区间连接目前只执行内部连接。

        当一对元素被传递给ProcessJoinFunction时,它们将被分配两个元素中较大的时间戳(可以通过ProcessJoinFunction.ontext访问)。

        区间连接目前只支持事件时间。

 在上面的例子中,我们以-2毫秒的下限和+1毫秒的上限连接两个流“橙色”和“绿色”。默认情况下,这些边界是包含的,但是可以应用. lowerboundexclusive()和. upperboundexclusive()来改变行为。
再次使用更正式的符号,这将转化为
orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts +upperBound
如三角形所示。 

package com.leboop.joining;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Description TODO.
 * Date 2024/8/13 16:16
 *
 * @author leb
 * @version 2.0
 */
public class IntervalJoinDemo {
    public static void main(String[] args) throws Exception {
        // 初始化环境.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从集合中读取数据. Tuple2的第二个分量为事件时间.
        DataStreamSource<Tuple2<Integer, Long>> greenStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(0, 0L),
                new Tuple2<>(1, 1000L),
                new Tuple2<>(6, 6000L),
                new Tuple2<>(7, 7000L)
        ));

        // 为流添加水位线,并指定时间戳抽取方法.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> greenWatermarks =
                greenStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));

        KeyedStream<Tuple2<Integer, Long>, Integer> greenKeyedStream = greenWatermarks.keyBy(new MyKeySelector());

        // 从集合中读取数据. Tuple2的第二个分量为事件时间.
        DataStreamSource<Tuple2<Integer, Long>> orangeStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(0, 0L),
                new Tuple2<>(2, 2000L),
                new Tuple2<>(3, 3000L),
                new Tuple2<>(4, 4000L),
                new Tuple2<>(5, 5000L),
                new Tuple2<>(7, 7000L)
        ));

        // 为流添加水位线,并指定时间戳抽取方法.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> orangeWatermarks =
                orangeStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));

        KeyedStream<Tuple2<Integer, Long>, Integer> orangeKeyedStream = orangeWatermarks.keyBy(new MyKeySelector());

        // 读取的两个流做join.

        SingleOutputStreamOperator<String> intervalJoinStream = orangeKeyedStream.intervalJoin(greenKeyedStream)
                .between(Time.seconds(-2), Time.seconds(1))
                .process(new ProcessJoinFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>, String>() {
                    @Override
                    public void processElement(Tuple2<Integer, Long> left, Tuple2<Integer, Long> right, Context ctx, Collector<String> out) throws Exception {
                        out.collect(left.f0 + "," + right.f0);
                    }
                });


        intervalJoinStream.print();

        env.execute();
    }

    static class MyKeySelector implements KeySelector<Tuple2<Integer, Long>, Integer> {
        @Override
        public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
            return 1;
        }
    }

    static class MyJoinFunction implements JoinFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>, String> {
        @Override
        public String join(Tuple2<Integer, Long> first, Tuple2<Integer, Long> second) throws Exception {
            return first.f0 + "," + second.f0;
        }
    }

    static class MySerializableTimestampAssigner implements SerializableTimestampAssigner<Tuple2<Integer, Long>> {
        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element, long recordTimestamp) {
            return element.f1;
        }
    }

}

程序运行结果如下:

6> 0,0
6> 0,1
6> 2,0
6> 2,1
6> 3,1
6> 5,6
6> 7,6
6> 7,7

结果与图中一致。


Window CoGroup

        Window CoGroup类似SQL中的full outer join。两个流连接,没有匹配上的结果也会输出。根据Window CoGroup根据具体窗口类型不同,也有

(1)Tumbling Window CoGroup

(2)Sliding Window CoGroup

(3)Session Window CoGroup

Window CoGroup的一个例子代码如下:

package com.leboop.joining;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Description TODO.
 * Date 2024/8/13 16:16
 *
 * @author leb
 * @version 2.0
 */
public class WindowCogroupDemo {
    public static void main(String[] args) throws Exception {
        // 初始化环境.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 从集合中读取数据. Tuple2的第二个分量为事件时间.
        DataStreamSource<Tuple2<Integer, Long>> greenStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(0, 0L),
                new Tuple2<>(1, 1000L),
                new Tuple2<>(3, 3000L),
                new Tuple2<>(4, 4000L)
        ));

        // 为流添加水位线,并指定时间戳抽取方法.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> greenWatermarks =
                greenStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));

        // 从集合中读取数据. Tuple2的第二个分量为事件时间.
        DataStreamSource<Tuple2<Integer, Long>> orangeStream = env.fromCollection(Arrays.asList(
                new Tuple2<>(0, 0L),
                new Tuple2<>(1, 1000L),
                new Tuple2<>(2, 2000L),
                new Tuple2<>(3, 3000L),
                new Tuple2<>(4, 4000L),
                new Tuple2<>(5, 5000L),
                new Tuple2<>(6, 6000L),
                new Tuple2<>(7, 7000L)
        ));

        // 为流添加水位线,并指定时间戳抽取方法.
        SingleOutputStreamOperator<Tuple2<Integer, Long>> orangeWatermarks =
                orangeStream.assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple2<Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(new MySerializableTimestampAssigner()));

        // 读取的两个流做join.

        CoGroupedStreams<Tuple2<Integer, Long>, Tuple2<Integer, Long>> coGroupedStreams =
                orangeWatermarks.coGroup(greenWatermarks);

        // 两个流指定公共key和公共窗口.
        DataStream<String> apply = coGroupedStreams
                .where(new MyKeySelector())
                .equalTo(new MyKeySelector())
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .apply(new MyCoGroupFunction());

        apply.print();

        env.execute();
    }

    static class MyKeySelector implements KeySelector<Tuple2<Integer, Long>, Integer> {
        @Override
        public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
            return value.f0;
        }
    }

    static class MyCoGroupFunction implements CoGroupFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>, String> {
        @Override
        public void coGroup(Iterable<Tuple2<Integer, Long>> first, Iterable<Tuple2<Integer, Long>> second, Collector<String> out) throws Exception {
            out.collect(first + "," + second);
        }
    }

    static class MySerializableTimestampAssigner implements SerializableTimestampAssigner<Tuple2<Integer, Long>> {
        @Override
        public long extractTimestamp(Tuple2<Integer, Long> element, long recordTimestamp) {
            return element.f1;
        }
    }

}

程序运行结果如下:

1> [(4,4000)],[(4,4000)]
6> [(0,0)],[(0,0)]
6> [(1,1000)],[(1,1000)]
8> [(3,3000)],[(3,3000)]
8> [(2,2000)],[]
8> [(5,5000)],[]
8> [(7,7000)],[]
2> [(6,6000)],[]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2036460.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从老旧到智慧病房,全视通智慧病房方案减轻医护工作负担

传统的老旧病房面临着诸多挑战。例如&#xff0c;患者风险信息的管理仍依赖于手写记录和人工核查&#xff1b;病房呼叫系统仅支持基本的点对点呼叫&#xff0c;缺乏与其它系统的集成能力&#xff1b;信息传递主要依靠医护人员口头传达&#xff1b;护理信息管理分散且不连贯&…

JavaEE-多线程

前情提要&#xff1a;本文内容过多&#xff0c;建议搭配目录食用&#xff0c;想看哪里点哪里~ PC端目录 手机端目录 话不多说&#xff0c;我们正式开始~~ 目录 多线程的概念进程和线程的区别和联系:使用Java代码进行多线程编程Thread类中的方法和属性线程的核心操作1. 启动…

【mamba学习】(一)SSM原理与说明

mamba输入输出实现与transformer几乎完全一样的功能&#xff0c;但速度和内存占用具有很大优势。对比transformer&#xff0c;transformer存在记忆有限的情况&#xff0c;如果输入或者预测的序列过长可能导致爆炸&#xff08;非线性&#xff09;&#xff0c;而mamba不存在这种情…

物理网卡MAC修改器v3.0-直接修改网卡内部硬件MAC地址,重装系统不变!

直接在操作系统里就能修改网卡硬件mac地址&#xff0c;刷新网卡mac序列号硬件码机器码&#xff0c;电脑主板集成网卡&#xff0c;pcie网卡&#xff0c;usb有线网卡&#xff0c;usb无线网卡&#xff0c;英特尔网卡&#xff0c;瑞昱网卡全支持&#xff01; 一键修改mac&#xff0…

百问网全志系列开发板音频ALSA配置步骤详解

8 ALSA 8.1 音频相关概念 ​ 音频信号是一种连续变化的模拟信号&#xff0c;但计算机只能处理和记录二进制的数字信号&#xff0c;由自然音源得到的音频信号必须经过一定的变换&#xff0c;成为数字音频信号之后&#xff0c;才能送到计算机中作进一步的处理。 ​ 数字音频系…

MySQL本地Windows安装

下载MySQL 官网&#xff1a;MySQL 下载完成后文件 安装类型 选择功能 功能过滤筛选&#xff0c;系统为64位操作系统&#xff0c;所以选【64-bit】&#xff0c;32位可选【32.bit】 下方勾选后自动检查安装环境&#xff0c;如果提示缺少运行库&#xff0c;请先安装VC_redist.x64。…

【Dash】plotly.express 气泡图

一、More about Visualization The Dash Core Compnents module dash.dcc includes a componenet called Graph. Graph renders interactive data visualizations using the open source plotly.js javaScript graphing library.Plotly.js supports over 35 chart types and …

数据结构 之 二叉树功能的代码实现

文章目录 二叉搜索树搜索删除节点二叉树的遍历 代码实现 二叉搜索树 无序树的查找&#xff0c;就是整个遍历&#xff0c;所以时间复杂度为O(N)。为了优化无序查找的时间复杂度&#xff0c;我们把树进行排序&#xff0c;这里引入了二叉搜索树。 二叉搜索树是一个有序树&#xf…

vue el-select下拉框在弹框里面错位,

1&#xff1a;原因是因为 底层滚动条滚动的问题。 2&#xff1a;解决方案 加上这个属性 :popper-append-to-body"false" 和样式 <el-select:popper-append-to-body"false"> </el-select><style> .el-select .el-select-dropdown {t…

数据埋点系列 4|数据分析与可视化:从数据到洞察

在前面的文章中,我们讨论了数据埋点的基础知识、技术实现以及数据质量保证。现在,我们拥有了高质量的数据,是时候深入挖掘这些数据的价值了。本文将带你探索如何通过数据分析和可视化,将原始数据转化为有价值的业务洞察。 目录 1. 数据分析基础1.1 描述性统计1.2 推断统计1.3 相…

Haproxy的配置详解与使用

一、haproxy简介 HAProxy是一个使用C语言编写的自由及开放源代码软件&#xff0c;其提供高可用性、负载均衡&#xff0c;以及基于TCP和HTTP的应用程序代理。 HAProxy特别适用于那些负载特大的web站点&#xff0c;这些站点通常又需要会话保持或七层处理。HAProxy运行在当前的硬…

uniapp实现自定义弹窗组件,支持富文本传入内容

1.首先安装vuex 通过此命令安装 ​​npm install vuex --save​​ 创建initModal.js import Vuex from vuex // 自定义弹窗 export default function initModal (v) {// 挂在store到全局Vue原型上v.prototype.$modalStore new Vuex.Store({state: {show: false,title: 标题,c…

【人工智能】深入理解自监督学习中的表征学习与对比学习

我的主页&#xff1a;2的n次方_ 1. 自监督学习 1.1 自监督学习的概念 自监督学习是一种无需大规模标注数据的学习方法&#xff0c;通过构造代理任务&#xff0c;模型可以从数据本身获取监督信号&#xff0c;从而学习有用的特征表征。 1.2 自监督学习的背景与重要性 在当今大…

【C++进阶学习】第十三弹——C++智能指针的深入解析

前言&#xff1a; 在C编程中&#xff0c;内存管理是至关重要的一个环节。传统的手动内存管理方式容易导致内存泄漏、悬挂指针等问题。为了解决这些问题&#xff0c;C引入了智能指针。本文将详细讲解C中智能指针的概念、种类、使用方法以及注意事项。 目录 一、引言 二、智能指…

链表---数据结构-黑马

链表 定义 链表是数据元素的线性集合&#xff0c;其每个元素都指向下一个元素&#xff0c;元素存储上是不连续的。 分类 单向链表&#xff0c;每个元素只知道自己的下一个元素是谁。 双向链表&#xff0c;每个元素知道自己的上一个元素和下一个元素。 循环链表&#xff0c;…

分布式锁:Mysql实现,Redis实现,Zookeeper实现

目录 前置知识 Mysql实现分布式锁 1.get_lock函数 Java代码实现&#xff1a; 2.for update尾缀 Java代码实现&#xff1a; 3.自己定义锁表 Java代码实现&#xff1a; 4.时间戳列实现乐观锁 Java代码实现&#xff1a; Redis实现分布式锁 Zookeeper实现分布式锁&#…

Oracle搭建一主两备dataguard环境的详细步骤

​ 上一篇文章介绍了Oracle一主两备的DG环境&#xff0c;如何进行switchover切换&#xff0c;也许你会问Oracle一主两备dataguard环境要怎么搭建&#xff0c;本篇文章将为你讲述一主两备dataguard详细搭建步骤。 环境说明 主机名IP地址db_unique_name数据库角色ora11g10.10.1…

驱动数智化升级,AI大模型准备好了吗?

大数据产业创新服务媒体 ——聚焦数据 改变商业 AI大模型的快速崛起&#xff0c;为企业带来了前所未有的变革机遇。从自然语言处理到图像识别&#xff0c;从精准营销到智能制造&#xff0c;AI大模型正逐步渗透到各行各业的核心业务中。然而&#xff0c;随着技术的不断演进&…

力扣刷题-循环队列

&#x1f308;个人主页&#xff1a;羽晨同学 &#x1f4ab;个人格言:“成为自己未来的主人~” 思路&#xff1a; 我们在这里采用的是用数组的形式实现循环链表&#xff0c;我认为这个用数组是更为简单的&#xff0c;我们只需要控制下标就可以实现循环链表的效果。具体实现代…

Python数据可视化案例——折线图

目录 json介绍&#xff1a; Pyecharts介绍 安装pyecharts包 构建一个基础的折线图 配置全局配置项 综合案例&#xff1a; 使用工具对数据进行查看 &#xff1a; 数据处理 json介绍&#xff1a; json是一种轻量级的数据交互格式&#xff0c;采用完全独立于编程语言的文…