您现在的位置是:首页 > 文章详情

实操教程 | 触发器实现 Apache DolphinScheduler 失败钉钉自动告警

日期:2023-08-28点击:81

file

作者 | sqlboy-yuzhenc

背景介绍

在实际应用中,我们经常需要将特定的任务通知给特定的人,虽然 Apache DolphinScheduler 在安全中心提供了告警组和告警实例,但是配置起来相对复杂,并且还需要在定时调度时指定告警组。通过这篇文章,你将学到一个简单的方法,无需任何配置,只需要在用户表(t_ds_user)表中增加字段钉钉名称(dignding_name),创建用户时指定用户的手机号码和维护对应的钉钉名称,就能轻松实现 Apache DolphinScheduler 任务失败时钉钉告警到指定的人。

安装插件plpython3u

psql etl -U postgres create extension plpython3u 

pip安装requests

cd /opt && wget https://bootstrap.pypa.io/get-pip.py python get-pip.py pip install requests 

创建发送钉钉的存储过程

  • plpython3u为不受信语言,所以只能被超级用户使用
sql create or replace function tool.sp_send( message json ,webhook varchar ,secret varchar ) returns text language plpython3u security definer as $function$ import requests import json import time import hmac import hashlib import base64 import urllib.parse """ /* * 作者 : v-yuzhenc * 功能 : 给钉钉发送一条消息 * message : 需要发送的消息,json格式,详情参考https://open.dingtalk.com/document/robots/custom-robot-access * webhook : 钉钉机器人的webhook * secret : 钉钉机器人的secret * */ """ v_timestamp = str(round(time.time() * 1000)) p_secret = secret secret_enc = p_secret.encode('utf-8') string_to_sign = '{}\n{}'.format(v_timestamp, p_secret) string_to_sign_enc = string_to_sign.encode('utf-8') hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest() v_sign = urllib.parse.quote_plus(base64.b64encode(hmac_code)) # 钉钉自定义机器人的webhook地址 p_webhook = webhook webhook_url = p_webhook+"&timestamp="+v_timestamp+"&sign="+v_sign # 要发送的消息内容 p_message = json.loads(message) # 发送POST请求 response = requests.post(webhook_url, data=json.dumps(p_message), headers={"Content-Type": "application/json"}) # 打印响应结果 return response.text $function$; alter function tool.sp_send(json,varchar,varchar) owner to tool; grant execute on function tool.sp_send(json,varchar,varchar) to public; 

测试发送钉钉的存储过程

select sp_send('{ "msgtype": "actionCard", "actionCard": { "title": "我 20 年前想打造一间苹果咖啡厅,而它正是 Apple Store 的前身", "text": "![screenshot](https://oscimg.oschina.net/oscnet/up-56ac82da1c2d7b4c169542227e6feb202d6.png) \n\n #### 乔布斯 20 年前想打造的苹果咖啡厅 \n\n Apple Store 的设计正从原来满满的科技感走向生活化,而其生活化的走向其实可以追溯到 20 年前苹果一个建立咖啡馆的计划", "btnOrientation": "0", "btns": [ { "title": "内容不错", "actionURL": "https://www.dingtalk.com/" }, { "title": "不感兴趣", "actionURL": "https://www.dingtalk.com/" } ] } }'::json); 

file

参考

自定义机器人安全设置 - 钉钉开放平台

自定义机器人接入 - 钉钉开放平台

t_ds_user增加字段

alter table t_ds_user add column dingding_name varchar(100); --人为将海豚账号对应的钉钉用户名更新上去 

编写触发器

CREATE OR REPLACE FUNCTION dp.tg_ds_udef_alert_ding() RETURNS trigger LANGUAGE plpgsql AS $function$ /* * 作者:v-yuzhenc * 功能:海豚调度工作流失败自动告警 * */ declare i record; v_user varchar; v_mobile varchar; v_content text; v_message varchar; begin if new.state in (4,5,6) then for i in ( select d.user_name ,d.phone ,d.dingding_name ,g.name project_name ,e.name process_name ,string_agg(distinct b.name||' '||to_char(b.end_time,'yyyy-mm-dd hh24:mi:ss'),'\r\n') task_name from t_ds_process_instance a inner join t_ds_task_instance b on (a.id = b.process_instance_id) inner join t_ds_task_definition c on (b.task_code = c.code and b.task_definition_version = c."version") inner join t_ds_user d on (c.user_id = d.id) inner join t_ds_process_definition e on (a.process_definition_code = e.code and a.process_definition_version = e."version") inner join t_ds_project g on (e.project_code = g.code) where c.task_type <> 'SUB_PROCESS' and a.state = 6 and b.state = 6 and a.id = new.id group by d.user_name ,d.phone ,d.dingding_name ,g.name ,e.name ) loop v_mobile := i.phone; v_user := i.dingding_name; v_content := '海豚工作流执行失败,请尽快处理!\r\n项目名称:\r\n'||i.project_name||'\r\n工作流名称:\r\n'||i.process_name||'\r\n任务名称:\r\n'||i.task_name; v_message := $v_message${ "at": { "atMobiles":[ "$v_message$||v_mobile||$v_message$" ], "atUserIds":[ "$v_message$||v_user||$v_message$" ], "isAtAll": false }, "text": { "content":"$v_message$||v_content||$v_message$" }, "msgtype":"text" }$v_message$; --告警 perform tool.sp_send(v_message::json); end loop; end if; return new; end; $function$ ; create trigger tg_state_ds_process_instance after update on t_ds_process_instance for each row execute procedure tg_ds_udef_alert_ding(); 

测试

file

本文转载自CSDN博主sqlboy-yuzhenc文章:https://blog.csdn.net/qq_33445829/article/details/131073349

本文由 白鲸开源科技 提供发布支持!

原文链接:https://my.oschina.net/dailidong/blog/10104767
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章