Flink-多流转换(合流,分流,双流join)

news2024/11/16 23:33:33

8 多流转换

8.1 分流

  1. 简单实现

对流三次filter算子操作实现分流

        // 筛选 Mary 的浏览行为放入 MaryStream 流中
        DataStream<Event> MaryStream = stream.filter(new FilterFunction<Event>()
        {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.user.equals("Mary");
            }
        });
        // 筛选 Bob 的购买行为放入 BobStream 流中
        DataStream<Event> BobStream = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event value) throws Exception {
                return value.user.equals("Bob");
            }
        });
        // 筛选其他人的浏览行为放入 elseStream 流中
        DataStream<Event> elseStream = stream.filter(new FilterFunction<Event>()
        {
            @Override
            public boolean filter(Event value) throws Exception {
                return !value.user.equals("Mary") && !value.user.equals("Bob") ;
            }
        });
        MaryStream.print("Mary pv");
        BobStream.print("Bob pv");
        elseStream.print("else pv");
  1. 使用测输出流
  • 代码
public class SplitStreamTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        //定义测输出流标签
        //测输出流类型可以跟主流不同,因此换个类型
        OutputTag<Tuple3<String, String, Long>> maryTag = new OutputTag<Tuple3<String, String, Long>>("Mary") {};
        OutputTag<Tuple3<String, String, Long>> bobTag = new OutputTag<Tuple3<String, String, Long>>("Bob") {};

        SingleOutputStreamOperator<Event> processStream = stream.process(new ProcessFunction<Event, Event>() {//主流类型还是Event吧
            @Override
            public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
                if (value.user.equals("Mary")) {
                    //把数据写道测输出流,第一个参数标签,第二个是
                    ctx.output(maryTag, Tuple3.of(value.user, value.url, value.timestamp));
                } else if (value.user.equals("Bob")) {
                    //把数据写道测输出流,第一个参数标签,第二个是
                    ctx.output(bobTag, Tuple3.of(value.user, value.url, value.timestamp));
                } else {//其他放主流
                    out.collect(value);
                }
            }

        });

        processStream.print("else");
        processStream.getSideOutput(maryTag).print("Mary");
        processStream.getSideOutput(bobTag).print("Bob");

        env.execute();
    }
}
  • 结果
else> Event{user='Alice', url='./home', timestamp=2022-11-25 21:56:01.958}
else> Event{user='Alice', url='./cart', timestamp=2022-11-25 21:56:02.971}
Mary> (Mary,./home,1669384564001)
else> Event{user='Alice', url='./cart', timestamp=2022-11-25 21:56:05.004}
Bob> (Bob,./prod?id=100,1669384566019)
Bob> (Bob,./cart,1669384567024)
else> Event{user='Alice', url='./cart', timestamp=2022-11-25 21:56:08.027}

8.2 合流

8.2.1 概述

  1. 含义

在这里插入图片描述

要求:数据类型要相同

特点:可以合并多条流

  1. 使用
stream1.union(stream2,stream3,...)

8.2.2 联合(Union)

  1. 代码
public class UnionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream1 = env.socketTextStream("hadoop2",7777)
                .map(data->{
                    String[] field = data.split(",");
                    return new Event(field[0].trim(),field[1].trim(),Long.valueOf(field[2].trim()));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );
        stream1.print("stream1");

        SingleOutputStreamOperator<Event> stream2 = env.socketTextStream("hadoop2",8888)
                .map(data->{
                    String[] field = data.split(",");
                    return new Event(field[0].trim(),field[1].trim(),Long.valueOf(field[2].trim()));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );
        stream2.print("stream2");

        //合并两条流
        stream1.union(stream2)
                .process(new ProcessFunction<Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        out.collect("水位线:"+ctx.timerService().currentWatermark());
                    }
                })
                .print();

        env.execute();
    }
}
  1. 结果及分析

在这里插入图片描述

图我习惯往上一层,方便对应着看

端口7777时间戳为2000,等下次事件发生由于延迟两秒,再-1,水位线为-1

但是由于端口8888的一直没数据,因此无论端口7777时间戳到哪里,水位线都是由两条流中较低的而决定,即以端口8888决定,因此水位线一直为一个很大的负数

此时如果在端口8888的窗口中输入数据,那么水位线会根据此流而变化,由于此流延迟五秒,会到6000的时候才会推动上一个时间戳5000的水位线到达-1,并与7777端口直至持平

在这里插入图片描述

当8888端口时间戳进行到7000的时候,水位线没有变成199(6000-50000-1),由于7777端口的时间戳才到-1,因此由低的流决定,显示水位线为-1

在这里插入图片描述

在7777端口输入Mary,./home,6000的推近上一个时间戳的水位线到3999(6000-2000-1),在8888端口输入Mary,./home,7000的时候也推近了7000这个时间戳的水位线,因为只需要2000毫秒就能触发,现在2000毫秒过了,就能触发,因此最后水位线显示1999

8.2.3 连接(connect)

  1. 概述

在这里插入图片描述

两个不同类型的DataStream连接(通过.connect)形成ConnectedStreams,进行算子转换后才得到DataStream

但是之前的map(),flatMap(),process()传入的都是对应的函数类处理单流数据,现在需要处理多流,会在原来的MapFunction前面加上Co,即CoMapFunction,其他的也一样CoProcessFunction,并且CoMapFuntion中方法有map1()方法和map2方法

  1. 分析

在这里插入图片描述

stream2(DataStream)调用connect后得到的是ConnectedStream

在这里插入图片描述

ConnectStream不继承DataStream了,ConnectStream的泛型分别是两个流的类型,其中有process()方法等,传入的CoProcessFunction
在这里插入图片描述

CoProcessFunction继承Function,并且有两个map的方法,分别是map1和map2传入的三个参数分别是,第一个流,第二个流,以及输出(即合流后的类型),

  1. 代码
  • demo
public class ConnectTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3);
        DataStreamSource<Long> stream2 = env.fromElements(4L,5L,6L,7L);

        stream2.connect(stream1)
                .map(new CoMapFunction<Long, Integer, String>() {
                    @Override
                    public String map1(Long value) throws Exception {
                        return "Long:"+value.toString();
                    }

                    @Override
                    public String map2(Integer value) throws Exception {
                        return "Integer:"+value.toString();
                    }
                })
                .print();

        env.execute();

    }
}

结果

Integer:1
Integer:2
Integer:3
Long:4
Long:5
Long:6
Long:7
  • 实时对账案例

两条流,一条是app用户提交支付的请求,另一条流是第三方支付平台给我们反馈的订单支付的请求

public class BillCheckExample {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        //来自app的支付日志
        SingleOutputStreamOperator<Tuple3<String,String,Long>> appStream = env.fromElements(
                Tuple3.of("order-1", "app", 1000L),
                Tuple3.of("order-2", "app", 2000L),
                Tuple3.of("order-3", "app", 3500L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.
        <Tuple3<String,String,Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                        return element.f2;
                    }
                })
        );
        //来自第三方平台的支付日志
        SingleOutputStreamOperator<Tuple4<String,String,String,Long>> thirdpartStream = env.fromElements(
                Tuple4.of("order-1", "third-party", "success", 3000L),
                Tuple4.of("order-3", "third-party", "success", 4000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.
                <Tuple4<String,String,String,Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple4<String,String,String,Long>>() {
                    @Override
                    public long extractTimestamp(Tuple4<String,String,String,Long> element, long recordTimestamp) {
                        return element.f3;
                    }
                })
        );

        //检测同一支付单在两条流中是否匹配,等待一段时间后,不匹配就报警
//        //这种也可以
//        appStream.keyBy(data->data.f0)
//                .connect(thirdpartStream.keyBy(data -> data.f0));
//
        appStream.connect(thirdpartStream)
                        .keyBy(data->data.f0,data-> data.f0)
                        .process(new OrderMatchResult())
                        .print();


        env.execute();
    }
    //自定义实现CoFunction
    public static class OrderMatchResult extends CoProcessFunction<Tuple3<String,String,Long>,
                                                Tuple4<String,String,String,Long>,String>{

        //定义状态变量,用来保存已经到达的事件
        private ValueState<Tuple3<String, String, Long>> appEventState;
        private ValueState<Tuple4<String, String, String, Long>> thirdPartyEventState;

        //运行上下文环境中获取状态
        @Override
        public void open(Configuration parameters) throws Exception {
            appEventState = getRuntimeContext().getState(
                    new ValueStateDescriptor<Tuple3<String, String, Long>>("app-event", Types.TUPLE(Types.STRING,Types.STRING,Types.LONG))
            );
        thirdPartyEventState = getRuntimeContext().getState(
                new ValueStateDescriptor<Tuple4<String, String, String, Long>>("thirdparty-event", Types.TUPLE(Types.STRING, Types.STRING, Types.STRING,Types.LONG))
        );
        }

        @Override
        public void processElement1(Tuple3<String, String, Long> value, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
            //来的是app event,看另一条流中事件是否来过
            if(thirdPartyEventState.value()!=null){
                out.collect("对账成功:"+value+" "+thirdPartyEventState.value());
                //清空状态
                thirdPartyEventState.clear();
            }else{
                //如果每来就等待,并且更新状态
                appEventState.update(value);
                //注册一个5秒后的定时器,开始等待另一条的事件
                ctx.timerService().registerEventTimeTimer(value.f2+5000L);
            }

        }

        @Override
        public void processElement2(Tuple4<String, String, String, Long> value, CoProcessFunction<Tuple3<String, String, Long>, Tuple4<String, String, String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
            //来的是app event,看另一条流中事件是否来过
            if(appEventState.value()!=null){
                out.collect("对账成功:"+appEventState.value()+" "+value);
                //清空状态
                appEventState.clear();
            }else{
                //如果没来就等待,并且更新状态
                thirdPartyEventState.update(value);
                //注册一个5秒后的定时器,开始等待另一条的事件
                ctx.timerService().registerEventTimeTimer(value.f3);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            //定时器触发,判断状态,如果某个状态不为空,说明另一条中事件没来
            //并且不会存在两个都不为空,因为其中一个不为空后会被清除
            //没有没清空表示失败
            if(appEventState.value()!=null){
                out.collect("对账失败:"+appEventState.value()+" "+"第三方支付平台信息未到");
            }
            if(thirdPartyEventState.value()!=null){
                out.collect("对账失败:"+thirdPartyEventState.value()+" "+"APP信息信息未到");
            }
            //清空所有数据
            appEventState.clear();
            thirdPartyEventState.clear();
        }
    }
    
}
  • 结果
对账成功:(order-1,app,1000) (order-1,third-party,success,3000)
对账成功:(order-3,app,3500) (order-3,third-party,success,4000)
对账失败:(order-2,app,2000) 第三方支付平台信息未到

8.2.4 广播连接流(broadcast)

  1. 概述

在这里插入图片描述

DataStream调用connect()方法后可以传入BroadcastStream广播流

在这里插入图片描述

传入广播流后返回的是BroadcastConnectedStream广播连接流,用于动态实时变化定义配置的场景

在这里插入图片描述

BroadcastStream广播流通过保存成广播状态广播给下游

DataStream中有broadcast()方法,需要传入MapStateDescriotor映射状态描述器,保存成映射状态然后广播至下游,最后返回了BroadcastStream

  1. 运用

在这里插入图片描述

new MapStateDescriotor形成MapStateDescriotor,传入broadcast(),返回BroadcastStream
在这里插入图片描述

再将得到的BroadcastStream对象放入DataStream调用connect()方法中,最终得到BroadcastConnectedStream广播连接流

在这里插入图片描述

在这里插入图片描述

而后BroadcastConnectedStream也可以调用process()方法,跟之前一样可以传入KeyedBroadcastProcessFunction,里面也是两个,一个是processElement(数据流使用的)以及processBroadcastElement(广播流用的)最后返回SingleOutputStreamOperator

8.3 双流join

8.3.1 概述

  • 两条流类型不同
  • 特殊的connect

8.3.2 窗口联结(Window Join)

  1. 分析
    在这里插入图片描述

在这里插入图片描述

DataStream直接调用join方法,并得到JoinedStream

在这里插入图片描述

在这里插入图片描述

得到JoinedStream后,就可以调用where方法,where()中传入第一条流的KeySelelctor,返回Where类型

在这里插入图片描述

在这里插入图片描述

Where是JoinedStream的内部类,内部类中equalTo()传入第二条流KeySelelctor,并且返回EqualTo内部类

在这里插入图片描述

在这里插入图片描述

EqualTo内部类的方法window(),传入WindowAssigner跟之前的window函数一样了,可以传入TumblingEventTimeWindows滚动窗口以及其他的滑动以及会话窗口,最终返回的是WithWindow静态类

在这里插入图片描述

在这里插入图片描述

WithWindow中的方法就是之前窗口API能做的事情,例如apply(),然后apply()中键可以再传入FlatJoinFunction以及JoinFunction 函数
在这里插入图片描述

在这里插入图片描述

JoinFunction 参数类比CoMapFunction,方法为join联合两条流并输出OUT,FlatJoinFunction也差不多

  • 总结
    在这里插入图片描述
  1. 使用
stream1.join(stream2)
       .where(<KeySelector>)
       .equalTo(<KeySeletor>)
       .window(<WindowAssigner>)
       .apply(<JoinFunction>)

跟sql的join where xx=xx很像,上面的结果默认是inner join

橙流join绿流

在这里插入图片描述

  1. 代码
  • demo
public class WindowJoinTest {
    public static void main(String[] args) throws Exception {


        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String,Long>> stream1 = env.fromElements(
                Tuple2.of("a", 1000L),
                Tuple2.of("b", 1000L),
                Tuple2.of("a", 2000L),
                Tuple2.of("b", 2000L)

        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ZERO)
        .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        }));

        SingleOutputStreamOperator<Tuple2<String,Integer>> stream2 = env.fromElements(
                Tuple2.of("a", 3000),
                Tuple2.of("b", 4000),
                Tuple2.of("a", 4500),
                Tuple2.of("b", 5500)

        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
                        return element.f1;
                    }
                }));


        stream1.join(stream2)
                .where(data->data.f0)
                .equalTo(data->data.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Integer>, String>() {
                    @Override
                    public String join(Tuple2<String, Long> first, Tuple2<String, Integer> second) throws Exception {
                        return first+" -> "+second;
                    }
                }).print();
        env.execute();
    }
}

结果

(a,1000) -> (a,3000)
(a,1000) -> (a,4500)
(a,2000) -> (a,3000)
(a,2000) -> (a,4500)
(b,1000) -> (b,4000)
(b,2000) -> (b,4000)

8.3.3 间隔联结(Interval Join)

  1. 概述

在这里插入图片描述

区间有lowerBound下届,upperBound上届

a.timestamp+lowerBound<=b.timestamp<=a.timestamp+upperBound

  1. 分析

在这里插入图片描述

在这里插入图片描述

DataStream先keyby后得到KeyedStream,再用KeyedStream的intervalJoin方法,传入的也要是另一个KeyedStream,得到IntervalJoin

在这里插入图片描述

IntervalJoin中可以指定是inEventTime(事件时间)还是inProcessing(处理时间),以及between

在这里插入图片描述

between传入参数,可以指定上届和下届,返回IntervalJoined

在这里插入图片描述

在这里插入图片描述

IntervalJoined方法中有lowerBoundExclusive()刨除下届以及upperBoundExclusive()刨除上届,以及process()方法传入ProcessJoinFunction

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

ProcessJoinFunction是个抽象类,有Context抽象类以及有processElement方法,参数是第一流数据,第二个流数据,上下文,以及输出

  1. 使用
stream1
	.keyBy(<KeySelector>)
	.intervalJoin(stream2.Keyby(<KeySelector>))
	.between(Time.milliseconds(-2),Time.milliseconds(1))
	.process(new ProcessJoinFunction<Integer,Integer,String>(){
	....
	}
  1. 案例
  • 场景

两条流,一条是下订单的流,一条是浏览数据的流,做联结。观察一个用户的下订单事件和这个用户的最近十分钟的浏览数据进行一个联结查询

  • 代码
public class IntervalJoinTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String,Long>> orderStream = env.fromElements(
                Tuple2.of("Mary", 1000L),
                Tuple2.of("Alice", 1000L),
                Tuple2.of("Bob", 2000L),
                Tuple2.of("Alice", 2000L),
                Tuple2.of("Cary", 2000L)

        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                        return element.f1;
                    }
                }));

        SingleOutputStreamOperator<Event> clickStream = env.fromElements(
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                }));

        //将两条流做一个连接
        orderStream.keyBy(data->data.f0)
                        .intervalJoin(clickStream.keyBy(data->data.user))
                        .between(Time.seconds(-5),Time.seconds(10))
                        .process(new ProcessJoinFunction<Tuple2<String, Long>, Event, String>(){
                                 @Override
                                 public void processElement(Tuple2<String, Long> left, Event right, ProcessJoinFunction<Tuple2<String, Long>, Event, String>.Context ctx, Collector<String> out) throws Exception {
                                    out.collect(right+" => "+left);//浏览记录导致订单
                                 }
                             })
                        .print();
        env.execute();
    }
}
  • 结果
Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0} => (Alice,1000)
Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0} => (Bob,2000)
Event{user='Bob', url='./prod?id=1', timestamp=1970-01-01 08:00:03.3} => (Bob,2000)
Event{user='Alice', url='./prod?id=100', timestamp=1970-01-01 08:00:03.0} => (Alice,2000)
Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:00:03.0} => (Alice,2000)
Event{user='Alice', url='./prod?id=200', timestamp=1970-01-01 08:00:03.0} => (Alice,1000)
Event{user='Bob', url='./home', timestamp=1970-01-01 08:00:03.5} => (Bob,2000)
Event{user='Bob', url='./prod?id=2', timestamp=1970-01-01 08:00:03.8} => (Bob,2000)
Event{user='Bob', url='./prod?id=3', timestamp=1970-01-01 08:00:04.2} => (Bob,2000)

8.3.4 窗口同组连接(Window CoGroup)

  1. 使用
stream1.coGroup(Stream2)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(TumblingEventTimeWindows.of(Time.hours(1)))
    .apply(<CoGroupFunction>)

对比窗口联结

stream1.join(stream2)
       .where(<KeySelector>)
       .equalTo(<KeySeletor>)
       .window(<WindowAssigner>)
       .apply(<JoinFunction>

把join变成了coGroup,以及JoinFunction变成CoGroupFunction

  1. 分析

在这里插入图片描述

DataStream有coGroup方法,需要传入DataStream,返回CoGroupedStreams

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

CoGroupedStreams方法中可以调用where方法得到Where类型,然后调用equalTo()方法得到EqualTo类型,然后调用window()方法指定窗口得到WithWindow类型,可以在调用apply()

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

apply()传入的是CoGroupFunction接口,也是只有一个单一抽象方法coGroup(),coGroup()方法中传入的参数,是Iterable集合类型,表示的是窗口内的一组元素(非一个)

重点:使用coGroup()方法可以实现除了内连接以外的连接,也可以实现左外连接和右外连接

  1. 代码
  • 代码
public class CoGroupTest {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String,Long>> stream1 = env.fromElements(
                Tuple2.of("a", 1000L),
                Tuple2.of("b", 1000L),
                Tuple2.of("a", 2000L),
                Tuple2.of("b", 2000L)

        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                        return element.f1;
                    }
                }));

        SingleOutputStreamOperator<Tuple2<String,Integer>> stream2 = env.fromElements(
                Tuple2.of("a", 3000),
                Tuple2.of("b", 4000),
                Tuple2.of("a", 4500),
                Tuple2.of("b", 5500)

        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String,Integer>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Integer>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String, Integer> element, long recordTimestamp) {
                        return element.f1;
                    }
                }));


        stream1.coGroup(stream2)
                .where(data->data.f0)
                .equalTo(data->data.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Integer>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, Long>> first, Iterable<Tuple2<String, Integer>> second, Collector<String> out) throws Exception {
                        out.collect(first+"=>"+second);
                    }
                }).print();
        env.execute();
    }
}
  • 结果
[(a,1000), (a,2000)]=>[(a,3000), (a,4500)]
[(b,1000), (b,2000)]=>[(b,4000)]
[]=>[(b,5500)]

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/83045.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vue-admin-template侧边栏修改成抽屉式

目录 一、修改sidebar组件 二、修改store 三、修改sidebaritem页面 四、修改navbar页面 五、修改layout 六、修改样式 一、修改sidebar组件 src—layout—components—sidebar—index.vue 把组件sidebar改成drawer <template><div :class"{ has-logo: s…

【AGC】崩溃服务flutter报缺失recordFatalException方法的问题

问题背景&#xff1a; cp反馈集成AGC-崩溃服务的flutter插件&#xff0c;使用最新的1.3.0300版本&#xff0c;出现下面错误 /Users/yin/Documents/projects/flutter/.pub-cache/hosted/pub.dartlang.org/agconnect_crash-1.3.0300/android/src/main/java/com/huawei/agconnec…

【脚本项目源码】Python制作艺术签名生成器,打造专属你的个人艺术签名

前言 本文给大家分享的是如何通过利用Python制作艺术签名生成器&#xff0c;废话不多直接开整~ 开发工具 Python版本&#xff1a; 3.6 相关模块&#xff1a; requests模块 PIL模块 PyQt5模块 环境搭建 安装Python并添加到环境变量&#xff0c;pip安装需要的相关模块即…

一个.NET的轻量级JWT库

这两天网上闲逛的时候&#xff0c;看到一个.NET的轻量级JWT库LitJWT&#xff0c;LitJWT号称主要关注性能&#xff0c;能提升至少5倍的编码/解码速度&#xff0c;以及更少的配置&#xff01; LitJWT支持的平台为netstandard 2.1或net5.0更高。 LitJWT宣传的性能对比图&#xf…

vulnhub靶机:presidential1

目录 进行靶机ip的扫描 nmap扫描开发的端口和服务信息 目录扫描 修改host文件 子域名搜集 phpmyadmin管理端登录 phpmyadmin漏洞利用 反弹shell capabilities提权 获取root权限 靶机总结 靶机下载网址&#xff1a;Presidential: 1 ~ VulnHub Kali ip&#xff1a;19…

数据分析方法-五大理论、分析框架、应用、数据分析作用

1、统计学理论 1.1 大数定量 定义&#xff1a; 指大量重复某一实验时&#xff0c;最后的频率会无限接近于事件的概率 数据的样本量越大&#xff0c;我们预测和计算的概率就越准确 数据的样本量越小&#xff0c;我们预测和计算的概率就越可能失效 举例&#xff1a; 某产品用户…

keepalived 主备使用

keepalived 主备使用 本篇主要介绍一下 keepalived 的基本的 主备使用 1.概述 什么是 keepalived呢,它是一个集群管理中 保证集群高可用的软件,防止单点故障,keepalived是以VRRP协议为实现基础的&#xff0c;VRRP全称Virtual Router Redundancy Protocol&#xff0c;即虚拟路冗…

CH36X系列芯片Linux系统使用教程

一、概述 CH365是一个连接PCI总线的通用接口芯片&#xff0c;CH367/CH368是连接PCI-Express总线的通用接口芯片。支持I/O端口映射、存储器映射、扩展ROM以及中断&#xff0c;提供主动并口、SPI、I2C、GPIO等硬件接口。基于如上芯片可将PCI/PCIe总线转换为简便易用的类似于ISA总…

R语言使用Rasch模型分析学生答题能力

最近我们被客户要求撰写关于IRT的研究报告&#xff0c;包括一些图形和统计输出。几个月以来&#xff0c;我一直对序数回归与项目响应理论&#xff08;IRT&#xff09;之间的关系感兴趣。 在这篇文章中&#xff0c;我重点介绍Rasch分析。 最近&#xff0c;我花了点时间尝试理解…

今天面了个阿里拿 38K 出来的,让我见识到了基础的天花板

前言 人人都有大厂梦&#xff0c;对于程序员来说&#xff0c;BAT 为首的一线互联网公司肯定是自己的心仪对象&#xff0c;毕竟能到这些大厂工作&#xff0c;不仅薪资高待遇好&#xff0c;而且能力技术都能够得到提升&#xff0c;最关键的是还能够给自己镀上一层金&#xff0c;…

【数集项目之 MCDF】(二) 从输入端 slave_FIFO

由于slave_FIFO调用了子模块同步FIFO SCFIFO.v&#xff0c;因此首先简单介绍同步FIFO的设计。 第一节 同步FIFOSCFIFO设计 同步FIFO实体是一组存储单元&#xff0c;因此需要先用数组方式来实现 reg [DATA_WIDTH - 1 : 0] fifo_buffer[DATA_DEPTH - 1 : 0]; 其中在参数中进行…

【随机分形搜索算法】一种新的全局数值优化的适应度-距离平衡随机分形搜索算法FDB-SFS附matlab代码

✅作者简介&#xff1a;热爱科研的Matlab仿真开发者&#xff0c;修心和技术同步精进&#xff0c;matlab项目合作可私信。 &#x1f34e;个人主页&#xff1a;Matlab科研工作室 &#x1f34a;个人信条&#xff1a;格物致知。 更多Matlab仿真内容点击&#x1f447; 智能优化算法 …

模型精度再被提升,统一跨任务小样本学习算法 UPT 给出解法!

近日&#xff0c;阿里云机器学习平台PAI与华东师范大学高明教授团队、达摩院机器智能技术NLP团队合作在自然语言处理顶级会议EMNLP2022上发表统一多NLP任务的预训练增强小样本学习算法UPT&#xff08;Unified Prompt Tuning&#xff09;。这是一种面向多种NLP任务的小样本学习算…

【HMS Core】运动健康服务上传平台的健康数据,能否获取到上传设备的SN码或者唯一设备码信息

问题描述 上传平台的健康数据&#xff0c;能否获取到上传设备的SN码或者唯一设备码信息 解决方案 DeviceInfo中包含华为设备唯一标识&#xff0c;您可以通过DeviceInfo进行查看。 DeviceInfo Android API&#xff1a;https://developer.huawei.com/consumer/cn/doc/developm…

价值年薪50W的软件测试进阶学习路线,终于让我从阿里P8手里抠出来了

作为一个男人我感觉必须得做点什么来证明一下自己&#xff0c;现在我又回来了&#xff0c;准备把自己的节操准备补一下。另外给各位未来的自动化测试工程师和测试开发工程师说一句&#xff0c;别的我不清楚&#xff0c;学习编程请从一而终 咱们学习编程就挺难的&#xff0c;有…

Matplotlib学习笔记(第二章 2.14 图像教程)

2.1.4 图像教程 使用Matplotlib绘制图像的简短教程。 启动命令 首先&#xff0c;让我们开始IPython。它是对标准Python提示符的最优秀的增强&#xff0c;它与Matplotlib结合得特别好。直接在shell上启动lPython&#xff0c;或者使用Jupyter笔记本(其中IPython作为运行的内核…

为什么 Android 要采用 Binder 作为 IPC 机制?

前言 Android 系统为了安全、稳定性、内存管理等原因&#xff0c;Android 应用和系统服务都是运行在独立的进程中的&#xff0c;但系统服务与应用进程之间&#xff0c;应用进程A与应用进程B之间需要通信和数据共享的。因此&#xff0c;Android 系统需要提供一套能够高效、安全…

js 踩了正则表达式的大坑!lastIndex属性

参考文章 踩坑情景 今天一来&#xff0c;被测试测出了一个问题&#xff0c;在使用vantUI的移动端项目中&#xff0c;我虽然对用户输入的值做了去除首尾空格的操作&#xff0c;但却忘记对用户输入的空值进行错误提示&#xff0c;fine&#xff0c;我选择给表单添加rules规则&am…

[附源码]JAVA毕业设计远程教学系统录屏(系统+LW)

[附源码]JAVA毕业设计远程教学系统录屏&#xff08;系统LW&#xff09; 项目运行 环境项配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术…

40个高质量SSM毕设项目分享【源码+论文】

文章目录前言 题目1 : 基于SSM的房屋出租出售系统 <br /> 题目2 : 基于SSM的房屋租赁系统 <br /> 题目3 : 基于SSM的个人健康信息管理系统 <br /> 题目4 : 基于SSM的共享充电宝管理系统 <br /> 题目5 : 基于SSM的即动运动网站 <br />前言 &…