使用 SPL 高效实现 Flink SLS Connector 下推

news2024/10/5 18:32:21

作者:潘伟龙(豁朗)

背景

日志服务 SLS 是云原生观测与分析平台,为 Log、Metric、Trace 等数据提供大规模、低成本、实时的平台化服务,基于日志服务的便捷的数据接入能力,可以将系统日志、业务日志等接入 SLS 进行存储、分析;阿里云 Flink 是阿里云基于 Apache Flink 构建的大数据分析平台,在实时数据分析、风控检测等场景应用广泛。阿里云 Flink 原生支持阿里云日志服务 SLS 的 Connector,可以在阿里云 Flink 平台将 SLS 作为源表或者结果表使用。

在阿里云 Flink 配置 SLS 作为源表时,默认会消费 SLS 的 Logstore 数据进行动态表的构建,在消费的过程中,可以指定起始时间点,消费的数据也是指定时间点以后的全量数据;在特定场景中,往往只需要对某类特征的日志或者日志的某些字段进行分析处理,此类需求可以通过 Flink SQL 的 WHERE 和 SELECT 完成,这样做有两个问题:

1)Connector 从源头拉取了过多不必要的数据行或者数据列造成了网络的开销;

2)这些不必要的数据需要在 Flink 中进行过滤投影计算,这些清洗工作并不是数据分析的关注的重点,造成了计算的浪费。

对于这种场景,有没有更好的办法呢?

答案是肯定的,SLS 推出了 SPL 语言, 可以高效的对日志数据的清洗,加工。 这种能力也集成在了日志消费场景,包括阿里云 Flink 中 SLS Connector,通过配置 SLS SPL 即可实现对数据的清洗规则,在减少网络传输的数据量的同时,也可以减少 Flink 端计算消耗。

接下来对 SPL 及 SPL 在阿里云 Flink SLS Connector 中应用进行介绍及举例。

SLS SPL 介绍

图片

SLS SPL 是日志服务推出的一款针对弱结构化的高性能日志处理语言,可以同时在 Logtail 端、查询扫描、流式消费场景使用,具有交互式、探索式、使用简洁等特点。

SPL 基本语法如下:

<data-source> 
| <spl-cmd> -option=<option> -option ... <expression>, ... as <output>, ...
| <spl-cmd> ...
| <spl-cmd> ...

< spl-cmd > 是 SPL 指令,支持行过滤、列扩展、列裁剪、正则取值、字段投影、数值计算、JSON、CSV 等半结构化数据处理,具体参考 SPL 指令 [ 1] 介绍,具体来说包括:

结构化数据 SQL 计算指令:

支持行过滤、列扩展、数值计算、SQL 函数调用

  • extend 通过 SQL 表达式计算结果产生新字段
  • where 根据 SQL 表达式计算结果过滤数据条目
*
| extend latency=cast(latency as BIGINT)
| where status='200' AND latency>100

字段操作指令:

支持字段投影、字段重名、列裁剪

  • project 保留与给定模式相匹配的字段、重命名指定字段
  • project-away 保留与给定模式相匹配的字段、重命名指定字段
  • project-rename 重命名指定字段,并原样保留其他所有字段
*
| project-away -wildcard "__tag__:*"
| project-rename __source__=remote_addr

非结构化数据提取指令:

支持 JSON、正则、CSV 等非结构化字段值处理

  • parse-regexp 提取指定字段中的正则表达式分组匹配信息
  • parse-json 提取指定字段中的第一层 JSON 信息
  • parse-csv 提取指定字段中的 CSV 格式信息
*
| parse-csv -delim='^_^' content as time, body
| parse-regexp body, '(\S+)\s+(\w+)' as msg, user

SPL 在 Flink SLS Connector 中的原理介绍

阿里云 Flink 支持 SLS Connector,通过 SLS Connector 实时拉取 SLS 中 Logstore 的数据,分析后的数据也可以实时写入 SLS,作为一个高性能计算引擎,Flink SQL 也在越来越广泛的应用在 Flink 计算中,借助 SQL 语法可以对结构化的数据进行分析。

在 SLS Connector 中,可以配置日志字段为 Flink SQL 中的 Table 字段,然后基于 SQL 进行数据分析;在未支持 SPL 配置之前,SLS Connector 会实时消费全量的日志数据到 Flink 计算平台,当前消费方式有如下特点:

  • 在 Flink 中计算的往往不需要所有的日志行,比如在安全场景中,可能仅需要符合某种特征的数据,需要进行日志进行过滤,事实上不需要的日志行也会被拉取,造成网络带宽的浪费。
  • 在 Flink 中计算的一般是特定的字段列,比如在 Logstore 中有 30 个字段,真正需要在 Flink 计算的可能仅有 10 个字段,全字段的拉取造成了网络带宽的浪费。

在以上场景中,可能会增加并不需要的网络流量和计算开销,基于这些特点,SLS 将 SPL 的能力集成到 SLS Connector 的新版本中,可以实现数据在到达 Flink 之前已经进行了行过滤和列裁剪,这些预处理能力内置在 SLS 服务端,可以达到同时节省网络流量与 Flink 计算(过滤、列裁剪)开销的目的。

原理对比

  • 未配置 SPL 语句时:Flink 会拉取 SLS 的全量日志数据(包含所有列、所有行)进行计算,如图 1。
  • 配置 SPL 语句时:SPL 可以对拉取到的数据如果 SPL 语句包含过滤及列裁剪等,Flink 拉取到的是进行过滤和列裁剪后部分数据进行计算,如图 2。

图片

在 Flink 中使用 SLS SPL

接下来以一个 Nginx 日志为例,来介绍基于 SLS SPL 的能力来使用 Flink。为了便于演示,这里在 Flink 控制台配置 SLS 的源表,然后开启一个连续查询以观察效果。在实际使用过程中,可以直接修改 SLS 源表,保留其余分析和写出逻辑。

接下来介绍下阿里云 Flink 中使用 SPL 实现行过滤与列裁剪功能。

在 SLS 准备数据

  • 开通 SLS,在 SLS 创建 Project,Logstore,并创建具有消费 Logstore 的权限的账号 AK/SK。
  • 当前 Logstore 数据使用 SLS 的的 SLB 七层日志模拟接入方式产生模拟数据,其中包含 10 多个字段。

图片

模拟接入会持续产生随机的日志数据,日志内容示例如下:

{
  "__source__": "127.0.0.1",
  "__tag__:__receive_time__": "1706531737",
  "__time__": "1706531727",
  "__topic__": "slb_layer7",
  "body_bytes_sent": "3577",
  "client_ip": "114.137.195.189",
  "host": "www.pi.mock.com",
  "http_host": "www.cwj.mock.com",
  "http_user_agent": "Mozilla/5.0 (Windows NT 6.2; rv:22.0) Gecko/20130405 Firefox/23.0",
  "request_length": "1662",
  "request_method": "GET",
  "request_time": "31",
  "request_uri": "/request/path-0/file-3",
  "scheme": "https",
  "slbid": "slb-02",
  "status": "200",
  "upstream_addr": "42.63.187.102",
  "upstream_response_time": "32",
  "upstream_status": "200",
  "vip_addr": "223.18.47.239"
}

Logstore 中 slbid 字段有两种值:slb-01 和 slb-02,对 15 分钟的日志数据进行 slbid 统计,可以发现 slb-01 与 slb-02 数量相当。

图片

行过滤场景

在数据处理中过滤数据是一种常见需求,在 Flink 中可以使用 filter 算子或者 SQL 中的 where 条件进行过滤,使用非常方便;但是在 Flink 使用 filter 算子,往往意味着数据已经通过网络进入 Flink 计算引擎中,全量的数据会消耗着网络带宽和 Flink 的计算性能,这种场景下,SLS SPL 为 Flink SLS Connector 提供了一种支持过滤“下推”的能力,通过配置 SLS Connector 的 query 语句中,过滤条件,即可实现过滤条件下推。避免全量数据传输和全量数据过滤计算。

图片

创建 SQL 作业

在阿里云 Flink 控制台创建一个空白的 SQL 的流作业草稿,点击下一步,进入作业编写。

图片

在作业草稿中输入如下创建临时表的语句:

CREATE TEMPORARY TABLE sls_input(
  request_uri STRING,
  scheme STRING,
  slbid STRING,
  status STRING,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` STRING METADATA VIRTUAL,
   __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-beijing-intranet.log.aliyuncs.com',
  'accessId' = '${ak}',
  'accessKey' = '${sk}',
  'starttime' = '2024-01-21 00:00:00',
  'project' ='${project}',
  'logstore' ='test-nginx-log',
  'query' = '* | where slbid = ''slb-01'''
);
  • 这里为了演示方便,仅设置 request_uri、scheme、slbid、status 和一些元数据字段作为表字段。
  • a k 、 {ak}、 ak{sk}、${project} 替换为具有 Logstore 消费权限的账号。
  • endpoint:填写同地域的 SLS 的私网地址。
  • query:填写 SLS 的 SPL 语句,这里填写了 SPL 的过滤语句:* | where slbid = ‘‘slb-01’’,注意在阿里云 Flink 的 SQL 作业开发中,字符串需要使用英文单引号进行转义。

连续查询及效果

在作业中输入分析语句,按照 slbid 进行聚合查询,动态查询会根据日志的变化,实时刷新数字。

SELECT slbid, count(1) as slb_cnt FROM sls_input GROUP BY slbid

点击右上角调试按钮,进行调试,可以看到结果中 slbid 的字段值,始终是 slb-01。

图片

可以看出设置了 SPL 语句后,sls_input 仅包含 slbid=‘slb-01’ 的数据,其他不符合条件的数据被过滤掉了。

流量对比

使用 SPL 后,可以看出在 SLS 的写流量不变的情况下,Flink 对 SLS 的读流量有大幅度下降;同时在过滤占主要很多 Flink CU 的场景下,经过过滤后,Flink CU 也会有相应的降低。

图片

列裁剪场景

在数据处理中列裁剪也是一种常见需求,在原始数据中,往往会有全量的字段,但是实际的计算只需要特定的字段;类似需要在 Flink 中可以使用 project 算子或者 SQL 中的 select 进行列裁剪与变换,使用 Flink 使用 project 算子,往往意味着数据已经通过网络进入 Flink 计算引擎中,全量的数据会消耗着网络带宽和 Flink 的计算性能,这种场景下,SLS SPL 为 Flink SLS Connector 提供了一种支持投影下推的能力,通过配置 SLS Connector 的 query 参数,即可实现投影字段下推。避免全量数据传输和全量数据过滤计算。

创建 SQL 作业

创建步骤同行过滤场景,在作业草稿中输入如下创建临时表的语句,这里 query 参数配置进行了修改,在过滤的基础上增加了投影语句,可以实现从 SLS 服务端仅拉取特定字段的内容。

CREATE TEMPORARY TABLE sls_input(
  request_uri STRING,
  scheme STRING,
  slbid STRING,
  status STRING,
  `__topic__` STRING METADATA VIRTUAL,
  `__source__` STRING METADATA VIRTUAL,
  `__timestamp__` STRING METADATA VIRTUAL,
   __tag__ MAP<VARCHAR, VARCHAR> METADATA VIRTUAL,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-beijing-intranet.log.aliyuncs.com',
  'accessId' = '${ak}',
  'accessKey' = '${sk}',
  'starttime' = '2024-01-21 00:00:00',
  'project' ='${project}',
  'logstore' ='test-nginx-log',
  'query' = '* | where slbid = ''slb-01'' | project request_uri, scheme, slbid, status, __topic__, __source__, "__tag__:__receive_time__"'
);

为了效果,下面分行展示语句中配置,在 Flink 语句中任然需要单行配置。

* 
| where slbid = ''slb-01'' 
| project request_uri, scheme, slbid, status, __topic__, __source__, "__tag__:__receive_time__"

上面使用了 SLS SPL 的管道式语法来实现数据过滤后投影的操作,类似 Unix 管道,使用|符号将不同指令进行分割,上一条指令的输出作为下一条指令的输入,最后的指令的输出表示整个管道的输出。

连续查询及效果

图片

在作业中输入分析语句,可以看到,结果与行过滤场景结果类似。

SELECT slbid, count(1) as slb_cnt FROM sls_input_project GROUP BY slbid

🔔 注意: 这里与行过滤不同的是,上面的行过滤场景会返回全量的字段,而当前的语句令 SLS Connector 只返回特定的字段,再次减少了数据的网络传输。

SPL 还可以做什么

  • 上述实例中演示了使用 SLS SPL 的过滤和投影功能来实现 SLS Connector 的“下推”功能,可以有效地减少网络流量和 Flink CU 的使用。可以避免在 Flink 进行计算之前,进行额外的过滤和投影计算消耗。
  • SLS SPL 的功能不止于过滤与投影,SLS SPL 完整支持的语法可以参考文档:SPL 指令 [ 1] 。同时,SPL管道式语法已全面支持在 Flink Connector 中进行配置。
  • SLS SPL 支持对于数据进行预处理,比如正则字段、JSON 字段,CSV 字段展开;数据格式转换,列的增加和减少;过滤等。除了用于消费场景,在 SLS 的 Scan 模式与采集端都会应用场景,以便用户在采集端、消费端都可以使用 SPL 的能力。

相关链接:

[1] SPL 指令

https://help.aliyun.com/zh/sls/user-guide/spl-instruction?spm=a2c4g.11186623.0.0.33f35a3dl8g8KD

[2] 日志服务概述

https://help.aliyun.com/zh/sls/product-overview/what-is-log-service

[3] SPL 概述

https://help.aliyun.com/zh/sls/user-guide/spl-overview

[4] 阿里云 Flink Connector SLS

https://help.aliyun.com/zh/flink/developer-reference/log-service-connector

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1507499.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Axure 单键快捷键 加快绘图速度 提高工作效率

画图类 R&#xff1a;绘制矩形 先点击空白页面&#xff0c;输入R即可绘制 L&#xff1a;绘制直线 先点击空白页面&#xff0c;输入L即可绘制&#xff0c;绘制的时候按住shift直线 O&#xff1a;绘制圆 先点击空白页面&#xff0c;输入O即可绘制&#xff0c;绘制的时候按…

【竞技宝】LOL:TES连下两局轻松击败OMG

【竞技宝】LOL&#xff1a;TES连下两局轻松击败OMG 北京时间2024年3月9日&#xff0c;英雄联盟LPL2024春季常规赛继续进行&#xff0c;昨日共进行三场比赛&#xff0c;第三场比赛由TES对阵OMG。本场比赛&#xff0c;TES的打野选手tian个人表现出色&#xff0c;两局比赛都多次成…

[MYSQL数据库]--表的增删查改和字段类型

前言 作者&#xff1a;小蜗牛向前冲 名言&#xff1a;我可以接受失败&#xff0c;但我不能接受放弃 如果觉的博主的文章还不错的话&#xff0c;还请点赞&#xff0c;收藏&#xff0c;关注&#x1f440;支持博主。如果发现有问题的地方欢迎❀大家在评论区指正 目录 一、表的增…

[力扣 Hot100]Day48 路径总和 III

题目描述 给定一个二叉树的根节点 root &#xff0c;和一个整数 targetSum &#xff0c;求该二叉树里节点值之和等于 targetSum 的 路径 的数目。 路径 不需要从根节点开始&#xff0c;也不需要在叶子节点结束&#xff0c;但是路径方向必须是向下的&#xff08;只能从父节点到…

ChatGPT 消息发不出去了?我找到解决方案了.

ChatGPT消息发不出去了?我找到解决方案了 今天忽然发现 ChatGPT无法发送消息&#xff0c;能查看历史对话&#xff0c;但是无法发送消息。 猜测原因 出现这个问题的各位&#xff0c;应该都是点击登录后顶部弹窗邀请加入多语言 alapha测试]了&#xff0c;并且语言选择了中文&am…

【JS】APIs:事件流、事件委托、其他事件、页面尺寸、日期对象与节点操作

1 事件流 捕获阶段&#xff1a;从父到子 冒泡阶段&#xff1a;从子到父 1.1 事件捕获 <body> <div class"fa"><div class"son"></div> </div> <script>const fadocument.querySelector(.fa);const sondocument.qu…

【MATLAB】语音信号识别与处理:移动中位数滤波算法去噪及谱相减算法呈现频谱

1 基本定义 移动中位数滤波算法是一种基于中位数的滤波方法&#xff0c;它通过对信号进行滑动窗口处理&#xff0c;每次取窗口内的中位数作为当前点的估计值&#xff0c;以去除噪声。该算法的主要思想是利用中位数的鲁棒性&#xff0c;对信号中的噪声进行有效的消除。 具体来说…

RocketMQ入门指南:从零开始学习分布式消息队列技术

RocketMQ 1. MQ介绍1.1 为什么要用MQ1.2 MQ的优点和缺点1.3 各种MQ产品的比较 2. RocketMQ快速入门2.1 准备工作2.1.1 下载RocketMQ2.2.2 环境要求 2.2 安装RocketMQ2.2.1 安装步骤2.2.2 目录介绍 2.3 启动RocketMQ2.4 测试RocketMQ2.4.1 发送消息2.4.2 接收消息 2.5 关闭Rocke…

【AIGC+VisionPro】空间视频生意的创业者

1. 产品概述 -一款基于人工智能的2D到3D视频/图像转换工具,可将普通的2D视频/图像转换为令人惊艳的3D视觉体验。 - 它支持在PC上进行转换,并输出适用于Meta Quest、Apple Vision Pro等XR设备的3D格式。 2. 产品功能 - 利用尖端的3D AI系统,可将任何视频(Youtube、电影、游戏、…

C++第一弹---C++入门(上)

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】 【C详解】 C入门 1、C关键字(C98) 2、命名空间 2.1、命名空间定义 2.2、命名空间使用 3、C输入&输出 4、缺省参数 4.1、缺省参数概念 4.2、缺省参…

《Ubuntu20.04环境下的ROS进阶学习0》

一、逛ROS应用商店 在上一专栏http://t.csdnimg.cn/oGlcu&#xff0c;我们了解了ROS的基本功能。这一专栏将会在此基础上做出进一步拓展学习。那么首先我们要学会下载并阅读别人的代码。常用的两个应用商店一个是ROS的官方应用商店ROS index&#xff0c;另一个就是我们熟知的gi…

基于SpringBoot+MYSQL的房屋租赁系统

1、 前言介绍 社会的发展和科学技术的进步&#xff0c;互联网技术越来越受欢迎。网络计算机的生活方式逐渐受到广大人民群众的喜爱&#xff0c;也逐渐进入了每个用户的使用。互联网具有便利性&#xff0c;速度快&#xff0c;效率高&#xff0c;成本低等优点。 因此&#xff0c…

学校Java的第七天

目录 一、什么是数组 二、作用 三、如何使用数组 1、声明数组变量 2、创建数组 示例&#xff1a; 3、数组的使用 示例&#xff1a; 4、数组的遍历 for循环示例&#xff08;不知道for循环的可以查看我之前发的文章&#xff09; for-each循环&#xff08;也就是增强for…

Visual Studio单步调试中监视窗口变灰的问题

在vs调试中&#xff0c;写了这样一条语句 while((nfread(buf, sizeof(float), N, pf))>0) 然而&#xff0c;在调试中&#xff0c;只要一执行while这条语句&#xff0c;监视窗口中的变量全部变为灰色&#xff0c;不能查看&#xff0c;是程序本身并没有报错&#xff0c;能够继…

每日OJ题_链表⑤_力扣25. K 个一组翻转链表

目录 力扣25. K 个一组翻转链表 解析代码 力扣25. K 个一组翻转链表 25. K 个一组翻转链表 难度 困难 给你链表的头节点 head &#xff0c;每 k 个节点一组进行翻转&#xff0c;请你返回修改后的链表。 k 是一个正整数&#xff0c;它的值小于或等于链表的长度。如果节点总…

2024-03-10 c++

&#x1f338; MFC下拉框控件 | Combo Box eg 计算器 1。新建MFC项目&#xff08;基于对话框、静态库&#xff09; 2。添加控件&#xff0c;删除初始的3个多余控件 加3个edit control 加1个combo box&#xff0c;属性sort改为false&#xff0c;data为 ;-;;;% 加1个static text…

invoke()到底是个什么方法???

调用jquery的方法返回属性值 1、invoke&#xff08;‘val’&#xff09; 在form的select下&#xff1a; cy.get(.action-select-multiple).select([apples, oranges, bananas])// when getting multiple values, invoke "val" method first jquery中val方法是用于返…

【c语言 】 函数入门

&#x1f388;个人主页&#xff1a;豌豆射手^ &#x1f389;欢迎 &#x1f44d;点赞✍评论⭐收藏 &#x1f917;收录专栏&#xff1a;C语言 &#x1f91d;希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出指正&#xff0c;让我们共同学习、交流进步&…

笔记 mysql text 不能设置他的默认值如not null

mysql text 不能设置他的默认值 那么插入时默认值是多少&#xff1f; 在 MySQL 中&#xff0c;TEXT 类型的字段不能直接指定默认值 因此&#xff0c;如果你尝试在创建表时为 TEXT类型的字段指定默认值&#xff0c;MySQL 会抛出错误。 然而&#xff0c;虽然不能在表定义中为 TE…

2024年腾讯云优惠券_云服务器代金券_优惠折扣整理

腾讯云代金券领取渠道有哪些&#xff1f;腾讯云官网可以领取、官方媒体账号可以领取代金券、完成任务可以领取代金券&#xff0c;大家也可以在腾讯云百科蹲守代金券&#xff0c;因为腾讯云代金券领取渠道比较分散&#xff0c;腾讯云百科txybk.com专注汇总优惠代金券领取页面&am…