1. Source code
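Under the hood, intervalJoin is built on connect: the two keyed streams are connected, and every incoming record from either side goes through these steps.
First, check whether the record is late (its timestamp is below the current watermark). A late record is not joined; it goes to a side output if one is configured, otherwise it is dropped.
Then, save the record in keyed state (one buffer per side).
Next, search the other side's buffer for records whose timestamps fall within the join range, and emit each match.
Finally, delete buffered data according to the upper bound of the range: once the watermark passes the last timestamp a record could still match, a cleanup timer removes it from state.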
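The steps above can be sketched as a single-key, single-threaded simulation. This is not Flink's actual operator: the class `IntervalJoinSketch` and its methods are hypothetical, plain lists stand in for keyed state, and the caller passes the watermark in and calls `cleanup` explicitly instead of relying on event-time timers. It needs Java 16+ for the record.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical single-key simulation of the interval-join steps:
 * late check, buffering, range search, cleanup. Not Flink API.
 */
class IntervalJoinSketch {
    record Event(String value, long ts) {}

    private final long lowerBound;  // e.g. -5 for between(-5 ms, 5 ms)
    private final long upperBound;  // e.g. +5
    private final List<Event> leftBuffer = new ArrayList<>();   // stands in for keyed state
    private final List<Event> rightBuffer = new ArrayList<>();
    final List<String> joined = new ArrayList<>();
    final List<Event> lateOutput = new ArrayList<>();           // stands in for a side output

    IntervalJoinSketch(long lowerBound, long upperBound) {
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    void processLeft(Event e, long watermark)  { process(e, true, watermark); }
    void processRight(Event e, long watermark) { process(e, false, watermark); }

    private void process(Event e, boolean isLeft, long watermark) {
        // Step 1: late check. A record older than the watermark is not joined.
        if (watermark != Long.MIN_VALUE && e.ts() < watermark) {
            lateOutput.add(e);
            return;
        }
        // Step 2: buffer the record.
        (isLeft ? leftBuffer : rightBuffer).add(e);
        // Step 3: search the other buffer. Seen from a right record the bounds are mirrored:
        // left.ts + lower <= right.ts <= left.ts + upper  <=>  right.ts - upper <= left.ts <= right.ts - lower
        long lo = isLeft ? lowerBound : -upperBound;
        long hi = isLeft ? upperBound : -lowerBound;
        for (Event other : isLeft ? rightBuffer : leftBuffer) {
            if (other.ts() >= e.ts() + lo && other.ts() <= e.ts() + hi) {
                joined.add(isLeft ? e.value() + "-" + other.value() : other.value() + "-" + e.value());
            }
        }
    }

    // Step 4: cleanup. Drop records whose last possible partner timestamp is below the watermark.
    void cleanup(long watermark) {
        leftBuffer.removeIf(l -> l.ts() + upperBound < watermark);
        rightBuffer.removeIf(r -> r.ts() - lowerBound < watermark);
    }

    int bufferedLeft()  { return leftBuffer.size(); }
    int bufferedRight() { return rightBuffer.size(); }

    public static void main(String[] args) {
        IntervalJoinSketch join = new IntervalJoinSketch(-5, 5);
        join.processLeft(new Event("emp1", 10), Long.MIN_VALUE);
        join.processRight(new Event("dept1", 14), Long.MIN_VALUE); // 5 <= 14 <= 15: match
        join.processRight(new Event("dept2", 16), Long.MIN_VALUE); // 16 > 15: no match
        join.cleanup(20);                                          // removes emp1 and dept1
        join.processLeft(new Event("emp2", 12), 20);               // late: 12 < 20
        join.processLeft(new Event("emp3", 20), 20);               // matches dept2 (15 <= 16 <= 25)
        System.out.println(join.joined);                           // [emp1-dept1, emp3-dept2]
    }
}
```

Mirroring the bounds for the right side is what lets one `process` method serve both inputs; the cleanup condition uses the same mirrored upper bound per side.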
package com.atguigu.gmall.realtime.test;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
/**
 * @author
 * @date 2023/7/8
 * This example demonstrates Flink's intervalJoin.
 * Emp and Dept are POJOs in the same package (not shown here).
 */
public class Test03_IntervalJoin {
    public static void main(String[] args) throws Exception {
        //TODO 1. Basic environment setup
        //1.1 Create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 Set the parallelism
        env.setParallelism(2);
        //TODO 2. Checkpoint settings (omitted)
        //TODO 3. Read employee data from a network port, assign watermarks and extract the event-time field
        SingleOutputStreamOperator<Emp> empDS = env
                .socketTextStream("hadoop102", 8888)
                .map(
                        lineStr -> {
                            String[] fieldArr = lineStr.split(",");
                            return new Emp(Integer.valueOf(fieldArr[0]), fieldArr[1],
                                    Integer.valueOf(fieldArr[2]), Long.valueOf(fieldArr[3]));
                        }
                ).assignTimestampsAndWatermarks(
                        // WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        WatermarkStrategy
                                .<Emp>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Emp>() {
                                            @Override
                                            public long extractTimestamp(Emp emp, long recordTimestamp) {
                                                return emp.getTs();
                                            }
                                        }
                                )
                );
        empDS.print("emp:");
        //TODO 4. Read department data from a network port, assign watermarks and extract the event-time field
        SingleOutputStreamOperator<Dept> deptDS = env
                .socketTextStream("hadoop102", 8889)
                .map(
                        lineStr -> {
                            String[] fieldArr = lineStr.split(",");
                            return new Dept(Integer.valueOf(fieldArr[0]), fieldArr[1], Long.valueOf(fieldArr[2]));
                        }
                ).assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Dept>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Dept>() {
                                            @Override
                                            public long extractTimestamp(Dept dept, long recordTimestamp) {
                                                return dept.getTs();
                                            }
                                        }
                                )
                );
        deptDS.print("dept:");
        //TODO 5. Join employees and departments with intervalJoin
        //   An Emp matches a Dept with the same deptno when
        //   emp.ts - 5 ms <= dept.ts <= emp.ts + 5 ms (both bounds are inclusive by default)
        empDS
                .keyBy(Emp::getDeptno)
                .intervalJoin(deptDS.keyBy(Dept::getDeptno))
                .between(Time.milliseconds(-5), Time.milliseconds(5))
                .process(
                        new ProcessJoinFunction<Emp, Dept, Tuple2<Emp, Dept>>() {
                            @Override
                            public void processElement(Emp emp, Dept dept, Context ctx, Collector<Tuple2<Emp, Dept>> out) throws Exception {
                                out.collect(Tuple2.of(emp, dept));
                            }
                        }
                ).print(">>>");
        env.execute();
    }
}
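The listing relies on `Emp` and `Dept` classes that are not shown. A minimal sketch, assuming the constructor argument order used in the `map` calls; the field names `empno`, `ename` and `dname` are hypothetical, and only `getDeptno()` and `getTs()` are actually required by the example. They are package-private here so the block compiles as one file; in a Flink project each would normally be a public class in its own file, since Flink's POJO rules require a public class with a public no-arg constructor.

```java
/** Hypothetical employee POJO; only getDeptno() and getTs() are used by the example. */
class Emp {
    private Integer empno;
    private String ename;
    private Integer deptno;
    private Long ts;

    public Emp() {} // public no-arg constructor, part of Flink's POJO rules

    public Emp(Integer empno, String ename, Integer deptno, Long ts) {
        this.empno = empno;
        this.ename = ename;
        this.deptno = deptno;
        this.ts = ts;
    }

    public Integer getEmpno() { return empno; }
    public void setEmpno(Integer empno) { this.empno = empno; }
    public String getEname() { return ename; }
    public void setEname(String ename) { this.ename = ename; }
    public Integer getDeptno() { return deptno; }
    public void setDeptno(Integer deptno) { this.deptno = deptno; }
    public Long getTs() { return ts; }
    public void setTs(Long ts) { this.ts = ts; }

    @Override
    public String toString() { // readable output for empDS.print(...)
        return "Emp{empno=" + empno + ", ename=" + ename + ", deptno=" + deptno + ", ts=" + ts + "}";
    }
}

/** Hypothetical department POJO; only getDeptno() and getTs() are used by the example. */
class Dept {
    private Integer deptno;
    private String dname;
    private Long ts;

    public Dept() {}

    public Dept(Integer deptno, String dname, Long ts) {
        this.deptno = deptno;
        this.dname = dname;
        this.ts = ts;
    }

    public Integer getDeptno() { return deptno; }
    public void setDeptno(Integer deptno) { this.deptno = deptno; }
    public String getDname() { return dname; }
    public void setDname(String dname) { this.dname = dname; }
    public Long getTs() { return ts; }
    public void setTs(Long ts) { this.ts = ts; }

    @Override
    public String toString() {
        return "Dept{deptno=" + deptno + ", dname=" + dname + ", ts=" + ts + "}";
    }
}
```

With these classes, a line such as `1,zs,10,1000` on port 8888 and `10,sales,1003` on port 8889 would fall inside the ±5 ms interval and produce one joined pair.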
2. Parallelism and watermark propagation
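Multiple upstream subtasks, one downstream subtask: the downstream takes the minimum of the watermarks from its upstream subtasks.
One upstream subtask, multiple downstream subtasks: the upstream broadcasts its watermark to every downstream subtask.
Multiple upstream, multiple downstream: each upstream broadcasts its watermark first, then each downstream takes the minimum.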
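The three rules can be modelled in a few lines. This is a hypothetical sketch, not Flink's runtime code: `WatermarkPropagation`, `Downstream` and `broadcast` are made-up names, and the real runtime also handles idle channels, which this ignores. Each downstream subtask remembers the last watermark per input channel and advances to the minimum.

```java
import java.util.Arrays;

/** Hypothetical model: upstream subtasks broadcast, downstream subtasks take the minimum. */
class WatermarkPropagation {
    /** One downstream subtask with one input channel per upstream subtask. */
    static final class Downstream {
        private final long[] channelWatermarks;
        private long watermark = Long.MIN_VALUE;

        Downstream(int upstreamParallelism) {
            channelWatermarks = new long[upstreamParallelism];
            Arrays.fill(channelWatermarks, Long.MIN_VALUE); // no watermark received yet
        }

        void onWatermark(int channel, long wm) {
            channelWatermarks[channel] = Math.max(channelWatermarks[channel], wm);
            // Minimum over all input channels; the subtask's watermark never moves backwards.
            long min = Arrays.stream(channelWatermarks).min().getAsLong();
            watermark = Math.max(watermark, min);
        }

        long currentWatermark() { return watermark; }
    }

    /** An upstream subtask sends the same watermark to every downstream subtask. */
    static void broadcast(Downstream[] downstream, int upstreamIndex, long wm) {
        for (Downstream d : downstream) {
            d.onWatermark(upstreamIndex, wm);
        }
    }

    public static void main(String[] args) {
        // Many-to-many: 2 upstream subtasks, 2 downstream subtasks
        Downstream[] down = { new Downstream(2), new Downstream(2) };
        broadcast(down, 0, 100);
        System.out.println(down[0].currentWatermark()); // -9223372036854775808: upstream 1 sent nothing yet
        broadcast(down, 1, 80);
        System.out.println(down[0].currentWatermark() + " " + down[1].currentWatermark()); // 80 80
    }
}
```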
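Note: the generated watermark is 1 ms lower than the largest event timestamp seen (after also subtracting any out-of-orderness delay), i.e. watermark = max timestamp - delay - 1 ms.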
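A minimal sketch of that arithmetic, modelled on how Flink's bounded-out-of-orderness generator computes its watermark (`forMonotonousTimestamps()` behaves like a delay of 0). The class `BoundedOutOfOrdernessSketch` is hypothetical. The `windowFires` helper is an added illustration of why the 1 ms matters: an event-time window `[start, end)` fires once the watermark reaches `end - 1`, so an event with timestamp exactly `end` is needed to close it.

```java
/** Hypothetical re-creation of the "max timestamp - delay - 1 ms" watermark arithmetic. */
class BoundedOutOfOrdernessSketch {
    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // Chosen so that the watermark before any event is exactly Long.MIN_VALUE (no underflow).
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    long currentWatermark() {
        return maxTimestamp - outOfOrdernessMillis - 1;
    }

    /** An event-time window [start, end) fires once the watermark reaches end - 1. */
    static boolean windowFires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch monotonous = new BoundedOutOfOrdernessSketch(0);
        System.out.println(monotonous.currentWatermark() == Long.MIN_VALUE); // true: no events yet
        monotonous.onEvent(4999);
        System.out.println(monotonous.currentWatermark());                   // 4998
        System.out.println(windowFires(5000, monotonous.currentWatermark())); // false
        monotonous.onEvent(5000);
        System.out.println(windowFires(5000, monotonous.currentWatermark())); // true (watermark 4999)
    }
}
```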
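A watermark can show up in one upstream subtask and still trigger nothing downstream. The reason is that another parallel subtask has received no data yet: its watermark is still Long.MIN_VALUE, so the minimum taken downstream stays at Long.MIN_VALUE too.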
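This is easy to hit with the example in section 1: `socketTextStream` runs with parallelism 1, while `map` and the watermark assignment run with parallelism 2, so input lines are distributed round-robin between two subtasks. The hypothetical `IdleSubtaskDemo` below simulates that under simplifying assumptions: distribution starts at subtask 0 (Flink's rebalance picks a random starting channel), and each subtask's watermark is computed like `forMonotonousTimestamps()` (max timestamp - 1 ms).

```java
import java.util.Arrays;

/** Hypothetical simulation: parallelism-1 source, round-robin to N watermark-assigning subtasks. */
class IdleSubtaskDemo {
    static long combinedWatermark(long[] eventTimestamps, int parallelism) {
        long[] maxTs = new long[parallelism];
        Arrays.fill(maxTs, Long.MIN_VALUE + 1); // so that (maxTs - 1) == Long.MIN_VALUE before any event
        for (int i = 0; i < eventTimestamps.length; i++) {
            int subtask = i % parallelism;       // round-robin distribution
            maxTs[subtask] = Math.max(maxTs[subtask], eventTimestamps[i]);
        }
        long min = Long.MAX_VALUE;
        for (long ts : maxTs) {
            min = Math.min(min, ts - 1);         // per-subtask watermark, then the downstream minimum
        }
        return min;
    }

    public static void main(String[] args) {
        System.out.println(combinedWatermark(new long[] {1000}, 2));       // -9223372036854775808: subtask 1 is idle
        System.out.println(combinedWatermark(new long[] {1000, 2000}, 2)); // 999
    }
}
```

With only one input line, the combined watermark never leaves Long.MIN_VALUE, so nothing downstream is triggered until the second subtask also receives data.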
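Where possible, assign watermarks at the source.
The Kafka source can run with multiple parallel subtasks, whereas non-parallel sources such as socketTextStream always run with parallelism 1. When watermarks are assigned directly on a parallelism-1 source, its single watermark is broadcast to every downstream subtask, so no downstream subtask is held back by an idle upstream subtask.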