弱结构化日志 Flink SQL 怎么写?SLS SPL 来帮忙

news2025/1/22 21:10:43

作者:潘伟龙(豁朗)

背景

日志服务 SLS 是云原生观测与分析平台,为 Log、Metric、Trace 等数据提供大规模、低成本、实时的平台化服务,基于日志服务的便捷的数据接入能力,可以将系统日志、业务日志等接入 SLS 进行存储、分析;阿里云 Flink 是阿里云基于 Apache Flink 构建的大数据分析平台,在实时数据分析、风控检测等场景应用广泛。阿里云 Flink 原生支持阿里云日志服务 SLS 的 Connector,用户可以在阿里云 Flink 平台将 SLS 作为源表或者结果表使用。

阿里云 Flink SLS Connector 对于结构化的日志非常直接,通过配置,SLS 的日志字段可以与 Flink SQL 的 Table 字段列一一映射;然后仍有大量的业务日志并非完全的结构化,例如会将所有日志内容写入一个字段中,需要正则提前、分隔符拆分等手段才可以提取出结构化的字段,基于这个场景,本文介绍一种使用 SLS SPL 配置 SLS Connector 完成数据结构化的方案,覆盖日志清洗与格式规整场景。

弱结构化日志处理的痛点

弱结构化日志现状与结构化处理需求的矛盾

日志数据往往是多种来源,多种格式,往往没有固定的 Schema,所以在数据处理前,需要先对数据进行清洗、格式规整,然后在进行数据分析;这类数据内容格式是不固定的,可能是 JSON 字符串、CSV 格式,甚至是不规则的 Java 堆栈日志。

Flink SQL 是一种兼容 SQL 语法的实时计算模型,可以基于 SQL 对结构化数据进行分析,但同时也要求源数据模式固定:字段名称、类型、数量是固定;这也是 SQL 计算模型的基础。

日志数据的弱结构化特点与 Flink SQL 结构化分析之间有着一道鸿沟,跨越这道鸿沟需要一个中间层来进行数据清洗、规整;这个中间层的方案有多种选择可以使用,下面会对不同的方案做简单对比,并提出一种新的基于 SLS SPL 的方案来轻量化完成解决数据清洗规整的工作。

弱结构化日志数据

下面是一条日志示例,日志格式较为复杂,既有 JSON 字符串,又有字符串与 JSON 混合的场景。其中:

  • Payload 为 JSON 字符串,其中 schedule 字段的内容也是一段 JSON 结构。
  • requestURL 为一段标准的 URL Path 路径。
  • error 字段是前半部分包含 CouldNotExecuteQuery:字符串,后半部分是一段 JSON 结构。
  • tag:path 包含日志文件的路径,其中 service_a 可能是业务名称。
  • caller 中包含文件名与文件行数。
{
  "Payload": "{\"lastNotified\": 1705030483, \"serverUri\": \"http://test.alert.com/alert-api/tasks\", \"jobID\": \"44d6ce47bb4995ef0c8052a9a30ed6d8\", \"alertName\": \"alert-12345678-123456\", \"project\": \"test-sls-project\", \"projectId\": 123, \"aliuid\": \"1234567890\", \"alertDisplayName\": \"\\u6d4b\\u8bd5\\u963f\\u91cc\\u4e91\\u544a\\u8b66\", \"checkJobUri\": \"http://test.alert.com/alert-api/task_check\", \"schedule\": {\"timeZone\": \"\", \"delay\": 0, \"runImmediately\": false, \"type\": \"FixedRate\", \"interval\": \"1m\"}, \"jobRunID\": \"bf86aa5e67a6891d-61016da98c79b-5071a6b\", \"firedNotNotified\": 25161}",
  "TaskID": "bf86aa5e67a6891d-61016da98c79b-5071a6b-334f81a-5c38aaa1-9354-43ec-8369-4f41a7c23887",
  "TaskType": "ALERT",
  "__source__": "11.199.97.112",
  "__tag__:__hostname__": "iabcde12345.cloud.abc121",
  "__tag__:__path__": "/var/log/service_a.LOG",
  "caller": "executor/pool.go:64",
  "error": "CouldNotExecuteQuery : {\n    \"httpCode\": 404,\n    \"errorCode\": \"LogStoreNotExist\",\n    \"errorMessage\": \"logstore k8s-event does not exist\",\n    \"requestID\": \"65B7C10AB43D9895A8C3DB6A\"\n}",
  "requestURL": "/apis/autoscaling/v2beta1/namespaces/python-etl/horizontalpodautoscalers/cn-shenzhen-56492-1234567890123?timeout=30s",
  "ts": "2024-01-29 22:57:13"
}

结构化数据处理需求

对于这样的日志提取出更有价值的信息需要进行数据清洗,首先需要提取重要的字段,然后对这些字段进行数据分析;本篇关注重要字段的提取,分析仍然可以在 Flink 中进行。

假设提取字段具体需求如下:

  • 提取 error 中的 httpCode、errorCode、errorMessage、requestID。
  • 提取 tag:path 中的 service_a 作为 serviceName。
  • 提取 caller 中的 pool.go 作为 fileName,64 作为 fileNo。
  • 提取 Payload 中的 project;提取 Payload 下面的 schedule 中的 type 为 scheuleType。
  • 重命名 source 为 serviceIP。
  • 其余字段舍弃。

最终需要的字段列表如下,基于这样一个表格模型,我们可以便捷的使用 Flink SQL 进行数据分析。

图片

解决方案

实现这样的数据清洗,有很多种方法,这里列举几种基于 SLS 与 Flink 的方案,不同方案之间没有绝对的优劣,需要根据不同的场景选择不同的方案。

数据加工方案: 在 SLS 控制台创建目标 Logstore,通过创建数据加工任务,完成对数据的清洗。

图片

Flink 方案: 将 error 和 payload 指定为源表字段,通过 SQL 正则函数、JSON 函数对字段进行解析,解析后的字段写入临时表,然后对临时表进行分析。

图片

SPL 方案: 在 Flink SLS Connector 中配置 SPL 语句,对数据进行清洗,Flink 中源表字段定义为清洗后的数据结构。

图片

从上述三种方案的原理不难看出,在需要数据清洗的场景中,在 SLS Connector 中配置 SPL 是一种更轻量化的方案,具有轻量化、易维护、易扩展的特点。

在日志数据弱结构化的场景中,SPL 方案既避免了方案一中创建临时中间 Logstore,也避免了方案二中在 Flink 中创建临时表,在离数据源更近的位置进行数据清洗,在计算平台关注业务逻辑,职责分离更加清晰。

如何在 Flink 中使用 SPL

接下来以一段弱结构化日志为例,来介绍基于 SLS SPL 的能力来使用 Flink。为了便于演示,这里在 Flink 控制台配置 SLS 的源表,然后开启一个连续查询以观察效果。在实际使用过程中,仅需修改 SLS 源表配置,即可完成数据清洗与字段规整。

SLS 准备数据

  • 开通 SLS,在 SLS 创建 Project,Logstore,并创建具有消费 Logstore 的权限的账号 AK/SK。
  • 当前 Logstore 数据使用 SLS SDK 写入模拟数据,格式使用上述日志片段,其中包含 JSON、复杂字符串等弱结构化字段。

图片

预览 SPL 效果

在 Logstore 可以可以开启扫描模式,SLS SPL 管道式语法使用分隔符分割不同的指令,每次输入一个指令可以即时查看结果,然后增加管道数,渐进式、探索式获取最终结果。

图片

对上图中的 SPL 进行简单描述:

* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller 
 | parse-json Payload 
 | project-away Payload 
 | parse-regexp error, 'CouldNotExecuteQuery : ({[\w":\s,\-}]+)' as errorJson 
 | parse-json errorJson 
 | parse-regexp "__tag__:__path__", '\/var\/log\/([\w\_]+).LOG' as serviceName 
 | parse-regexp caller, '\w+/([\w\.]+):(\d+)' as fileName, fileNo 
 | project-rename serviceHost="__tag__:__hostname__" 
 | extend scheduleType = json_extract_scalar(schedule, '$.type') 
 | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType, project
  • 1 行:project 指令:从原始结果中保留 Payload、error、tag:path、caller 字段,舍弃其他字段,这些字段用于后续解析。
  • 2 行:parse-json 指令:将 Payload 字符串展开为 JSON,第一层字段出现在结果中,包括 lastNotified、serviceUri、jobID 等。
  • 3 行:project-away 指令:去除原始 Payload 字段。
  • 4 行:parse-regexp 指令:按照 error 字段中的内容,解析其中的部分 JSON 内容,置于 errorJson 字段。
  • 5 行:parse-json 指令:展开 errorJson 字段,得到 httpCode、errorCode、errorMessage 等字段。
  • 6 行:parse-regexp 指令:通过正则表达式解析出 tag:path 种的文件名,并命名为 serviceName。
  • 7 行:parse-regexp 指令:通过正则表达式捕获组解析出 caller 种的文件名与行数,并置于 fileName、fileNo 字段。
  • 8 行:project-rename 指令:将 tag:hostname 字段重命名为serviceHost。
  • 9 行:extend 指令:使用 json_extract_scalar 函数,提取 schedule 中的 type 字段,并命名为 scheduleType。
  • 10 行:project 指令:保留需要的字段列表,其中 project 字段来自于 Payload。

创建 SQL 作业

在阿里云 Flink 控制台创建一个空白的 SQL 的流作业草稿,点击下一步,进入作业编写。

图片

在作业草稿中输入如下创建临时表的语句:

CREATE TEMPORARY TABLE sls_input_complex (
  errorCode STRING,
  errorMessage STRING,
  fileName STRING,
  fileNo STRING,
  httpCode STRING,
  requestID STRING,
  scheduleType STRING,
  serviceHost STRING,
  project STRING,
  proctime as PROCTIME()
) WITH (
  'connector' = 'sls',
  'endpoint' ='cn-beijing-intranet.log.aliyuncs.com',
  'accessId' = '${ak}',
  'accessKey' = '${sk}',
  'starttime' = '2024-02-01 10:30:00',
  'project' ='${project}',
  'logstore' ='${logtore}',
  'query' = '* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, ''CouldNotExecuteQuery : ({[\w":\s,\-}]+)'' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", ''\/var\/log\/([\w\_]+).LOG'' as serviceName | parse-regexp caller, ''\w+/([\w\.]+):(\d+)'' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, ''$.type'') | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType,project'
  );
  • 其中 a k , {ak}, ak,{sk}, p r o j e c t , {project}, project,{logstore} 需要替换为有消费权限的 AK 账号。
  • query 字段,替换为上述 SPL,注意在阿里云 Flink 控制台需要对单引号使用单引号转义,并且消除换行符。
  • SPL 最终得到的字段列表与 TABLE 中字段对应。

连续查询及效果

在作业中输入分析语句,查看结果数据:

SELECT * FROM sls_input_complex

点击右上角调试按钮,进行调试,可以看到 TABLE 中每一列的值,对应 SPL 处理后的结果。

图片

总结

为了适应弱结构化日志数据的需求,Flink SLS Connector 进行了升级,支持直接通过 Connector配置 SPL 的方式实现 SLS 数据源的清洗下推,特别是需要正则字段提取、JSON 字段提取、CSV 字段提取场景下,相较原数据加工方案和原 Flink SLS Connector 方案更轻量级,让数据清洗的职责更加清晰,在数据源端完成数据清洗工作,也可以减少数据的网络传输流量,使得到达 Flink 的数据已经是规整好的数据,可以更加专注在 Flink 中进行业务数据分析。

同时为了便于 SPL 验证测试,SLS 扫描查询也已支持使用 SPL 进行查询,可以实时看到 SPL 管道式语法执行结果。

参考链接:

[1] 日志服务概述

https://help.aliyun.com/zh/sls/product-overview/what-is-log-service

[2] SPL 概述

https://help.aliyun.com/zh/sls/user-guide/spl-overview

[3] 阿里云 Flink Connector SLShttps://help.aliyun.com/zh/flink/developer-reference/log-service-connector[4] SLS 扫描查询

https://help.aliyun.com/zh/sls/user-guide/scan-based-query-overview

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1474848.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【LeetCode刷题】146. LRU 缓存

请你设计并实现一个满足 LRU (最近最少使用) 缓存 约束的数据结构。 实现 LRUCache 类: LRUCache(int capacity) 以 正整数 作为容量 capacity 初始化 LRU 缓存int get(int key) 如果关键字 key 存在于缓存中,则返回关键字的值,否则返回 -…

MYSQL学习笔记:索引

MYSQL学习笔记:索引 文章目录 MYSQL学习笔记:索引索引的分类索引的创建删除索引优化B树索引B树InnoDB主键和二级索引树聚集索引与非聚集索引哈希索引INNODB的自适应哈希索引索引和慢查询 用索引也是要涉及磁盘I/O的操作的索引也是一种数据结构&#xff0…

【计算机网络】数据链路层|封装成帧|透明传输|差错检测|PPP协议|CSMA/CD协议

目录 一、思维导图 ​ 二、数据链路层功能概述 1.数据链路层概述 2.数据链路层功能概述——封装成帧 3.数据链路层功能概述——透明传输 4.数据链路层功能概述——差错检测 三、数据链路层重要协议 1.数据链路层重要协议:PPP协议 2.数据链路层重要协议&#x…

LeetCode 刷题 [C++] 第141题.环形链表

题目描述 给你一个链表的头节点 head ,判断链表中是否有环。 如果链表中有某个节点,可以通过连续跟踪 next 指针再次到达,则链表中存在环。 为了表示给定链表中的环,评测系统内部使用整数 pos 来表示链表尾连接到链表中的位置&a…

2024数字中国创新大赛·数据要素赛道“能源大数据应用赛”正式上线!参赛指南请查收

近日,由国网福建电力承办的2024数字中国创新大赛能源大数据应用赛正式上线发布。赛事按照数字中国建设、能源革命的战略要求,围绕能源数据要素x、能源数字技术、能源商业模式等热点设置赛题,诚邀社会各界为加快建成新型电力系统出谋划策&…

DVWA 靶场之 Command Injection(命令执行)middlehigh

对于 middle 难度的 我们直接先看源码 <?phpif( isset( $_POST[ Submit ] ) ) {// Get input$target $_REQUEST[ ip ];// Set blacklist$substitutions array(&& > ,; > ,);// Remove any of the characters in the array (blacklist).$target str_rep…

VUE3搭载到服务器

1.搭建服务器 使用 Windows 自带的 IIS 作为服务器。 步骤如下&#xff1a;https://blog.csdn.net/qq_62464995/article/details/130140673 同时&#xff0c;上面的步骤中&#xff0c;还使用了 cpolar 将 IIS 本地网址映射到公共网址。 注&#xff1a; cpolar客户端&#xf…

java springmvc/springboot 项目通过HttpServletRequest对象获取请求体body工具类

请求 测试接口 获取到的 获取到打印出的json字符串里有空格这些&#xff0c;在json解析的时候正常解析为json对象了。 工具类代码 import lombok.extern.slf4j.Slf4j; import org.springframework.web.context.request.RequestContextHolder; import org.springframework.we…

(全部习题答案)研究生英语读写教程基础级教师用书PDF|| 研究生英语读写教程提高级教师用书PDF

研究生英语读写教程基础级教师用书PDF 研究生英语读写教程提高级教师用书PDF pdf下载&#xff08;完整版下载&#xff09; &#xff08;1&#xff09;研究生英语读写教程基础级教师用书PDF &#xff08;2&#xff09;研究生英语读写教程基提高级教师用书PDF

yolov9 瑞芯微芯片rknn部署、地平线芯片Horizon部署、TensorRT部署

特别说明&#xff1a;参考官方开源的yolov9代码、瑞芯微官方文档、地平线的官方文档&#xff0c;如有侵权告知删&#xff0c;谢谢。 模型和完整仿真测试代码&#xff0c;放在github上参考链接 模型和代码。 之前写过yolov8检测、分割、关键点模型的部署的多篇博文&#xff0c;y…

网络安全之内容安全

内容安全 攻击可能只是一个点&#xff0c;防御需要全方面进行 IAE引擎 DFI和DPI技术--- 深度检测技术 DPI --- 深度包检测技术--- 主要针对完整的数据包&#xff08;数据包分片&#xff0c;分段需要重组&#xff09;&#xff0c;之后对 数据包的内容进行识别。&#xff08;应用…

分享three.js和cannon.js构建Web 3D场景

使用 three.js&#xff0c;您不再需要花哨的游戏PC或控制台来显示逼真的3D图形。 您甚至不需要下载特殊的应用程序。现在每个人都可以使用智能手机和网络浏览器体验令人惊叹的3D应用程序。 这个惊人的库和充满活力的社区是您在浏览器、笔记本电脑、平板电脑或智能手机上创建游…

HTTP 的 multipart 类型

上一篇文章讲到 http 的 MIME 类型 http MIME 类型 里有一个 multipart 多部分对象集合类型&#xff0c;这个类型 http 指南里有讲到&#xff1a;MIME 中的 multipart&#xff08;多部分&#xff09;电子邮件报文中包含多个报文&#xff0c;它们合在一起作为单一的复杂报文发送…

System Verilog 要点概览

二进制 常量 格式&#xff1a;二进制位宽进制符号&#xff08;b:2;h:16;d:10&#xff09;数据 1b1 1b0 16habcd 4d10变量 logic a;//一位二进制 logic [3:0]b;//4位二进制 logic [31:0][31:0]c;//32*32位二进制组合/位绑定 {a, 1b1}//高位是a&#xff0c;低位是常数二进制…

go test用法(获取单元测试覆盖率)

go test用法&#xff08;获取ut覆盖率&#xff09; 为了提升系统的稳定性&#xff0c;一般公司都会对代码的单元测试覆盖率有一定要求。下面针对golang自带的测试命令go test做讲解。 1 命令 1.1 go test ./… &#xff08;运行当前目录及所有子目录下的测试用例&#xff09; …

读《Shape-Guided: Shape-Guided Dual-Memory Learning for 3D Anomaly Detection》

Chu Y M, Chieh L, Hsieh T I, et al. Shape-Guided Dual-Memory Learning for 3D Anomaly Detection[J]. 2023.&#xff08;为毛paperwithcode上面曾经的榜一引用却只有1&#xff09; 摘要 专家学习 无监督 第一个专家&#xff1a;局部几何&#xff0c;距离建模 第二个专家&…

【React源码 - 调度任务循环EventLoop】

我们知道在React中有4个核心包、2个关键循环。而React正是在这4个核心包中运行&#xff0c;从输入到输出渲染到web端&#xff0c;主要流程可简单分为一下4步&#xff1a;如下图&#xff0c;本文主要是介绍两大循环中的任务调度循环。 4个核心包&#xff1a; react&#xff1a;…

数据结构 队列

一定义 1.1概述&#xff1a; 队列只允许在一端进行插入操作&#xff0c;而在另一端进行删除操作的线性表 特点&#xff1a;队列是先进先出的线性表 允许插入的一端称为队尾&#xff0c;允许删除的一端是队头 这里我们就介绍链式的 1.2 建立队列 这里说一句 其实不管是栈还…

php基础学习之错误处理(其一)

一&#xff0c;错误处理的概念 错误处理指的是系统(或者用户)在执行某些代码的时候&#xff0c;发现有错误&#xff0c;就会通过错误处理的形式告知程序员&#xff0c;俗称报错 二&#xff0c;错误分类 语法错误&#xff1a;书写的代码不符合 PHP 的语法规范&#xff0c;语法错…

【医学影像】LIDC-IDRI数据集的无痛制作

LIDC-IDRI数据集制作 0.下载0.0 链接汇总0.1 步骤 1.合成CT图reference 0.下载 0.0 链接汇总 LIDC-IDRI官方网址&#xff1a;https://www.cancerimagingarchive.net/nbia-search/?CollectionCriteriaLIDC-IDRINBIA Data Retriever 下载链接&#xff1a;https://wiki.canceri…