Flink Job Optimization Notes
1. Background
After a recent release, the daily production job started taking more than three hours to finish, well beyond the estimated time. Looking at the daily schedule in DolphinScheduler, the data segmentation task in the dwd layer had a serious performance problem: it took nearly 40 minutes per day, and the runtime kept growing as data volume increased. This compute node therefore needed focused optimization.
2. Optimization Approach and Implementation
The big-data jobs run on Flink, so the starting point was the job's execution plan in the Flink History Server: find the most time-consuming nodes, and check whether any nodes were executed repeatedly because of the SQL logic and were driving the runtime up.
[Figure: execution plan from the Flink History Server, showing the job forking into three branches]
As the figure shows, the job fans out into three branches. The SQL ends with only two INSERT statements, so at least one branch is unnecessary. The next step was to find out why the plan splits into three branches, which means walking through the execution plan: in the UI you can expand each node, see the optimized expression it executes, and map it back to the corresponding step of the SQL. The node that produces the split is the one to focus on.
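Besides clicking through the History Server UI, the same optimized plan can be printed directly from SQL. A minimal sketch, assuming it is run from the Flink SQL client; the view name is one of this job's views and the projection is only illustrative:

EXPLAIN PLAN FOR
SELECT tenant_id, room_id, msg_id, msg_start_time
FROM gen_cut_type_by_feature_view;

The annotated plan of the branching subtree is shown below.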
Sort(
orderBy=[tenant_id ASC, room_id ASC, msg_start_time ASC]
) ->
Calc(
select=[__etl_time__, date_id, tenant_id, brand_id, channel, channel_app_id, channel_session_type, msg_id, msg_start_time, msg_end_time, msg_from_id, msg_from_orig_id,
msg_from_nk, msg_from_role, msg_to_ids, msg_to_users, msg_type, msg_content, msg_detail, group_chat_info, dialogue_id, room_id, operation_flags, recording_properties,
asr_properties, metric_properties, tags, tag_properties, dialogue_properties, lastMsgEndTime, nextMsgStartTime, is_cut_by_msg_time, is_fit_specific_event, pre_is_fit_specific_event, fit_specific_row,
CAST(FROM_UNIXTIME(w0$o0)) AS start_time,
CAST(FROM_UNIXTIME(w0$o1)) AS end_time,
CAST(w1$o0) AS fit_specific_rows,
GenIsFitConsecutiveRowsAndTime(channel_session_type, tenant_id, CAST(w1$o0), CAST(CAST(FROM_UNIXTIME(w0$o0))), CAST(CAST(FROM_UNIXTIME(w0$o1))), is_fit_specific_event) AS is_fit_specific_flag,
(is_cut_by_msg_time = _UTF-16LE'1':VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $39, //
(GenIsFitConsecutiveRowsAndTime(channel_session_type, tenant_id, CAST(w1$o0), CAST(CAST(FROM_UNIXTIME(w0$o0))), CAST(CAST(FROM_UNIXTIME(w0$o1))), is_fit_specific_event) = 1) AS $40
]
) ->
OverAggregate(
partitionBy=[tenant_id, room_id],
orderBy=[msg_start_time ASC],
window#0=[
LAG(is_fit_specific_flag) AS w0$o0
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
],
select=[__etl_time__, date_id, tenant_id, brand_id, channel, channel_app_id, channel_session_type, msg_id, msg_start_time, msg_end_time, msg_from_id, msg_from_orig_id,
msg_from_nk, msg_from_role, msg_to_ids, msg_to_users, msg_type, msg_content, msg_detail, group_chat_info, dialogue_id, room_id, operation_flags, recording_properties,
asr_properties, metric_properties, tags, tag_properties, dialogue_properties, lastMsgEndTime, nextMsgStartTime, is_cut_by_msg_time, is_fit_specific_event, pre_is_fit_specific_event, fit_specific_row,
start_time, end_time, fit_specific_rows, is_fit_specific_flag,
$39, // whether the row is cut by the time rule
$40, // whether the current row is 1
w0$o0 -> pre_is_fit_specific_flag // whether the previous row matched the specific rule
]
) -> (
Calc(
select=[date_id, tenant_id, channel_session_type, msg_id, msg_start_time, room_id, tags,
IF(($39 OR (w0$o0 IS NULL AND $40) OR ((w0$o0 <> is_fit_specific_flag) IS TRUE AND w0$o0 IS NOT NULL)), 1, 0) AS is_cut_flag,
CAST(tenant_id) AS $8,
CAST(msg_start_time) AS $9,
GenCutPointTypeByFeature(channel_session_type, tenant_id, tags) AS $10
]
) ->
OverAggregate(
partitionBy=[tenant_id, room_id],
orderBy=[msg_start_time ASC],
window#0=[
COUNT(is_cut_flag) AS w0$o0,
$SUM0(is_cut_flag) AS w0$o1
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
],
select=[date_id, tenant_id, channel_session_type, msg_id, msg_start_time, room_id, tags, is_cut_flag,
$8, -> tenant_id
$9, -> msg_start_time
$10, -> feature cut point: START/END/NO
w0$o0, -> count
w0$o1 -> sum
]
) ->
Calc(
select=[
CONCAT_WS(_UTF-16LE'-', $8, room_id, date_id, CAST(CASE((w0$o0 > 0:BIGINT), w0$o1, null:INTEGER))) AS dialogue_id1,
channel_session_type, tenant_id, msg_id, $9 AS $f4, tags, $10 AS cutPointType
]
),
#####################
CREATE TEMPORARY VIEW keep_cutpoint_view AS
SELECT dialogue_id1, smoothRes.smoothResultVoMap
FROM (
SELECT dialogue_id1,
smoothCutPoint(channel_session_type, tenant_id, dialogue_id1, msg_id, msg_start_time, tags, cutPointType) AS smoothRes
FROM gen_cut_type_by_feature_view
GROUP BY dialogue_id1
);
#####################
Calc(
select=[__etl_time__, date_id, tenant_id, brand_id, channel, channel_app_id, channel_session_type, msg_id, msg_start_time, msg_end_time, msg_from_id, msg_from_orig_id,
msg_from_nk, msg_from_role, msg_to_ids, msg_to_users, msg_type, msg_content, msg_detail, group_chat_info, dialogue_id, room_id, operation_flags, recording_properties,
asr_properties, metric_properties, tags, tag_properties, dialogue_properties, lastMsgEndTime, nextMsgStartTime, is_cut_by_msg_time, is_fit_specific_event, pre_is_fit_specific_event,
fit_specific_row, start_time, end_time, fit_specific_rows, is_fit_specific_flag,
w0$o0 AS pre_is_fit_specific_flag,
CASE(w0$o0 IS NULL, is_fit_specific_flag, (w0$o0 <> is_fit_specific_flag), 1, 0) AS is_cut_by_specific,
IF(($39 OR (w0$o0 IS NULL AND $40) OR ((w0$o0 <> is_fit_specific_flag) IS TRUE AND w0$o0 IS NOT NULL)), 1, 0) AS is_cut_flag,
IF((IF(($39 OR (w0$o0 IS NULL AND $40) OR ((w0$o0 <> is_fit_specific_flag) IS TRUE AND w0$o0 IS NOT NULL)), 1, 0) = 1), _UTF-16LE'start', null:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AS $42,
CAST(tenant_id) AS $43,
GenCutPointTypeByFeature(channel_session_type, tenant_id, tags) AS $44
]
) ->
OverAggregate(
partitionBy=[tenant_id, room_id],
orderBy=[msg_start_time ASC],
window#0=[
COUNT(is_cut_flag) AS w0$o0,
$SUM0(is_cut_flag) AS w0$o1
RANG BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
], select=[__etl_time__, date_id, tenant_id, brand_id, channel, channel_app_id, channel_session_type, msg_id, msg_start_time, msg_end_time, msg_from_id, msg_from_orig_id,
msg_from_nk, msg_from_role, msg_to_ids, msg_to_users, msg_type, msg_content, msg_detail, group_chat_info, dialogue_id, room_id, operation_flags, recording_properties,
asr_properties, metric_properties, tags, tag_properties, dialogue_properties, lastMsgEndTime, nextMsgStartTime, is_cut_by_msg_time, is_fit_specific_event, pre_is_fit_specific_event,
fit_specific_row, start_time, end_time, fit_specific_rows, is_fit_specific_flag, pre_is_fit_specific_flag, is_cut_by_specific, is_cut_flag,
$42, -> cut_point_type
$43,
$44,
w0$o0,
w0$o1
]
) ->
Calc(
select=[date_id, tenant_id, brand_id, channel, channel_app_id, channel_session_type, msg_id, msg_start_time, msg_end_time, msg_from_id, msg_from_orig_id, msg_from_nk,
msg_from_role, msg_to_ids, msg_to_users, msg_type, msg_content, msg_detail, group_chat_info, room_id, operation_flags, recording_properties, asr_properties, metric_properties, tags, tag_properties,
dialogue_properties,
CONCAT_WS(_UTF-16LE'-', $43, room_id, date_id, CAST(CASE((w0$o0 > 0:BIGINT), w0$o1, null:INTEGER))) AS dialogue_id1,
$44 AS cutPointType
]
)
)
######################
A first round of dialogue_id1 is generated from the time rule plus the specific-event rule.
The feature cut-point matching has been pushed down to run earlier, in this stage.
Bugs & optimization points:
1. The smoothing step does a GROUP BY and then joins the result back onto the main table, which forks the dataflow and makes part of the computation run more than once. This part can be done with an OVER aggregation instead (see the sketch below this list):
gen_fit_sprcific_flag_view -> gen_cut_flag_view -> gen_dialogue_id_by_cut_flag_view -> gen_cut_type_by_feature_view -> keep_cutpoint_view
2. CASE WHEN pre_is_fit_specific_flag IS NULL THEN is_fit_specific_flag
WHEN pre_is_fit_specific_flag <> is_fit_specific_flag THEN 1
WHEN pre_is_fit_specific_flag = is_fit_specific_flag THEN 0
ELSE 0
END AS is_cut_by_specific
The logic here is wrong and causes GenIsFitConsecutiveRowsAndTime to be evaluated repeatedly.
3. IF(is_cut_flag = 1, 'start', CAST(NULL AS STRING)) AS cut_point_type: check whether this expression is still necessary.
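A minimal, generic sketch of the rewrite direction from point 1, using a hypothetical aggregate my_agg over a hypothetical detail table (not the actual UDAFs of this job):

-- Before: aggregate per key, then join the aggregate back onto the detail rows (forks the dataflow)
SELECT d.*, a.agg_val
FROM detail d
LEFT JOIN (
    SELECT k, my_agg(v) AS agg_val
    FROM detail
    GROUP BY k
) a ON d.k = a.k;

-- After: compute the same aggregate as an OVER window and keep the detail rows in a single pass
SELECT d.*, my_agg(v) OVER (PARTITION BY k) AS agg_val
FROM detail d;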
======================
Sort(
orderBy=[dialogue_id1 ASC]
) ->
SortAggregate(
isMerge=[false],
groupBy=[dialogue_id1],
select=[dialogue_id1,
smoothCutPoint(channel_session_type, tenant_id, dialogue_id1, msg_id, $f4, tags, cutPointType) AS smoothRes
]
) ->
Calc(
select=[dialogue_id1, smoothRes.smoothResultVoMap AS smoothResultVoMap]
) -> (
Correlate(
invocation=[GetCutPointBySplit($cor7.smoothResultVoMap)],
correlate=[table(GetCutPointBySplit($cor7.smoothResultVoMap))],
select=[dialogue_id1,smoothResultVoMap,msgId,cutPointMap],
rowType=[
RecordType(
VARCHAR(2147483647) dialogue_id1,
(VARCHAR(2147483647), (VARCHAR(2147483647), VARCHAR(2147483647)) MAP) MAP smoothResultVoMap,
VARCHAR(2147483647) msgId,
(VARCHAR(2147483647), VARCHAR(2147483647)) MAP cutPointMap
)
], joinType=[INNER]
) ->
Calc(
select=[dialogue_id1, msgId, ITEM(cutPointMap, _UTF-16LE'isKeep') AS isKeep]
),
Correlate(
invocation=[GetCutPointBySplit($cor9.smoothResultVoMap)],
correlate=[table(GetCutPointBySplit($cor9.smoothResultVoMap))],
select=[dialogue_id1,smoothResultVoMap,msgId,cutPointMap],
rowType=[
RecordType(
VARCHAR(2147483647) dialogue_id1,
(VARCHAR(2147483647), (VARCHAR(2147483647), VARCHAR(2147483647)) MAP) MAP smoothResultVoMap,
VARCHAR(2147483647) msgId,
(VARCHAR(2147483647), VARCHAR(2147483647)) MAP cutPointMap
)
], joinType=[INNER]
) ->
Calc(
select=[dialogue_id1, msgId, ITEM(cutPointMap, _UTF-16LE'isKeep') AS isKeep]
),
Correlate(
invocation=[GetCutPointBySplit($cor8.smoothResultVoMap)],
correlate=[table(GetCutPointBySplit($cor8.smoothResultVoMap))],
select=[dialogue_id1,smoothResultVoMap,msgId,cutPointMap],
rowType=[
RecordType(
VARCHAR(2147483647) dialogue_id1,
(VARCHAR(2147483647), (VARCHAR(2147483647), VARCHAR(2147483647)) MAP) MAP smoothResultVoMap,
VARCHAR(2147483647) msgId,
(VARCHAR(2147483647), VARCHAR(2147483647)) MAP cutPointMap
)
], joinType=[INNER]
) ->
Calc(
select=[dialogue_id1, msgId, ITEM(cutPointMap, _UTF-16LE'isKeep') AS isKeep]
)
)
######################
Smoothing logic computation.
Optimization point:
1. smoothCutPoint has a serious performance problem; rewrite it as an OVER-aggregation-based UDAF and optimize away the GROUP BY + LATERAL TABLE + JOIN.
The corresponding original SQL:
-- Mark feature cut points on the data according to the feature configuration
CREATE TEMPORARY VIEW gen_cut_type_by_feature_view AS
SELECT *,
GenCutPointTypeByFeature(channel_session_type, tenant_id, tags) AS cutPointType
FROM gen_dialogue_id_by_cut_flag_view;
CREATE TEMPORARY VIEW keep_cutpoint_view AS
SELECT dialogue_id1, smoothRes.smoothResultVoMap
FROM (
SELECT dialogue_id1,
smoothCutPoint(channel_session_type, tenant_id, dialogue_id1, msg_id, msg_start_time, tags, cutPointType) AS smoothRes
FROM gen_cut_type_by_feature_view
GROUP BY dialogue_id1
);
CREATE TEMPORARY VIEW keep_cutpoint_breakup AS
SELECT dialogue_id1, smoothResultVoMap, msgId, cutPointMap, cutPointMap['isKeep'] AS isKeep
FROM keep_cutpoint_view, LATERAL TABLE(GetCutPointBySplit(smoothResultVoMap)) AS T(msgId, cutPointMap);
CREATE TEMPORARY VIEW keep_cutpoint_join AS
SELECT t1.*,t2.isKeep, IF(t2.isKeep = '0', 'no', t1.cutPointType) AS curCutPointType, msg_start_time
FROM gen_cut_type_by_feature_view t1
LEFT JOIN keep_cutpoint_breakup t2 ON t1.dialogue_id1 = t2.dialogue_id1 AND t1.msg_id = t2.msgId;
CREATE TEMPORARY VIEW gen_dialogue_id_by_feature_view0 AS
SELECT
date_id,
tenant_id,
brand_id,
channel,
channel_app_id,
channel_session_type,
msg_id,
msg_start_time,
msg_end_time,
msg_from_id,
msg_from_nk,
msg_from_orig_id,
msg_from_role,
msg_to_ids,
msg_to_users,
msg_type,
msg_content,
msg_detail,
group_chat_info,
room_id,
operation_flags,
recording_properties,
asr_properties,
metric_properties,
tags,
tag_properties,
dialogue_properties,
dialogue_id1,
cutPointType,
curCutPointType,
preCutPointType,
isKeep,
CONCAT_WS(
'-',
CAST(tenant_id AS STRING),
room_id,
date_id,
CAST(
SUM(IF(preCutPointType IS NULL OR preCutPointType = 'end' OR curCutPointType = 'start', 1, 0)) OVER (PARTITION BY tenant_id, room_id, date_id ORDER BY msg_start_time)
AS STRING)
) AS dialogue_id
FROM (
SELECT
date_id,
tenant_id,
brand_id,
channel,
channel_app_id,
channel_session_type,
msg_id,
msg_start_time,
msg_end_time,
msg_from_id,
msg_from_nk,
msg_from_orig_id,
msg_from_role,
msg_to_ids,
msg_to_users,
msg_type,
msg_content,
msg_detail,
group_chat_info,
room_id,
operation_flags,
recording_properties,
asr_properties,
metric_properties,
tags,
tag_properties,
dialogue_properties,
dialogue_id1,
cutPointType,
curCutPointType,
isKeep,
LAG(curCutPointType) OVER ( PARTITION BY dialogue_id1 ORDER BY msg_start_time) AS preCutPointType
FROM keep_cutpoint_join
);
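For reference, the dialogue_id built above uses the classic pattern of turning per-row boundary flags into group IDs with a running SUM over an ordered window. A stripped-down sketch with a hypothetical table msgs and a hypothetical 0/1 boundary flag is_boundary (the real query folds the flag into the IF/CASE expressions shown above):

SELECT msg_id,
       -- rows between two boundaries share the same running sum, i.e. the same dialogue
       SUM(is_boundary) OVER (
           PARTITION BY tenant_id, room_id
           ORDER BY msg_start_time
       ) AS dialogue_seq
FROM msgs;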
The previous SQL implemented this segment-smoothing logic by first grouping the data by dialogue_id1, computing the aggregate with a UDAF, and then joining the aggregated result back to the detail rows via msg_id. That pattern forks the dataflow: it not only performs poorly, it also re-executes upstream computation nodes and drives the runtime up. The relevance aggregation later in the job follows much the same pattern. With that analysis the problem is clear: the "join the aggregate back to the main table" pattern has to be replaced with something more efficient, namely a UDAF computed over an OVER window, which optimizes away the GROUP BY + LATERAL TABLE + JOIN.
The optimized SQL:
-- Mark feature cut points on the data according to the feature configuration
CREATE TEMPORARY VIEW gen_cut_type_by_feature_view AS
SELECT *,
GenCutPointTypeByFeature(channel_session_type, tenant_id, tags) AS cutPointType
FROM gen_dialogue_id_by_cut_flag_view;
CREATE TEMPORARY VIEW keep_cutpoint_view AS
SELECT *,
smooth_result[msg_id]['is_keep'] AS isKeep,
CASE WHEN smooth_result[msg_id]['is_keep'] = '0' THEN 'no' ELSE cutPointType END AS curCutPointType
FROM(
SELECT *,
smoothCutPoint(channel_session_type, tenant_id, dialogue_id1, msg_id, msg_start_time, tags, cutPointType) OVER ( PARTITION BY dialogue_id1) AS smooth_result
FROM gen_cut_type_by_feature_view
);
CREATE TEMPORARY VIEW gen_dialogue_id_by_feature_view0 AS
SELECT
date_id,
tenant_id,
brand_id,
channel,
channel_app_id,
channel_session_type,
msg_id,
msg_start_time,
msg_end_time,
msg_from_id,
msg_from_nk,
msg_from_orig_id,
msg_from_role,
msg_to_ids,
msg_to_users,
msg_type,
msg_content,
msg_detail,
group_chat_info,
room_id,
operation_flags,
recording_properties,
asr_properties,
metric_properties,
tags,
tag_properties,
dialogue_properties,
CONCAT_WS(
'-',
CAST(tenant_id AS STRING),
room_id,
date_id,
CAST(
SUM( CASE WHEN dialogue_id1 <> preDialogueId OR preCutPointType IS NULL OR preCutPointType = 'end' OR curCutPointType = 'start' THEN 1 ELSE 0 END) OVER (PARTITION BY tenant_id, room_id, date_id ORDER BY msg_start_time)
AS STRING)
) AS dialogue_id
FROM (
SELECT
date_id,
tenant_id,
brand_id,
channel,
channel_app_id,
channel_session_type,
msg_id,
msg_start_time,
msg_end_time,
msg_from_id,
msg_from_nk,
msg_from_orig_id,
msg_from_role,
msg_to_ids,
msg_to_users,
msg_type,
msg_content,
msg_detail,
group_chat_info,
room_id,
operation_flags,
recording_properties,
asr_properties,
metric_properties,
tags,
tag_properties,
dialogue_properties,
dialogue_id1,
cutPointType,
curCutPointType,
LAG(dialogue_id1) OVER ( PARTITION BY tenant_id, room_id, date_id ORDER BY msg_start_time) AS preDialogueId,
LAG(curCutPointType) OVER ( PARTITION BY tenant_id, room_id, date_id ORDER BY msg_start_time) AS preCutPointType
FROM keep_cutpoint_view
);
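For the OVER-window form to work, smoothCutPoint is assumed to be registered as an aggregate function (a Flink AggregateFunction) whose result is a map keyed by msg_id, which is why keep_cutpoint_view indexes smooth_result[msg_id]['is_keep']. A registration sketch with a hypothetical class name:

CREATE TEMPORARY FUNCTION smoothCutPoint
AS 'com.example.udaf.SmoothCutPointOverUdaf';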
The relevance computation was optimized with the same idea: switch to an OVER-aggregation-based UDAF and get rid of joining the aggregated result back to the original table.
Relevance SQL, before and after:
CREATE TEMPORARY VIEW dialogue_relevant_view AS
SELECT
`tenant_id`,
`brand_id`,
`channel`,
`channel_app_id`,
`channel_session_type`,
`date_id`,
dialogue_id as dialogue_id,
res.relevant_config_version as relevant_config_version ,
res.relevant_config as relevant_config ,
res.metrics as metrics ,
res.dialogue_relevant as dialogue_relevant
FROM (select dialogue_relevant_udaf(channel_session_type, tenant_id, msg_id, msg_start_time, msg_end_time, msg_from_role,tags) as res,
`tenant_id`,
`brand_id`,
`channel`,
`channel_app_id`,
`channel_session_type`,
`date_id`,`dialogue_id`
from gen_dialogue_id_by_feature_view
group by `tenant_id`,
`brand_id`,
`channel`,
`channel_app_id`,
`channel_session_type`,
`date_id`,`dialogue_id`);
CREATE TEMPORARY VIEW dialogue_view_all AS
select
NOW() as `__etl_time__`,
a.date_id,
a.tenant_id,
a.brand_id,
a.channel,
a.channel_app_id,
a.channel_session_type,
a.msg_id,
a.msg_start_time,
a.msg_end_time,
a.msg_from_id,
a.msg_from_nk,
a.msg_from_orig_id,
a.msg_from_role,
a.msg_to_ids,
a.msg_to_users,
a.msg_type,
a.msg_content,
a.msg_detail,
a.group_chat_info,
a.room_id,
a.operation_flags,
a.recording_properties,
a.asr_properties,
a.metric_properties,
a.tags,
a.tag_properties,
map_put(map_put(a.dialogue_properties , 'dialogue_relevant' , b.dialogue_relevant),'relevant_config',b.relevant_config) as dialogue_properties,
a.dialogue_id1,
a.cutPointType,
a.curCutPointType,
a.preCutPointType,
a.isKeep,
a.dialogue_id
from gen_dialogue_id_by_feature_view a
left join dialogue_relevant_view b
on a.tenant_id = b.tenant_id and
a.brand_id = b.brand_id and
a.channel = b.channel and
a.channel_app_id = b.channel_app_id and
a.channel_session_type = b.channel_session_type and
a.dialogue_id = b.dialogue_id;
#####################################
CREATE TEMPORARY VIEW dialogue_view AS
select
date_id,
tenant_id,
brand_id,
channel,
channel_app_id,
channel_session_type,
msg_id,
msg_start_time,
msg_end_time,
msg_from_id,
msg_from_nk,
msg_from_orig_id,
msg_from_role,
msg_to_ids,
msg_to_users,
msg_type,
msg_content,
msg_detail,
group_chat_info,
room_id,
operation_flags,
recording_properties,
asr_properties,
metric_properties,
tags,
tag_properties,
dialogue_properties,
dialogue_id,
dialogue_relevant_udaf(channel_session_type, tenant_id, msg_id, msg_start_time, msg_end_time, msg_from_role,tags) OVER (PARTITION BY `tenant_id`,
`brand_id`,
`channel`,
`channel_app_id`,
`channel_session_type`,
`date_id`,`dialogue_id`) AS res
from gen_dialogue_id_by_feature_view ;
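Downstream views can then read the relevance fields straight out of res instead of joining the aggregate back; a sketch reusing the field names from the original dialogue_relevant_view (the exact downstream column list may differ):

SELECT msg_id,
       dialogue_id,
       map_put(
           map_put(dialogue_properties, 'dialogue_relevant', res.dialogue_relevant),
           'relevant_config', res.relevant_config
       ) AS dialogue_properties
FROM dialogue_view;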
3. Results
The optimized execution plan is much cleaner and execution is noticeably faster: the node's runtime dropped from nearly 40 minutes to about 7 minutes, a huge improvement.