迟到数据的处理(三层机制依次兜底:水位线延迟让窗口晚一点触发;窗口触发后,在允许迟到时间内到达的数据会再次触发窗口计算;超过允许迟到时间仍到达的数据被发送到侧输出流)
- 推迟水位线推进:
WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
- 设置窗口延迟关闭:
.allowedLateness(Time.seconds(3))
- 使用侧流接收迟到的数据:
.sideOutputLateData(lateData)
/**
 * Late-data handling demo: keeps a MySQL per-window URL view count correct even when events
 * arrive late.
 *
 * <p>Three layers are combined:
 * <ol>
 *   <li>the watermark trails the largest seen event time by 2 s ({@code forBoundedOutOfOrderness});</li>
 *   <li>each 10 s tumbling event-time window is kept for a further 5 s after it fires
 *       ({@code allowedLateness}); elements arriving in that period update the window and make it
 *       emit again, and the main sink overwrites the row with {@code replace into};</li>
 *   <li>elements arriving even later are routed to the {@code "late"} side output and applied to
 *       MySQL as {@code +1} increments via {@code insert ... on duplicate key update}.</li>
 * </ol>
 *
 * <p>Target table (composite primary key, so both sinks address the same row):
 * <pre>
 * CREATE TABLE `url_view_count` (
 *   `url` VARCHAR(100) NOT NULL,
 *   `cnt` BIGINT NOT NULL,
 *   `window_start` BIGINT NOT NULL,
 *   `window_end` BIGINT NOT NULL,
 *   PRIMARY KEY (url, window_start, window_end)
 * ) ENGINE=INNODB DEFAULT CHARSET=utf8
 * </pre>
 */
public class Flink12_LateDataCorrect {

    /**
     * Tumbling window size in milliseconds. The window assigner and the late-data path must agree
     * on it, so both read this single constant.
     */
    private static final long WINDOW_SIZE_MS = 10_000L;

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Layer 1: comma-separated socket lines. The third field is parsed as a Long, and the
        // watermark trails the largest Event#getTs() seen by 2 s.
        // NOTE(review): a malformed line (fewer than 3 fields / non-numeric third field) throws and fails the job.
        SingleOutputStreamOperator<Event> ds = env.socketTextStream("hadoop102", 8888)
                .map(
                        line -> {
                            String[] fields = line.split(",");
                            return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim()));
                        }
                ).assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                .withTimestampAssigner((event, ts) -> event.getTs())
                );
        ds.print("input");

        // Elements arriving after window end + allowed lateness are emitted here instead of being dropped.
        OutputTag<WordCountWithTs> lateOutputTag = new OutputTag<>("late", Types.POJO(WordCountWithTs.class));

        SingleOutputStreamOperator<UrlViewCount> urlViewCountDs = ds.map(
                event -> new WordCountWithTs(event.getUrl(), 1, event.getTs())
        ).keyBy(
                WordCountWithTs::getWord
        ).window(
                TumblingEventTimeWindows.of(Time.milliseconds(WINDOW_SIZE_MS))
        ).allowedLateness(Time.seconds(5)) // Layer 2: keep window state 5 s past the window end
                .sideOutputLateData(lateOutputTag) // Layer 3: route anything later to the side output
                .aggregate(
                        new AggregateFunction<WordCountWithTs, UrlViewCount, UrlViewCount>() {
                            @Override
                            public UrlViewCount createAccumulator() {
                                return new UrlViewCount();
                            }

                            @Override
                            public UrlViewCount add(WordCountWithTs value, UrlViewCount accumulator) {
                                // A fresh accumulator has a null count; treat it as 0.
                                accumulator.setCount((accumulator.getCount() == null ? 0L : accumulator.getCount()) + value.getCount());
                                return accumulator;
                            }

                            @Override
                            public UrlViewCount getResult(UrlViewCount accumulator) {
                                return accumulator;
                            }

                            @Override
                            public UrlViewCount merge(UrlViewCount a, UrlViewCount b) {
                                // Only called for merging window assigners (e.g. session windows).
                                // It is implemented anyway so changing the assigner cannot yield a null accumulator.
                                a.setCount((a.getCount() == null ? 0L : a.getCount()) + (b.getCount() == null ? 0L : b.getCount()));
                                return a;
                            }
                        },
                        new ProcessWindowFunction<UrlViewCount, UrlViewCount, String, TimeWindow>() {
                            @Override
                            public void process(String key, ProcessWindowFunction<UrlViewCount, UrlViewCount, String, TimeWindow>.Context context, Iterable<UrlViewCount> elements, Collector<UrlViewCount> out) throws Exception {
                                // With an incremental aggregate the iterable holds the single aggregated result.
                                UrlViewCount urlViewCount = elements.iterator().next();
                                // Fill in the key (url) and the window bounds, which the aggregate does not know.
                                urlViewCount.setUrl(key);
                                urlViewCount.setWindowStart(context.window().getStart());
                                urlViewCount.setWindowEnd(context.window().getEnd());
                                out.collect(urlViewCount);
                            }
                        }
                );
        urlViewCountDs.print("window");

        // Window results: the first firing inserts the row. Later firings within allowedLateness
        // carry the full updated count, so the row is simply replaced.
        urlViewCountDs.addSink(urlViewCountJdbcSink(
                "replace into url_view_count(url, cnt ,window_start ,window_end) value (?,?,?,?)"));

        SideOutputDataStream<WordCountWithTs> lateData = urlViewCountDs.getSideOutput(lateOutputTag);
        lateData.print("late");

        // Each side-output element is one extra view for the window its timestamp belongs to.
        // Recompute that window the same way the tumbling assigner does (offset 0, same size),
        // then add 1 to the stored count.
        SingleOutputStreamOperator<UrlViewCount> mapDs = lateData.map(
                wordCountWithTs -> {
                    long windowStart = TimeWindow.getWindowStartWithOffset(wordCountWithTs.getTs(), 0L, WINDOW_SIZE_MS);
                    long windowEnd = windowStart + WINDOW_SIZE_MS;
                    return new UrlViewCount(wordCountWithTs.getWord(), 1L, windowStart, windowEnd);
                }
        );
        // NOTE(review): the two sinks buffer independently (batch size 2 / 1 s interval). Suppose the
        // main sink flushes its last REPLACE for a window after the late sink has flushed an increment
        // for the same window. The increment is then overwritten. Confirm whether this ordering can
        // occur for the expected traffic.
        mapDs.addSink(urlViewCountJdbcSink(
                "insert into url_view_count (url ,cnt , window_start ,window_end) values(?,?,?,?) on duplicate key update cnt = VALUES(cnt) + cnt "));

        try {
            env.execute();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds a MySQL JDBC sink that binds a {@link UrlViewCount} as (url, cnt, window_start, window_end).
     *
     * @param sql statement with exactly four positional parameters in that order
     * @return a sink that flushes every 2 records or every 1 s, with up to 3 retries
     */
    private static SinkFunction<UrlViewCount> urlViewCountJdbcSink(String sql) {
        return JdbcSink.<UrlViewCount>sink(
                sql,
                new JdbcStatementBuilder<UrlViewCount>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, UrlViewCount urlViewCount) throws SQLException {
                        preparedStatement.setString(1, urlViewCount.getUrl());
                        preparedStatement.setLong(2, urlViewCount.getCount());
                        preparedStatement.setLong(3, urlViewCount.getWindowStart());
                        preparedStatement.setLong(4, urlViewCount.getWindowEnd());
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(2)
                        .withMaxRetries(3)
                        .withBatchIntervalMs(1000L)
                        .build(),
                // NOTE(review): credentials are hard-coded for the demo; load them from configuration in real jobs.
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUrl("jdbc:mysql://hadoop102:3306/test")
                        .withUsername("root")
                        .withPassword("000000")
                        .build()
        );
    }
}
withIdleness 方法
用于解决某条流长时间没有数据、水位线无法推进,从而导致下游窗口无法正常触发计算的问题(合流/多并行度时,下游的水位线取所有上游水位线的最小值;被标记为空闲的上游在空闲期间不参与该最小值的计算)。
/**
 * Shows how {@code withIdleness} lets a unioned stream's watermark keep advancing when one input
 * goes quiet.
 *
 * <p>Two socket sources (ports 8888 and 9999) read space-separated lines. The sources are
 * unioned and counted per URL in 10 s tumbling event-time windows. Only the first source is
 * marked idle after 10 s without data. The second source's {@code withIdleness} call is left
 * commented out on purpose, for comparison.
 */
public class Flink12_WithIdleness {

    /**
     * Reads space-separated lines from the demo socket on {@code port}, turns each into an
     * {@link Event} (third field parsed as a Long), and applies {@code strategy}.
     */
    private static SingleOutputStreamOperator<Event> eventsFrom(
            StreamExecutionEnvironment env, int port, WatermarkStrategy<Event> strategy) {
        return env.socketTextStream("hadoop102", port)
                .map(line -> {
                    String[] parts = line.split(" ");
                    return new Event(parts[0].trim(), parts[1].trim(), Long.valueOf(parts[2].trim()));
                })
                .assignTimestampsAndWatermarks(strategy);
    }

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Override the default parallelism so all output comes from a single subtask.
        env.setParallelism(1);

        // Source 1: no out-of-orderness tolerance. After 10 s without data it is treated as idle
        // and stops holding back the downstream watermark.
        SingleOutputStreamOperator<Event> ds1 = eventsFrom(env, 8888,
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((event, ts) -> event.getTs())
                        .withIdleness(Duration.ofSeconds(10)));
        ds1.print("input1");

        // Source 2: same strategy but without idleness, so a silent port 9999 stalls the windows.
        SingleOutputStreamOperator<Event> ds2 = eventsFrom(env, 9999,
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner((event, ts) -> event.getTs()));
                        // .withIdleness(Duration.ofSeconds(10))
        ds2.print("input2");

        ds1.union(ds2)
                .map(event -> new WordCount(event.getUrl(), 1))
                .keyBy(WordCount::getWord)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .sum("count")
                .print("window");

        try {
            env.execute();
        } catch (Exception failure) {
            throw new RuntimeException(failure);
        }
    }
}
基于时间的合流
窗口联结Window Join
WindowJoin: 在同一个窗口内的相同key的数据才能join成功。
// Window join: pairs orderDs and detailDs elements that share the same orderId AND fall into the
// same 10 s tumbling event-time window. join() is invoked per matched (first, second) pair, and
// each returned String is printed.
// NOTE(review): orderDs/detailDs are defined outside this snippet; presumably both already have
// event-time timestamps/watermarks assigned (required for event-time windows) — confirm.
orderDs.join( detailDs )
.where( OrderEvent::getOrderId ) // key selector of the first (left) stream
.equalTo( OrderDetailEvent::getOrderId) // key selector of the second (right) stream
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply(
new JoinFunction<OrderEvent, OrderDetailEvent, String>() {
@Override
public String join(OrderEvent first, OrderDetailEvent second) throws Exception {
// handle one successfully joined pair
return first + " -- " + second ;
}
}
).print("windowJoin");
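间隔联结 Interval Join
IntervalJoin: 以一条流(左流)中每条数据的时间为基准,设定下界和上界,形成一个时间范围;另一条流(右流)中相同 key 的数据,只要时间戳落在该范围内,即可 join 成功。即满足 left.ts + lowerBound <= right.ts <= left.ts + upperBound(默认两端都是闭区间)。注意: Interval Join 只支持事件时间。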
核心代码:
// Interval join: for each orderDs element, join the detailDs elements with the same orderId whose
// timestamp lies within [order.ts - 2 s, order.ts + 2 s]. Both bounds are inclusive unless the
// *BoundExclusive() calls below are enabled.
// NOTE(review): orderDs/detailDs are defined outside this snippet; presumably both carry
// event-time timestamps/watermarks — confirm.
orderDs.keyBy(
OrderEvent::getOrderId
).intervalJoin(
detailDs.keyBy( OrderDetailEvent::getOrderId)
).between(
Time.seconds(-2) , Time.seconds(2)
)
//.upperBoundExclusive() exclude the upper bound
//.lowerBoundExclusive() exclude the lower bound
.process(
new ProcessJoinFunction<OrderEvent, OrderDetailEvent, String>() {
@Override
public void processElement(OrderEvent left, OrderDetailEvent right, ProcessJoinFunction<OrderEvent, OrderDetailEvent, String>.Context ctx, Collector<String> out) throws Exception {
// handle one successfully joined pair
out.collect( left + " -- " + right );
}
}
).print("IntervalJoin");