原理
触发器监控工作流实例表,当工作流实例表中的状态更新后,针对状态为失败的任务进行企业微信告警。
发送企业微信消息函数
# 必须在pg的主机上线安装requests模块
pip install requests
# 以postgres用户登陆psql客户端到etl数据库
psql etl -U postgres
# 创建插件plpython3u
create extension plpython3u;
# plpython3u为不受信语言,所以只能被超级用户使用
# 在tool模式下建立发送企业微信消息函数tool.sp_send_wechat
CREATE OR REPLACE FUNCTION tool.sp_send_wechat(message json, webhook character varying DEFAULT 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=你自己的key'::character varying)
RETURNS text
LANGUAGE plpython3u
SECURITY DEFINER
AS $function$
import requests
import json
"""
/*
* 作者 : v-yuzhenc
* 功能 : 给企业微信发送一条消息
* message : 需要发送的消息,json格式
* webhook : 企业微信机器人的webhook
* */
"""
import requests
import json
# 企业微信自定义机器人的webhook地址
p_webhook = webhook
# 要发送的消息内容
p_message = json.loads(message)
# 发送POST请求
response = requests.post(p_webhook, data=json.dumps(p_message), headers={"Content-Type": "application/json"})
# 打印响应结果
return response.text
$function$
;
--将函数直接转给tool
ALTER FUNCTION tool.sp_send_wechat(json, varchar) OWNER TO tool;
--公开函数的执行权限
GRANT ALL ON FUNCTION tool.sp_send_wechat(json, varchar) TO public;
--将函数的执行权限授权给tool用户
GRANT ALL ON FUNCTION tool.sp_send_wechat(json, varchar) TO tool;
\q
企业微信告警触发器
- 由于企业微信markdown格式的消息艾特指定的人只能通过企业微信中的userid(即用户在企业微信中的账号)调用,所以,我们在海豚调度的元数据表t_ds_user中增加wechat_userid字段,人工将海豚的用户对应的企业微信的userid维护上去
# 以dp用户登录etl数据库
psql etl -U dp
# 增加字段
alter table t_ds_user add wechat_userid varchar(100);
comment on column t_ds_user.wechat_userid is '对应的企业微信的userid';
# 维护wechat_userid中的数据
# 这里根据自己的企业实际情况做
update t_ds_user
set wechat_userid = 'YuZhenChao'
where user_name = 'yuzhenchao'
;
# 创建触发器函数dp.tg_ds_udef_alert_wechat
CREATE OR REPLACE FUNCTION dp.tg_ds_udef_alert_wechat()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
/*
* 作者:v-yuzhenc
* 功能:海豚调度工作流失败自动告警
* */
declare
i record;
v_mobile varchar;
v_content text;
v_message varchar;
begin
if new.state in (4,5,6) then
for i in (
select
'<@'||d.wechat_userid||'>\r\n# [DolphinScheduler Job ]\r\n> 实例 id : ['||a.id::varchar||'/'||b.id||'](https://dolphin.tclpv.com/dolphinscheduler/ui/projects/'||g.code||'/workflow/instances/'||a.id||'?code='||a.process_definition_code||')\r\n> 项目名称 : <font color=\"comment\">'||g.name||'('||g.code||')</font>'||'\r\n> 工作流名 : <font color=\"comment\">'||e.name||'('||a.process_definition_code||')</font>'||'\r\n> 任务名称 : <font color=\"comment\">'||b.name||'('||b.task_code||')</font>'||'\r\n> 任务类型 : <font color=\"comment\">'||b.task_type||'</font>\r\n> 开始时间 : <font color=\"comment\">'||to_char(b.start_time,'yyyy-mm-dd hh24:mi:ss')||'</font>\r\n> 结束时间 : <font color=\"comment\">'||to_char(b.end_time,'yyyy-mm-dd hh24:mi:ss')||'</font>\r\n> 任务状态 : <font color=\"warning\">执行失败</font>'||'\r\n> 所属用户 : <font color=\"comment\">'||d.user_name||'('||c.user_id||')</font>' as wechat_content
,d.phone
from t_ds_process_instance a
inner join t_ds_task_instance b
on (a.id = b.process_instance_id)
inner join t_ds_task_definition c
on (b.task_code = c.code and b.task_definition_version = c."version")
inner join t_ds_user d
on (c.user_id = d.id)
inner join t_ds_process_definition e
on (a.process_definition_code = e.code and a.process_definition_version = e."version")
inner join t_ds_project g
on (e.project_code = g.code)
where c.task_type <> 'SUB_PROCESS'
and a.state = 6
and b.state = 6
and a.id = new.id
) loop
v_mobile := i.phone;
v_content := i.wechat_content;
v_message := $v_message${
"msgtype":"markdown",
"markdown": {
"content":"$v_message$||v_content||$v_message$"
}
}$v_message$;
--告警
perform tool.sp_send_wechat(v_message::json);
end loop;
end if;
return new;
end;
$function$
;
--授权给dp
ALTER FUNCTION dp.tg_ds_udef_alert_wechat() OWNER TO dp;
GRANT ALL ON FUNCTION dp.tg_ds_udef_alert_wechat() TO dp;
# 创建时候触发器
create trigger tg_state_ds_process_instance after update on dp.t_ds_process_instance for each row execute function dp.tg_ds_udef_alert_wechat();
\q
测试
- 新建一个工作流,选择SQL组件
-
保存工作流
-
上线工作流并运行工作流
-
工作流运行失败
-
随即企业微信来了消息提醒