Flink 连接流详解

news2024/12/23 3:58:30

连接流

1 Union

最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。这种合流方式非常简单粗暴,就像公路上多个车道汇在一起一样。

在这里插入图片描述

在代码中,我们只要基于 DataStream 直接调用.union()方法,传入其他 DataStream 作为参数,就可以实现流的联合了;得到的依然是一个 DataStream:

stream1.union(stream2, stream3, ...)

union()的参数可以是多个 DataStream,所以联合操作可以实现多条流的合并。这里需要考虑一个问题。在事件时间语义下,水位线是时间的进度标志;不同的流中可能水位线的进展快慢完全不同,如果它们合并在一起,水位线又该以哪个为准呢?还以要考虑水位线的本质含义,是“之前的所有数据已经到齐了”;所以对于合流之后的水位线,也是要以最小的那个为准,这样才可以保证所有流都不会再传来之前的数据。换句话说,多流合并时处理的时效性是以最慢的那个流为准的。我们自然可以想到,这与之前介绍的并行任务水位线传递的规则是完全一致的;多条流的合并,某种意义上也可以看作是多个并行任务向同一个下游任务汇合的过程。

package com.rosh.flink.combine;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.rosh.flink.pojo.UserPojo;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.time.Duration;

public class UnionTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // nc -lk 7777
        DataStreamSource<String> stream1 = env.socketTextStream("hadoop4", 7777);
        // nc -lk 7778
        DataStreamSource<String> stream2 = env.socketTextStream("hadoop4", 7778);

        //转换
        SingleOutputStreamOperator<UserPojo> user1DS = stream1.map(new MapFunction<String, UserPojo>() {
            @Override
            public UserPojo map(String value) throws Exception {
                return getUserPojo(value);
            }
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() {
            @Override
            public long extractTimestamp(UserPojo element, long recordTimestamp) {
                return element.getTimestamp();
            }
        }));

        SingleOutputStreamOperator<UserPojo> user2DS = stream2.map(new MapFunction<String, UserPojo>() {
            @Override
            public UserPojo map(String value) throws Exception {
                return getUserPojo(value);
            }
        }).assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forBoundedOutOfOrderness(Duration.ofSeconds(3)).withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() {
            @Override
            public long extractTimestamp(UserPojo element, long recordTimestamp) {
                return element.getTimestamp();
            }
        }));

        //联合
        DataStream<UserPojo> unionDS = user1DS.union(user2DS);
        SingleOutputStreamOperator<String> resultDS = unionDS.process(new ProcessFunction<UserPojo, String>() {
            @Override
            public void processElement(UserPojo value, ProcessFunction<UserPojo, String>.Context ctx, Collector<String> out) throws Exception {
                out.collect("水位线:" + ctx.timerService().currentWatermark());

            }
        });
        resultDS.print();

        //执行
        env.execute("UnionTest");
    }

    /**
     * {"userId":1,"name":"rosh","uri":"/goods/1","timestamp":1000}
  	 * {"userId":1,"name":"rosh","uri":"/goods/1","timestamp":5000}
     */
    private static UserPojo getUserPojo(String str) {
        JSONObject jsonObject = JSON.parseObject(str);
        Integer userId = jsonObject.getInteger("userId");
        String name = jsonObject.getString("name");
        String uri = jsonObject.getString("uri");
        Long timestamp = jsonObject.getLong("timestamp");
        return new UserPojo(userId, name, uri, timestamp);
    }

}

在这里插入图片描述

2 连接(Connect)

流的联合虽然简单,不过受限于数据类型不能改变,灵活性大打折扣,所以实际应用较少出现。除了联合(union),Flink 还提供了另外一种方便的合流操作——连接(connect)。顾名思义,这种操作就是直接把两条流像接线一样对接起来。

为了处理更加灵活,连接操作允许流的数据类型不同。但我们知道一个 DataStream 中的数据只能有唯一的类型,所以连接得到的并不是 DataStream,而是一个“连接流”(ConnectedStreams)。连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。要想得到新的 DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型。所以整体上来,两条流的连接就像是“一国两制”,两条流可以保持各自的数据类型、处理方式也可以不同,不过最终还是会统一到同一个 DataStream 中。

在这里插入图片描述

在代码实现上,需要分为两步:首先基于一条 DataStream 调用.connect()方法,传入另外一条 DataStream 作为参数,将两条流连接起来,得到一个 ConnectedStreams;然后再调用同处理方法得到 DataStream。这里可以的调用的同处理方法有.map()/.flatMap(),以及.process()方法。

两条流的连接(connect),与联合(union)操作相比,最大的优势就是可以处理不同类型的流的合并,使用更灵活、应用更广泛。当然它也有限制,就是合并流的数量只能是 2,而 union
可以同时进行多条流的合并。这也非常容易理解:union 限制了类型不变,所以直接合并没有问题;而 connect 是“一国两制”,后续处理的接口只定义了两个转换方法,如果扩展需要重新定义接口,所以不能“一国多制”。

package com.rosh.flink.test;

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class ConnectTest {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //创建流
        DataStreamSource<Integer> integerDS = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<Long> longDS = env.fromElements(1L, 2L, 3L, 4L, 5L);

        //connect
        ConnectedStreams<Integer, Long> connectDS = integerDS.connect(longDS);
        //map
        SingleOutputStreamOperator<String> rsDS = connectDS.map(new CoMapFunction<Integer, Long, String>() {
            @Override
            public String map1(Integer value) throws Exception {
                return "Integer:" + value;
            }

            @Override
            public String map2(Long value) throws Exception {
                return "Long: " + value;
            }
        });
        //打印
        rsDS.print();

        env.execute("ConnectTest");

    }
}

在这里插入图片描述

****CoProcessFunction,****对于连接流 ConnectedStreams 的处理操作,需要分别定义对两条流的处理转换,因此接口中就会有两个相同的方法需要实现,用数字“1”“2”区分,在两条流中的数据到来时分别调
用。我们把这种接口叫作“协同处理函数”(co-process function)。与 CoMapFunction 类似,如果是调用.flatMap()就需要传入一个 CoFlatMapFunction,需要实现 flatMap1()、flatMap2()两个
方法;而调用.process()时,传入的则是一个 CoProcessFunction。抽象类 CoProcessFunction 在源码中定义如下:

public abstract class CoProcessFunction<IN1, IN2, OUT> extends  AbstractRichFunction {
	...
	
	public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;

	public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

	public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}

	public abstract class Context {...}
	
	...
}

我们可以看到,很明显 CoProcessFunction 也是“处理函数”家族中的一员,用法非常相似。它需要实现的就是 processElement1()、processElement2()两个方法,在每个数据到来时,会根据来源的流调用其中的一个方法进行处理。CoProcessFunction 同样可以通过上下文 ctx 来访问 timestamp、水位线,并通过 TimerService 注册定时器;另外也提供了.onTimer()方法,用于定义定时触发的处理操作。

下面是 CoProcessFunction 的一个具体示例:我们可以实现一个实时对账的需求,也就是app 的支付操作和第三方的支付操作的一个双流 Join。App 的支付事件和第三方的支付事件将会互相等待 5 秒钟,如果等不来对应的支付事件,那么就输出报警信息。程序如下:

package com.rosh.flink.combine;

import com.alibaba.fastjson.JSON;
import com.rosh.flink.pojo.OrderPojo;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class CheckBillTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //数据流
        SingleOutputStreamOperator<OrderPojo> appDS = env.fromElements(
                new OrderPojo(1L, "order1", 1000L),
                new OrderPojo(2L, "order2", 2000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderPojo>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<OrderPojo>() {
            @Override
            public long extractTimestamp(OrderPojo orderPojo, long l) {
                return orderPojo.getTimestamp();
            }
        }));

        SingleOutputStreamOperator<OrderPojo> threeDS = env.fromElements(
                new OrderPojo(1L, "order1", 3000L),
                new OrderPojo(2L, "order2", 4000L),
                new OrderPojo(3L, "order3", 5000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<OrderPojo>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<OrderPojo>() {
            @Override
            public long extractTimestamp(OrderPojo orderPojo, long l) {
                return orderPojo.getTimestamp();
            }
        }));

        //connect
        SingleOutputStreamOperator<String> resultDS = appDS.connect(threeDS).keyBy(OrderPojo::getId, OrderPojo::getId)
                .process(new OrderProcessFunction());
        resultDS.print();

        env.execute("CheckBillTest");
    }

    public static class OrderProcessFunction extends CoProcessFunction<OrderPojo, OrderPojo, String> {

        private ValueState<OrderPojo> appState;

        private ValueState<OrderPojo> threeState;

        @Override
        public void open(Configuration parameters) throws Exception {

            appState = getRuntimeContext().getState(new ValueStateDescriptor<OrderPojo>("app-state", OrderPojo.class));
            threeState = getRuntimeContext().getState(new ValueStateDescriptor<OrderPojo>("three-state", OrderPojo.class));

        }

        /**
         * app逻辑
         */
        @Override
        public void processElement1(OrderPojo value, CoProcessFunction<OrderPojo, OrderPojo, String>.Context ctx, Collector<String> out) throws Exception {

            if (threeState.value() != null) {
                out.collect(JSON.toJSONString(value) + "【app 对账成功】");
                threeState.clear();
            } else {
                //更新状态
                appState.update(value);
                //注册5秒后定时器
                ctx.timerService().registerEventTimeTimer(value.getTimestamp() + 5000L);
            }

        }

        /**
         * 三方逻辑
         */
        @Override
        public void processElement2(OrderPojo value, CoProcessFunction<OrderPojo, OrderPojo, String>.Context ctx, Collector<String> out) throws Exception {

            if (appState.value() != null) {
                out.collect(JSON.toJSONString(value) + "【three 对账成功】");
                appState.clear();
            } else {
                threeState.update(value);
                //注册5秒后定时器
                ctx.timerService().registerEventTimeTimer(value.getTimestamp() + 5000L);
            }
        }

        @Override
        public void onTimer(long timestamp, CoProcessFunction<OrderPojo, OrderPojo, String>.OnTimerContext ctx, Collector<String> out) throws Exception {

            if (appState.value() != null) {
                out.collect(JSON.toJSONString(appState.value()) + "【app 对账失败】");
            }
            if (threeState.value() != null) {
                out.collect(JSON.toJSONString(threeState.value()) + "【三方 对账失败】");
            }

            appState.clear();
            threeState.clear();

        }
    }

}

在这里插入图片描述

3 窗口联结(Window Join)

基于时间的操作,最基本的当然就是时间窗口了。我们之前已经介绍过 Window API 的用法,主要是针对单一数据流在某些时间段内的处理计算。那如果我们希望将两条流的数据进行
合并、且同样针对某段时间进行处理和统计,又该怎么做呢?Flink 为这种场景专门提供了一个窗口联结(window join)算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。

窗口联结的调用:

窗口联结在代码中的实现,首先需要调用 DataStream 的.join()方法来合并两条流,得到一个 JoinedStreams;接着通过.where()和.equalTo()方法指定两条流中联结的 key;然后通过.window()开窗口,并调用.apply()传入联结窗口函数进行处理计算。通用调用形式如下:

stream1.join(stream2) .where(<KeySelector>) .equalTo(<KeySelector>) .window(<WindowAssigner>) .apply(<JoinFunction>)

上面代码中.where()的参数是键选择器(KeySelector),用来指定第一条流中的 key;而.equalTo()传入的 KeySelector 则指定了第二条流中的 key。两者相同的元素,如果在同一窗口中,就可以匹配起来,并通过一个“联结函数”(JoinFunction)进行处理了。

这里.window()传入的就是窗口分配器,之前讲到的三种时间窗口都可以用在这里:滚动窗口(tumbling window)、滑动窗口(sliding window)和会话窗口(session window)。而后面调用.apply()可以看作实现了一个特殊的窗口函数。注意这里只能调用.apply(),没有其他替代的方法。传入的 JoinFunction 也是一个函数类接口,使用时需要实现内部的.join()方法。这个方法有两个参数,分别表示两条流中成对匹配的数据。JoinFunction 在源码中的定义如下:

public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
		 
			OUT join(IN1 first, IN2 second) throws Exception;

}

这里需要注意,JoinFunciton 并不是真正的“窗口函数”,它只是定义了窗口函数在调用时对匹配数据的具体处理逻辑。当然,既然是窗口计算,在.window()和.apply()之间也可以调用可选 API 去做一些自定义,比如用.trigger()定义触发器,用.allowedLateness()定义允许延迟时间,等等。

JoinFunction 中的两个参数,分别代表了两条流中的匹配的数据。这里就会有一个问题:什么时候就会匹配好数据,调用.join()方法呢?接下来我们就来介绍一下窗口 join 的具体处理流程。

两条流的数据到来之后,首先会按照 key 分组、进入对应的窗口中存储;当到达窗口结束时间时,算子会先统计出窗口内两条流的数据的所有组合,也就是对两条流中的数据做一个笛卡尔积(相当于表的交叉连接,cross join),然后进行遍历,把每一对匹配的数据,作为参数(first,second)传入 JoinFunction 的.join()方法进行计算处理,得到的结果直接输出。所以窗口中每有一对数据成功联结匹配,JoinFunction 的.join()方法就会被调用一次,并输出一个结果。

在这里插入图片描述

除了 JoinFunction,在.apply()方法中还可以传入 FlatJoinFunction,用法非常类似,只是内部需要实现的.join()方法没有返回值。结果的输出是通过收集器(Collector)来实现的,所以对于一对匹配数据可以输出任意条结果。其实仔细观察可以发现,窗口 join 的调用语法和我们熟悉的 SQL 中表的 join 非常相似:

SELECT * FROM table1 t1, table2 t2 WHERE t1.id = t2.id;

这句 SQL 中 where 子句的表达,等价于 inner join … on,所以本身表示的是两张表基于 id的“内连接”(inner join)。而 Flink 中的 window join,同样类似于 inner join。也就是说,最后
处理输出的,只有两条流中数据按 key 配对成功的那些;如果某个窗口中一条流的数据没有任何另一条流的数据匹配,那么就不会调用 JoinFunction 的.join()方法,也就没有任何输出了。

demo:

在电商网站中,往往需要统计用户不同行为之间的转化,这就需要对不同的行为数据流,按照用户 ID 进行分组后再合并,以分析它们之间的关联。如果这些是以固定时间周期(比如
1 小时)来统计的,那我们就可以使用窗口 join 来实现这样的需求。

package com.rosh.flink.combine;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowJoinTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Long>> stream1 = env.fromElements(
                Tuple2.of("kobe", 1000L),
                Tuple2.of("james", 1000L),
                Tuple2.of("kobe", 2000L),
                Tuple2.of("james", 2000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        }));

        SingleOutputStreamOperator<Tuple2<String, Long>> stream2 = env.fromElements(
                Tuple2.of("kobe", 3000L),
                Tuple2.of("james", 3000L),
                Tuple2.of("kobe", 4000L),
                Tuple2.of("james", 4000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        }));

        DataStream<String> resultDS = stream1.join(stream2).where(t -> t.f0)
                .equalTo(t -> t.f0).window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public String join(Tuple2<String, Long> first, Tuple2<String, Long> second) throws Exception {
                        return JSON.toJSONString(first) + " =>  " + JSON.toJSONString(second);
                    }
                });

        resultDS.print();

        env.execute("WindowJoinTest");

    }

}

在这里插入图片描述

4 间隔联结(Interval Join)

在有些场景下,我们要处理的时间间隔可能并不是固定的。比如,在交易系统中,需要实时地对每一笔交易进行核验,保证两个账户转入转出数额相等,也就是所谓的“实时对账”。
两次转账的数据可能写入了不同的日志流,它们的时间戳应该相差不大,所以我们可以考虑只统计一段时间内是否有出账入账的数据匹配。这时显然不应该用滚动窗口或滑动窗口来处理—
—因为匹配的两个数据有可能刚好“卡在”窗口边缘两侧,于是窗口内就都没有匹配了;会话窗口虽然时间不固定,但也明显不适合这个场景。 基于时间的窗口联结已经无能为力了。

为了应对这样的需求,Flink 提供了一种叫作“间隔联结”(interval join)的合流操作。顾名思义,间隔联结的思路就是针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,
看这期间是否有来自另一条流的数据匹配。

间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作 A)中的任意一个数据元素 a,就可以
开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以 a 的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据
的“窗口”范围。所以对于另一条流(不妨叫 B)中的数据元素 b,如果它的时间戳落在了这个区间范围内,a 和 b 就可以成功配对,进而进行计算输出结果。所以匹配的条件为:

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

这里需要注意,做间隔联结的两条流 A 和 B,也必须基于相同的 key;下界 lowerBound应该小于等于上界 upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。

在这里插入图片描述

下方的流 A 去间隔联结上方的流 B,所以基于 A 的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2 毫秒,上界为 1 毫秒。于是对于时间戳为 2 的 A 中元素,它的
可匹配区间就是[0, 3],流 B 中有时间戳为 0、1 的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A 中时间戳为 3 的元素,可匹配区间为[1, 4],B 中只有时
间戳为 1 的一个数据可以匹配,于是得到匹配数据对(3, 1)。

所以我们可以看到,间隔联结同样是一种内连接(inner join)。与窗口联结不同的是,intervaljoin 做匹配的时间段是基于流中数据的,所以并不确定;而且流 B 中的数据可以不只在一个区
间内被匹配。

间隔联结在代码中,是基于 KeyedStream 的联结(join)操作。DataStream 在 keyBy 得到KeyedStream 之后,可以调用.intervalJoin()来合并两条流,传入的参数同样是一个 KeyedStream,
两者的 key 类型应该一致;得到的是一个 IntervalJoin 类型。后续的操作同样是完全固定的:先通过.between()方法指定间隔的上下界,再调用.process()方法,定义对匹配数据对的处理操
作。调用.process()需要传入一个处理函数,这是处理函数家族的最后一员:“处理联结函数”通用调用形式如下:

stream1 .keyBy(<KeySelector>).intervalJoin(stream2.keyBy(<KeySelector>)).between(Time.milliseconds(-2), Time.milliseconds(1))
	.process (new ProcessJoinFunction<Integer, Integer, String(){
		 @Override
		  public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
		 
			 out.collect(left + "," + right);
	 }

 });

demo:

在电商网站中,某些用户行为往往会有短时间内的强关联。我们这里举一个例子,我们有两条流,一条是下订单的流,一条是浏览数据的流。我们可以针对同一个用户,来做这样一个
联结。也就是使用一个用户的下订单的事件和这个用户的最近十分钟的浏览数据进行一个联结查询。

package com.rosh.flink.combine;

import com.rosh.flink.pojo.UserPojo;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class IntervalJoinTest {


    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        //初始化流
        SingleOutputStreamOperator<Tuple3<String, String, Long>> orderDS =
                env.fromElements(
                        Tuple3.of("Mary", "order-1", 5000L),
                        Tuple3.of("Alice", "order-2", 5000L),
                        Tuple3.of("Bob", "order-3", 20000L),
                        Tuple3.of("Alice", "order-4", 20000L),
                        Tuple3.of("Cary", "order-5", 51000L)
                ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                            @Override
                            public long extractTimestamp(Tuple3<String, String, Long>
                                                                 element, long recordTimestamp) {
                                return element.f2;
                            }
                        })
                );

        SingleOutputStreamOperator<UserPojo> clickDS = env.fromElements(
                new UserPojo(1, "Bob", "./cart", 2000L),
                new UserPojo(2, "Alice", "./prod?id=100", 3000L),
                new UserPojo(2, "Alice", "./prod?id=200", 3500L),
                new UserPojo(1, "Bob", "./prod?id=2", 2500L),
                new UserPojo(2, "Alice", "./prod?id=300", 36000L),
                new UserPojo(1, "Bob", "./home", 30000L),
                new UserPojo(1, "Bob", "./prod?id=1", 23000L),
                new UserPojo(1, "Bob", "./prod?id=3", 33000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() {
                    @Override
                    public long extractTimestamp(UserPojo element, long recordTimestamp) {
                        return element.getTimestamp();
                    }
                })
        );

        //联合
        SingleOutputStreamOperator<String> resultDS = orderDS.keyBy(t -> t.f0)
                .intervalJoin(clickDS.keyBy(UserPojo::getName))
                .between(Time.seconds(-5), Time.seconds(10))
                .process(new ProcessJoinFunction<Tuple3<String, String, Long>, UserPojo, String>() {
                    @Override
                    public void processElement(Tuple3<String, String, Long> left, UserPojo right, ProcessJoinFunction<Tuple3<String, String, Long>, UserPojo, String>.Context ctx, Collector<String> out) throws Exception {
                        out.collect(left + " => " + right);
                    }
                });


        resultDS.print();


        env.execute("IntervalJoinTest");
    }


}

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/344410.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

tensorflow.js 对视频 / 直播人脸检测和特征点收集

前言&#xff1a;这里要介绍的是 Tensorflow.js 官方提供的两个人脸检测模型&#xff0c;分别是 face-detection 和 face-landmarks-detection。他们不但可以对视频中的人间进行精确定位&#xff0c;而且还能对当前设备 (手机 / 电脑摄像头) 采集的直播流实时监测人脸。所以这些…

大华城市安防监控系统平台管理存在任意文件下载漏洞

大华城市安防监控系统平台管理存在任意文件下载漏洞1.大华城市安防监控系统平台管理存在任意文件下载漏洞1.1.漏洞描述1.2.漏洞影响1.3.FOFA2.漏洞复现2.1.登录页面2.2.抓包1.大华城市安防监控系统平台管理存在任意文件下载漏洞 1.1.漏洞描述 大华城市安防监控系统平台管理存在…

企业级数据平台为什么要“可观测”? | StartDT Hackathon

近日&#xff0c;奇点云黑客马拉松“StartDT Hackathon”正式收官。 这期黑客松以“可观测性”为核心选题&#xff0c;旨在通过加强数据云平台DataSimba的可观测性&#xff0c;提升企业用户“自交付、自运维”的易用性和友好度&#xff0c;降低运维门槛&#xff0c;提升发现、…

4.7 反射

文章目录1.概述2.为什么需要反射3.反射需要用到的API3.1 获取字节码对象3.2 常用方法4.反射的应用4.1 创建 : 测试物料类4.2 练习 : 获取类对象4.3 练习 : 类获取构造方法4.4 练习 : 获取成员方法4.5 练习&#xff1a;获取成员变量4.6 练习 : 创建对象4.7 熟悉API4.7.1 创建物科…

定时任务框架xxl-job及quartz

本文主要介绍分布式定时任务框架xxl-job&#xff0c;本文首先会对xxl-job做一个基本的介绍&#xff0c;接着将xxl-job与quartz做一个比较&#xff0c;最后就是介绍xxl-job调度的详细过程。 xxl-job官方文档 xxl-job的介绍 xxl-job是一个开源的分布式定时任务框架&#xff0c;其…

若依前后端分离版集成nacos

根据公司要求&#xff0c;需要将项目集成到nacos中&#xff0c;当前项目是基于若依前后端分离版开发的&#xff0c;若依的版本为3.8.3&#xff0c;若依框架中整合的springBoot版本为2.5.14。Nacos核心提供两个功能&#xff1a;服务注册与发现&#xff0c;动态配置管理。 一、服…

【django项目开发】用户登录后缓存权限到redis中(十)

这里写目录标题一、权限的数据的特点二、首先settings.py文件中配置redis连接redis数据库一、权限的数据的特点 需要去数据库中频繁的读和写&#xff0c;为了项目提高运行效率&#xff0c;可以把用户的权限在每次登录的时候都缓存到redis中。这样的话&#xff0c;权限判断的中…

基于投票策略的室内家具检测:VoteNet、BRNet 最全总结

文章目录一、基本概述二、VoteNet三、BRNet四、最新研究成果一、基本概述 最近几年&#xff0c;基于点云的3D目标检测是自动驾驶场景研究的热点。但是&#xff0c;自动驾驶主要聚焦于室外场景。本文&#xff0c;我们主要介绍两篇文章&#xff08;VoteNet&#xff0c;BRNet&…

HTML第一章总结

<h1~h6>标题标签 <br />换行标签&#xff08;单&#xff09; <p>分段标签 <strong>加粗标签 <em>倾斜标签 <del>删除线标签 <ins>下划线标签 <div>独占一行的布局标签 <span>进行分割的布局标签 <img>图片标签&a…

Go-micro[windows]安装以及踩坑

一.首先安装protochttps://github.com/protocolbuffers/protobuf/releases进入网址&#xff0c;点击tag&#xff0c;然后选择v3版本进入之后找到随后下载安装然后将protoc解压缩到任意目录&#xff08;自己选&#xff09;再将protoc/bin的路径放置环境变量中二.获取protoc-gen-…

RK3568 UBOOT的问题解决案例

一、UBOOT下的波特率 原始的波特率为1500000,串口工具虽然可以设置任意波特率&#xff0c;但工作时不正常。 输入不了。本文描述如何修改成115200。 二、确认UBOOT的配置 ./build.sh uboot processing option: uboot Start building uboot TARGET_UBOOT_CONFIGrk3568## make …

【C++】二叉树之力扣经典题目1——详解二叉树的递归遍历,二叉树的层次遍历

如有错误&#xff0c;欢迎指正。 如有不理解的地方&#xff0c;可以私信问我。 文章目录题目1&#xff1a;根据二叉树创建字符串题目实例思路与解析代码实现题目2&#xff1a;二叉树的层序遍历题目思路与解析代码实现题目1&#xff1a;根据二叉树创建字符串 点击进入题目链接—…

C++——多态|虚函数|重写|虚表

文章目录1. 多态的概念1.1 概念2. 多态的定义及实现2.1多态的构成条件2.2 虚函数2.3虚函数的重写虚函数重写的三个例外&#xff1a;2.4 普通调用和多态调用&#xff1a;2.5 C11 override 和 final2.6 重载、虚函数的覆盖(重写)、隐藏(重定义)的对比3. 抽象类(有关纯虚函数)3.1 …

互联网新时代要到来了(一)什么是Web3.0?

什么是Web3.0? tips&#xff1a;内容来自百度百科、知乎、搜狐新闻、李留白公众号、CSDN「Meta.Qing」博客等网页 什么是Web3.0?1.什么是Web3.0&#xff08;概念介绍&#xff09;&#xff1f;2.Web3.0简单理解3.Web3.0的技术特点4.Web3.0项目1.什么是Web3.0&#xff08;概念…

greenDao的使用文档

介绍&#xff1a;greenDAO 是一款轻量级的 Android ORM 框架&#xff0c;将 Java 对象映射到 SQLite 数据库中&#xff0c;我们操作数据库的时候&#xff0c;不在需要编写复杂的 SQL语句&#xff0c; 在性能方面&#xff0c;greenDAO 针对 Android 进行了高度优化&#xff0c; …

Ubuntu 20中安装snaphu

Ubuntu 20中安装snaphu0 前言1 snaphu安装步骤1.1 在控制台用命令行安装1.2 在官网下载安装包0 前言 snaphu是一个解缠软件。基于欧空局的SNAP snaphu的官网&#xff1a;https://web.stanford.edu/group/radar/softwareandlinks/sw/snaphu/ 1 snaphu安装步骤 大致有两种 在…

微软支持的ChatGPT激增,但不要低估苹果和谷歌

微软和 OpenAI 可能在 AI 聊天机器人爆炸式增长的市场中具有先发优势&#xff0c;但不要排除其他一些可以访问大量 AI 训练数据的科技巨头&#xff0c;例如 Apple 和 Google。 通过其对 ChatGPT 开发商 OpenAI 的早期和持续支持&#xff0c;微软在AI 军备竞赛中目前处于领先地…

鲸探玩家狂收往期数藏,2023年数藏二级市场的紧箍咒可能松动了?

图片来源&#xff1a;由无界AI绘画工具生成2月初&#xff0c;数藏发行平台鲸探更新了用户服务协议&#xff0c;更新最受关注的点在于&#xff1a;首次转赠期限从180天调整为90天。此外&#xff0c;有媒体披露&#xff0c;鲸探客服回答用户提问称&#xff0c;非首次转赠也从720天…

ITSS认证分为几个级别,哪个级别最高

​一、什么是ITSS ITSS( 信息技术服务标准&#xff0c;简称ITSS)是国内第一套成体系和综合配套的信息技术服务标准库&#xff0c;全面规范了IT服务产品及其组成要素&#xff0c;用于指导实施标准化和可信赖的IT服务。 ITSS是在工业和信息化部、国家标准化管理委员会的联合指导下…

Python 之 NumPy 统计函数、数据类型和文件操作

文章目录一、统计函数1. 求平均值 mean()2. 中位数 np.median3. 标准差 ndarray.std4. 方差 ndarray.var()5. 最大值 ndarray.max()6. 最小值 ndarray.min()7. 求和 ndarray.sum()8. 加权平均值 numpy.average()二、数据类型1. 数据存储2. 定义结构化数据3. 结构化数据操作三、…