RisingWave最佳实践-利用Dynamic filters 和 Temporal filters 实现监控告警

news2025/1/10 23:58:44

心得的体会

刚过了年刚开工,闲暇之余调研了分布式SQL流处理数据库–RisingWave,本人是Flink(包括FlinkSQL和Flink DataStream API)的资深用户,但接触到RisingWave令我眼前一亮,并且拿我们生产上的监控告警场景在RisingWave上做了验证,以下是自己的心得体会:

RisingWave架构简单,运维成本底,基于云原生(可以分别基于计算和存储动态伸缩),同时在开发上屏蔽了Flink等实时处理框架底层需要处理的一些技术细节(状态存储,数据一致性,分布式集群扩展等)。提供了与PostgreSQL兼容的标准SQL接口,用户可以像使用 PostgreSQL 一样处理数据流。并且RisingWave不单单可以处理流式数据,还提供了数据其他流式处理框架(如:Flink、storm)所不具备的数据存储能力,基本可以完全取代FlinkSQL。相对于其他OLAP系统(如:apache doris,starrocks),RisingWave采用同步实时,可以保证实时的新鲜度;强一致性,而不是最终一致性。用户需要做的仅仅是通过开发SQL就可以处理流数据,当然首先需要具备流式数据处理思维(相对于离线)。

RisingWave当然也有自身的不足,相对于Flink可以通过DataStream API自定义灵活的处理流式数据,RisingWave只能解决一些特定的流式场景,无法做太多定制开发;相对于Apache Doris等OLAP实时分析性数据库,RisingWave不适合做分析型随机查询。另外RisingWave是个新事物,正在发展阶段,周边生态和相关文档还不健全,作为尝鲜者可能会踩很多坑。然而令人欣慰的是RisingWave的社区回复还是很及时的,RisingWave官方投入了很多精力在做RisingWave的布道和答疑。

至于争论比较厉害的RisingWav VS FLink的性能和吞吐量上孰优孰劣,针对不同应用场景可能有不同表现,因此没有亲自调研就没有发言权。但我认为在不同的场景下他们应该有各自的优势。无论如何RisingWave部署简单,上手容易,试错成本低是一个不争的事实。RisingWave可以应用在一些数据看版,监控,实时指标等场景。

利用动态和时间过滤器实现监控告警

FlinkSQL解决不了定时触发的问题,FlinkSQL的流处理逻辑只是按event触发,不能按时间条件触发,也就是没有触发器机制。FlinkSQL窗口的定时触发,归根结底也是基于event触发,event驱动的机制。因此需要触发器的场景就需要用到Flink DataStream API的KeyedProcessFunction等算子。但RisingWave利用Dynamic filters 和 Temporal filters 可以间接实现类似场景的触发器机制。

场景描述

现有如下群消息实时指标监控场景:
数据有初始化(init)、查询(query)、回调(callback:succ+fail)三种先后顺序状态。
数据是按预设时间批次分组的,例如:2024-01-01 08:00:00、2024-01-01 08:30:00,实时统计每一个批次内三种不同状态的数据count。

监控指标一:在某一个批次延迟指定的时间(query_timeout)之内(例如:2024-01-01 08:00:00延迟1小时触发时间为系统时间2024-01-01 09:00:00),该批次的query状态数据count没有达到init状态的数量count阀值(即query_count<init_count*query_threshold)就触发告警。
同时结束该批次数据统计,下发该批次数据的指标包括:批次时间、init_count、query_count等

监控指标二:如果指标一告警没有被触发,该批次在满足query状态数据count达到init状态数量count的阀值(即query_count>=init_countquery_threshold)以后,在指定的延迟时间内(callback_timeout),该批次的callback状态数据count没有达到query状态的数量count阀值(即callback_count<query_countcallback_threshold)就触发告警。
同时结束该批次数据统计,下发该批次数据的指标包括:批次时间、init_count、query_count、callback_count等

群消息实时指标监控流程图如下:
群消息实时指标监控流程图

实例demo

RisingWave部署可以参考:RisingWave分布式SQL流处理数据库调研

假设:
query_threshold=1, callback_count=1
query_timeout= ‘5 minute’, callback_timeout= ‘1 minute’
0->init,1->query,2->callback

RisingWave SQL:

 
DROP TABLE t_msg;
CREATE TABLE t_msg(
     msg_id int
    ,status smallint 
    ,public_time timestamp
    ,process_time timestamp as proctime()
) APPEND ONLY;

set timezone = 'PRC';--PRC(People’s Republic of China)
show timezone;

select * from t_msg;

--统计不同状态的count
DROP MATERIALIZED VIEW mv_t_msg_groupby; 
CREATE MATERIALIZED VIEW mv_t_msg_groupby AS
SELECT 
 public_time
,sum(case when status = 0  then 1 else 0 end) AS init_count
,sum(case when status = 1  then 1 else 0 end) AS query_count
,sum(case when status = 2  then 1 else 0 end) AS callback_count
,max(process_time) as process_time
FROM t_msg 
group by public_time;

select * from mv_t_msg_groupby;


--sink_query_alarm 
DROP SINK sink_query_alarm;
CREATE SINK sink_query_alarm AS 
SELECT 
 public_time
,init_count
,query_count
,process_time
FROM mv_t_msg_groupby
where public_time + INTERVAL '5 minute' <= now()   --query_timeout=1 minute, public_time相当于当前时间延迟1分钟触发,【Delay table changes】
and   init_count*1 > query_count --query_threshold=1
WITH (
   connector='kafka',
   properties.bootstrap.server='192.168.1.100:8092',
   topic='t_sink_query_alarm'
)
FORMAT PLAIN ENCODE JSON(
   force_append_only='true'
);


--由于RisingWave不支持在【MATERIALIZED VIEW】和【SINK】等【可伸缩流】中指定处理时间字段,因此需要借助外部存储kafka周转
--RisngWave官方给的解释:support a proctime on an append only stream might be easier but on retractable stream could take extra cost. We must think it carefully to introduce such a feature.

--sink_query_succ 
DROP SINK sink_query_succ;
CREATE SINK sink_query_succ AS 
SELECT 
 public_time
,init_count
,query_count
,callback_count
,process_time as query_succ_process_time
FROM mv_t_msg_groupby
where public_time + INTERVAL '5 minute' >= now()   --query_timeout=1 minute, 在指定的时间内,【Delete and clean expired data】
and   init_count*1 <= query_count --query_threshold=1,query_count达到了指定值
WITH (
   connector='kafka',
   properties.bootstrap.server='192.168.1.100:8092',
   topic='t_sink_query_succ'
)
FORMAT PLAIN ENCODE JSON(
   force_append_only='true'
);

--source连接器
DROP SOURCE source_query_succ;
CREATE SOURCE IF NOT EXISTS source_query_succ (
     init_count int
    ,query_count int 
    ,callback_count int 
    ,public_time timestamp
    ,query_succ_process_time timestamp
)
WITH (
   connector='kafka',
   topic='t_sink_query_succ',
   properties.bootstrap.server='192.168.1.100:8092',
   scan.startup.mode='earliest', -- earliest ,latest,default:earliest 
) FORMAT PLAIN ENCODE JSON;

select * from source_query_succ;

--sink_callback_alarm,用到动态过滤器和时间过滤器
DROP SINK sink_callback_alarm;
CREATE SINK sink_callback_alarm AS 
WITH tmp AS ( 
select public_time, min(query_succ_process_time) as query_succ_process_time  -- 动态过滤器
FROM source_query_succ 
group by public_time
)
SELECT 
 b.public_time
,b.init_count
,b.query_count
,b.callback_count
,b.process_time
,a.query_succ_process_time
FROM  tmp a
JOIN  mv_t_msg_groupby b ON a.public_time=b.public_time
where a.query_succ_process_time + INTERVAL '1 minute' <= now()   --query_timeout=1 minute, public_time相当于当前时间延迟1分钟触发,【Delay table changes】
and   b.query_count*1 > b.callback_count --callback_threshold=1
WITH (
   connector='kafka',
   properties.bootstrap.server='192.168.1.100:8092',
   topic='t_sink_callback_alarm'
)
FORMAT PLAIN ENCODE JSON(
   force_append_only='true'
);

-- 模拟数据
--init
INSERT INTO t_msg values(1,0,'2024-02-23 15:55:00'::TIMESTAMP); --比当前系统时间早
INSERT INTO t_msg values(2,0,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,0,'2024-02-23 15:55:00'::TIMESTAMP);
--query                                
INSERT INTO t_msg values(1,1,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(2,1,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,1,'2024-02-23 15:55:00'::TIMESTAMP);
--callback                             
INSERT INTO t_msg values(1,2,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(2,2,'2024-02-23 15:55:00'::TIMESTAMP);
INSERT INTO t_msg values(3,2,'2024-02-23 15:55:00'::TIMESTAMP);

查看监控结果:

docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink_query_alarm -C 
docker run -it --rm edenhill/kcat:1.7.1 kcat -b 192.168.1.100:8092 -t t_sink_callback_alarm -C 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1466973.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

备战蓝桥杯————双指针技巧巧解数组1

利用双指针技巧来解决七道与数组相关的题目。 两数之和 II - 输入有序数组&#xff1a; 给定一个按升序排列的数组&#xff0c;找到两个数使它们的和等于目标值。可以使用双指针技巧&#xff0c;在数组两端设置左右指针&#xff0c;根据两数之和与目标值的大小关系移动指针。 …

动态规划--持续更新篇

将数字变成0的操作次数 1.题目 2.思路 在numberOfSteps函数中&#xff0c;首先设置f[0]为0&#xff0c;因为0已经是0了&#xff0c;不需要任何步骤。然后&#xff0c;使用一个for循环从1迭代到输入的整数num。对于每个整数i&#xff0c;如果i是奇数&#xff0c;则将f[i]设置为…

uniapp离线打包(使用Android studio打包)

一、准备工作 安装HbuilderX&#xff0c;记住版本号下载对应HbuilderX版本的Android离线SDK&#xff0c;如我使用3.6.18版本打包&#xff0c;则对应应下载3.6.18版本的SDK&#xff08;官网不提供旧版本的SDK&#xff0c;有些需要自己找&#xff09;官网下载地址&#xff1a;ht…

目标检测-Transformer-ViT和DETR

文章目录 前言一、ViT应用和结论结构及创新点 二、DETR应用和结论结构及创新点 总结 前言 随着Transformer爆火以来&#xff0c;NLP领域迎来了大模型时代&#xff0c;成为AI目前最先进和火爆的领域&#xff0c;介于Transformer的先进性&#xff0c;基于Transformer架构的CV模型…

QGIS编译(跨平台编译)之七十:【Windows编译错误处理】找不到vector_tile.pb.h、vector_tile.pb.cc

文章目录 一、错误描述二、错误原因分析三、错误处理一、错误描述 ①无法打开源文件“vector_tile.pb.h” ②无法打开包含文件:“vector_tile.pb.h”:No Such file or directory ③无法打开源文件:“vector_tile.pb.cc”:No Such file or directory 二、错误原因分析 qgis\…

MFC 配置Halcon

1.新建一个MFC 工程&#xff0c;Halcon 为64位&#xff0c;所以先将工程改为x64 > VC 目录设置包含目录和库目录 包含目录 库目录 c/c ->常规 链接器 ->常规 > 链接器输入 在窗口中添加头文件 #include "HalconCpp.h" #include "Halcon.h"…

Oracle迁移到mysql-表结构的坑

1.mysql中id自增字段必须是整数类型 id BIGINT AUTO_INCREMENT not null, 2.VARCHAR2改为VARCHAR 3.NUMBER(16)改为decimal(16,0) 4.date改为datetime 5.mysql范围分区必须int格式&#xff0c;不能list类型 ERROR 1697 (HY000): VALUES value for partition …

应急响应实战笔记03权限维持篇(3)

0x00 前言 攻击者在获取服务器权限后&#xff0c;会通过一些技巧来隐藏自己的踪迹和后门文件&#xff0c;本文介绍Linux下的几种隐藏技术。 0x01 隐藏文件 Linux 下创建一个隐藏文件&#xff1a;touch .test.txt touch 命令可以创建一个文件&#xff0c;文件名前面加一个 点…

使用pygame 编写俄罗斯方块游戏

项目地址&#xff1a;https://gitee.com/wyu_001/mypygame/tree/master/game 可执行程序 这个游戏主要使用pygame库编写俄罗斯方块游戏&#xff0c;demo主要演示了pygame开发游戏的主要设计方法和实现代码 下面是游戏界面截图 游戏主界面&#xff1a; 直接上代码&#xff1a…

RabbitMQ 部署方式选择

部署模式 RabbitMQ支持多种部署模式&#xff0c;可以根据应用的需求和规模选择适合的模式。以下是一些常见的RabbitMQ部署模式&#xff1a; 单节点模式&#xff1a; 最简单的部署方式&#xff0c;所有的RabbitMQ组件&#xff08;消息存储、交换机、队列等&#xff09;都运行在…

Redis可视化工具——RedisInsight

文章目录 1. 下载2. 安装3. RedisInsight 添加 Redis 数据库4. RedisInsight 使用 RedisInsight 是 Redis 官方出品的可视化管理工具&#xff0c;支持 String、Hash、Set、List、JSON 等多种数据类型的管理&#xff0c;同时集成了 RedisCli&#xff0c;可进行终端交互。 1. 下载…

数组与指针相关

二级指针与指针数组 #include <stdio.h> #include <stdlib.h> int main() { // 定义一个指针数组&#xff0c;每个元素都是一个指向int的指针 int *ptr_array[3]; // 为指针数组的每个元素分配内存 ptr_array[0] malloc(2*sizeof(int)); ptr_array[1] m…

转运机器人,AGV底盘小车:打造高效、精准的汽车电子生产线

为了满足日益增长的市场需求&#xff0c;保持行业领先地位&#xff0c;某汽车行业电子产品企业引入富唯智能AMR智能搬运机器人及其智能物流解决方案&#xff0c;采用自动化运输措施优化生产节拍和搬运效率&#xff0c;企业生产效率得到显著提升。 项目背景&#xff1a; 1、工厂…

PyTorch概述(二)---MNIST

NIST Special Database3 具体指的是一个更大的特殊数据库3&#xff1b;该数据库的内容为手写数字黑白图片&#xff1b;该数据库由美国人口普查局的雇员手写 NIST Special Database1 特殊数据库1&#xff1b;该数据库的内容为手写数字黑白图片&#xff1b;该数据库的图片由高…

GitCode配置ssh

下载SSH windows设置里选“应用” 选“可选功能” 添加功能 安装这个 坐等安装&#xff0c;安装好后可以关闭设置。 运行 打开cmd 执行如下指令&#xff0c;启动SSH服务。 net start sshd设置开机自启动 把OpenSSH服务添加到Windows自启动服务中&#xff0c;可避免每…

mysql的日志文件在哪?

阅读本文之前请参阅----MySQL 数据库安装教程详解&#xff08;linux系统和windows系统&#xff09; MySQL的日志文件通常包括错误日志、查询日志、慢查询日志和二进制日志等。这些日志文件的位置取决于MySQL的安装和配置。以下是一些常见的日志文件位置和如何找到它们&#xff…

【kubernetes】二进制部署k8s集群之,多master节点负载均衡以及高可用(下)

↑↑↑↑接上一篇继续部署↑↑↑↑ 之前已经完成了单master节点的部署&#xff0c;现在需要完成多master节点以及实现k8s集群的高可用 一、完成master02节点的初始化操作 二、在master01节点基础上&#xff0c;完成master02节点部署 步骤一&#xff1a;准备好master节点所需…

调用 Python 函数遗漏括号 ( )

调用 Python 函数遗漏括号 1. Example - error2. Example - correctionReferences 1. Example - error name "Forever Strong" print(name.upper()) print(name.lower)FOREVER STRONG <built-in method lower of str object at 0x0000000002310670>---------…

【ArcGIS】利用高程进行坡度分析:区域面/河道坡度

在ArcGIS中利用高程进行坡度分析 坡度ArcGIS实操案例1&#xff1a;流域面上坡度计算案例2&#xff1a;河道坡度计算2.1 案例数据2.2 操作步骤 参考 坡度 坡度是地表单元陡缓的程度&#xff0c;通常把坡面的垂直高度和水平距离的比值称为坡度。 坡度的表示方法有百分比法、度数…

单片机04__基本定时器__毫秒微秒延时

基本定时器__毫秒微秒延时 基本定时器介绍&#xff08;STM32F40x&#xff09; STM32F40X芯片一共包含14个定时器&#xff0c;这14个定时器分为3大类&#xff1a; 通用定时器 10个 TIM9-TIM1和TIM2-TIM5 具有基本定时器功能&#xff0c; 还具有输入捕获&#xff0c;输出比较功…