大家都知道flink sql 中 left join 数据不会互相等待,存在 retract 问题,会导致写入 kafka 的数据量变大,就会导致出现数据重复的问题。
举例:即常见的曝光日志流(show_log)通过 log_id 关联点击日志流(click_log),将数据的关联结果进行下发。
执行sql
INSERT INTO sink_table
SELECT
show_log.log_id as log_id,
show_log.timestamp as timestamp,
show_log.show_params as show_params,
click_log.click_params as click_params
FROM show_log LEFT JOIN click_log
ON show_log.log_id = click_log.log_id
;
可能会出现一个情况
log_id | timestamp | show_params | click_params |
1 | 2021-11-01 00:01:00 | show_params | null |
1 | 2021-11-01 00:01:00 | show_params | click_params |
2 | 2021-11-01 00:01:00 | show_params2 | click_params2 |
3 | 2021-11-01 00:02:00 | show_params3 | null |
这样的情况明细是错误的,那原因便是 左表的数据先到了,但是右表的关联数据有了延迟,导致先输出null,但等右表的数据到达的时候,再关联右表的数据,从而出现数据重复的情况。那要如何解决这个问题?
1、使用interval join
INSERT INTO sink_tableSELECT
show_log.log_id as log_id,
show_log.timestamp as timestamp,
show_log.show_params as show_params,
click_log.click_params as click_params
FROM show_log
LEFT JOIN click_log
ON show_log.log_id = click_log.log_id
AND show_log.row_time
BETWEEN click_log.row_time - INTERVAL '10' MINUTE
AND click_log.row_time + INTERVAL '10' MINUTE;
这里设置了 show_log.row_time BETWEEN click_log.row_time - INTERVAL '10' MINUTE AND click_log.row_time + INTERVAL '10' MINUTE
代表 show_log 表中的数据会和 click_log 表中的 row_time 在前后 10 分钟之内的数据进行关联。这样就允许存在延迟数据发生。
interval说明:
join 时,左流和右流会在 interval 时间之内相互等待,如果等到了则输出数据[+(show_log,click_log)],如果等不到,并且另一条流的时间已经推进到当前这条数据在也不可能 join 到另一条流的数据时,则直接输出[+(show_log,null)],[+(null,click_log)]。
举个例子,show_log.row_time BETWEEN click_log.row_time - INTERVAL '10' MINUTE AND click_log.row_time + INTERVAL '10' MINUTE
, 当 click_log 的时间推进到 2021-11-01 11:00:00
时,这时 show_log 来一条 2021-11-01 02:00:00
的数据, 那这条 show_log 必然不可能和 click_log 中的数据 join 到了,因为 click_log 中 2021-11-01 01:50:00
到 2021-11-01 02:10:00
之间的数据以及过期删除了。则 show_log 直接输出 [+(show_log,null)]
2、下游进行数据去重
正对于数据重复的问题,在flink中有着相应的解决方案,可阅读
Flink中常用的去重方案_大大大大肉包的博客-CSDN博客