Flink SQL 实时读取 kafka 数据写入 Clickhouse —— 日志处理(三)

news2024/12/24 11:35:28

文章目录

  • 前言
  • Clickhouse 表设计
    • adlp_log_local 本地表
    • adlp_log 分布式表
  • Flink SQL 说明
    • 创建 Source Table (Kafka) 连接器表
    • 创建 Sink Table (Clickhouse) 连接器
    • 解析 Message 写入 Sink
  • 日志查询演示
  • 总结

前言

在之前的文章中,我们总结了如何在 Django 项目中进行日志配置,以及如何在 k8s 上部署 Filebeat 采集 PVC 中的日志发送至 Kafka:

  • Django 日志控制台输出、文件写入按天拆分文件,自定义 Filter 增加 trace_id 以及过滤——日志处理(一)
  • Filebeat k8s 部署(Deployment)采集 PVC 日志发送至 Kafka——日志处理(二)

本文将总结如何使用 Flink SQL 实时将 kafka 中的日志消息发送至 Clickhouse 表中。

说明
限于文章主题和篇幅,本文不会将如何部署和使用 Flink SQL, 关于这些内容过多而且网上资料也很多,就不再赘述。
本文的核心是说明如何设计 Clickhouse 表结构,以及对应的 Flink SQL 说明。

Clickhouse 表设计


上图中的JSON 内容是kafka 中的日志消息,我们需要读取该消息中的 message 字段(我们的日志信息),然后将该字段中的 time, level, func, trace_id, message 保存至 clickhouse 中。
这里我使用两张表保存日志:

  • adlp_log_local本地表
  • adlp_log分布式表,FlinkSQL 实时写入分布式表

adlp_log_local 本地表

create table if not exists cloud_data.adlp_log_local on cluster perftest_5shards_2replicas
(
    `dt`             DateTime64(3),
    `level`          LowCardinality(String),
    `trace_id`       String,
    `func`           String,
    `message`        String,

    -- 建立索引加速低命中率内容的查询
    INDEX idx_trace_id `trace_id` TYPE tokenbf_v1(4096, 2, 0) GRANULARITY 2,
    INDEX idx_message `message` TYPE tokenbf_v1(30720, 2, 0) GRANULARITY 1
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{layer}-{shard}/cloud_data/adlp_log_local', '{replica}')
    PARTITION BY toYYYYMMDD(dt)
    PRIMARY KEY (dt, trace_id)
    ORDER BY (dt, trace_id)
    TTL toDateTime(dt) + toIntervalDay(30);

字段说明

  • dt (DateTime64(3)): 存储日志时间戳,精确到毫秒。
  • level (LowCardinality(String)): 存储日志级别,如 INFOERROR 等,使用 LowCardinality 优化存储和查询。
  • trace_id (String): 存储追踪 ID,通常用于关联一系列相关的日志记录。
  • func (String): 存储函数或方法名称,表示日志产生的位置。
  • message(String): 存储日志消息的具体内容。

索引

  • idx_trace_id: 使用 tokenbf_v1 类型的布隆过滤器索引(tokenbf_v1(4096, 2, 0)),在 trace_id 字段上创建,粒度为 2。布隆过滤器索引适合低命中率的查询,能够快速过滤出大多数不匹配的记录。
  • idx_message: 使用 tokenbf_v1 类型的布隆过滤器索引(tokenbf_v1(30720, 2, 0)),在 message 字段上创建,粒度为 1。同样用于加速低命中率的查询。

存储引擎

  • ReplicatedMergeTree: 使用分布式和复制的存储引擎,路径模板为 /clickhouse/tables/{layer}-{shard}/cloud_data/adlp_log_local,副本名称为 {replica},保证数据的高可用性和一致性。

分区和排序

  • 分区 (PARTITION BY): 按 dt 字段的年月日(toYYYYMMDD(dt))进行分区,有助于管理和查询按天划分的数据。
  • 主键 (PRIMARY KEY): 主键由 dttrace_id 组成,有助于高效查询。
  • 排序 (ORDER BY): 按 dttrace_id 字段排序,优化基于时间和 trace ID 的查询。

数据生命周期 (TTL)

  • TTL (Time To Live): 配置数据的生存时间,数据在 dt 字段的时间加上 30 天后自动过期删除,保持数据表的清洁和高效。

adlp_log 分布式表

create table if not exists cloud_data.adlp_log on cluster perftest_5shards_2replicas
(
    `dt`             DateTime64(3),
    `level`          LowCardinality(String),
    `trace_id`       String,
    `func`           String,
    `message`        String
)
ENGINE = Distributed('perftest_5shards_2replicas', 'cloud_data', 'adlp_log_local', rand());

字段说明
与本地表 adlp_log_local 相同,包含以下字段:

  • dt (DateTime64(3))
  • level (LowCardinality(String))
  • trace_id (String)
  • func (String)
  • message (String)

存储引擎
Distributed: 分布式引擎,允许将数据分布到多个分片和副本中。参数解释如下:

  • 集群名称 (perftest_5shards_2replicas): 指定集群的名称。
  • 数据库 (cloud_data): 数据库名称。
  • 表 (adlp_log_local): 本地表的名称。
  • 分片键 (rand()): 使用随机函数进行数据分片,保证数据均匀分布。

Flink SQL 说明

创建 Source Table (Kafka) 连接器表

CREATE TEMPORARY TABLE source_table (
    message STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'filebeat_logs',
    'properties.bootstrap.servers' = '127.0.0.1:9092',
    'properties.group.id' = 'prod-logs-k2c',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json',
    'json.ignore-parse-errors' = 'false',
    'json.fail-on-missing-field' = 'false',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.mechanism' = 'PLAIN',
    'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";'
);

创建 Sink Table (Clickhouse) 连接器

CREATE TEMPORARY TABLE sink_table (
    `dt` TIMESTAMP(3),
    `level` STRING ,
    `trace_id` STRING ,
    `func` STRING ,
    `message` STRING
) WITH (
  'connector' = 'clickhouse',
  'url' = 'clickhouse://127.0.0.1:8123',
  'username' = 'admin',
  'password' = 'admin',
  'database-name' = 'cloud_data',
  'table-name' = 'adlp_log',
  'use-local' = 'true',
  'sink.batch-size' = '1000',
  'sink.flush-interval' = '1000',
  'sink.max-retries' = '10',
  'sink.update-strategy' = 'insert',
  'sink.sharding.use-table-definition' = 'true',
  'sink.parallelism' = '1'
);

解析 Message 写入 Sink

INSERT INTO sink_table
SELECT 
    TO_TIMESTAMP(JSON_VALUE(message, '$.time'), 'yyyy-MM-dd HH:mm:ss') AS dt,
    JSON_VALUE(message, '$.level') AS level,
    JSON_VALUE(message, '$.trace_id') AS trace_id,
    JSON_VALUE(message, '$.func') AS func,
    JSON_VALUE(message, '$.message') AS message
FROM source_table;

注意:
这里在写入的时候默认我们的日志格式是 JSON 的,如果我们的日志发送到 kafka 不是 JSON 格式的,上边的 JSON_VALUE 可能会报错。当然,我们也可以在条件中加上是否为 JSON 判断,但是我觉得没必要。

日志查询演示

我们的日志导入成功后,可以通过第三方查询工具查询 clickhouse 数据源,我这里使用的是 superset 去查询 clickhouse 数据源。
通过 trace_id 查询整个执行链路的日志
image.png
查询错误日志信息
image.png

全文检索 message 日志信息
image.png

更多扩展

  • superset 是一个强大的 BI 工具,可以将我们的日志中的一些指标做成看板,比如说关键错误日志数量,然后设置告警,发送通知。
  • 通过 Flink SQL 实时将我们的日志从 kafka 中写入 clickhouse ,结合 clickhouse 强大的查询功能,以及 superset 强大的 BI 功能,可以充分挖掘业务日志中的潜在价值。

总结

本文总结了如何使用使用 Clickhouse 保存日志数据,以及如何通过 Flink SQL 将我们的日志实时从 kafka 同步至 clickhouse,然后在结合强大的第三方查询 BI 工具 superset,玩转业务日志,挖掘业务日志的潜在价值。
本文设计到的技能知识点比较多,需要熟悉 Clickhouse, Kafka, FlinkSQL, Superset 等,我之前的文章中总结了一些关于 Clickhouse 和 Kafka 相关的内容,感兴趣的读者可以看看:

clickhouse

  • Clickhouse字典关联外部 MySQL 表联合查询实践
  • ClickHouse架构概览 —— Clickhouse 架构篇(一)
  • ClickHouse 使用技巧总结
  • ClickHouse 实现用户画像(标签)系统实践

kafka

  • kafka 生产者 API 实践总结
  • kafka 消费者 API 使用总结
  • 保证 Kafka 数据可靠性最佳实践总结
  • kafka 实现精确一次性语义实践总结

superset

  • Pycharm 调试 superset 源码配置(远程调试)
  • superset 二开增加 flink 数据源连接通过flink sql 查询数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1939885.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

甄选范文“论系统安全架构设计及其应用”,软考高级论文,系统架构设计师论文

论文真题 随着社会信息化进程的加快,计算机及网络已经被各行各业广泛应用,信息安全问题也变得愈来愈重要。它具有机密性、完整性、可用性、可控性和不可抵赖性等特征。信息系统的安全保障是以风险和策略为基础,在信息系统的整个生命周期中提供包括技术、管理、人员和工程过…

Noah-MP陆面生态水文模拟与多源遥感数据同化技术

了解陆表过程的主要研究内容以及陆面模型在生态水文研究中的地位和作用;熟悉模型的发展历程,常见模型及各自特点;理解Noah-MP模型的原理,掌握Noah-MP模型在单站和区域的模拟、模拟结果的输出和后续分析及可视化等方法;…

【Spring Boot】网页五子棋项目实现,手把手带你全盘解析(长达两万3千字的干货,坐好了,要发车了......)

目录 网页五子棋项目一、项目核心流程二、 登录模块2.1 前端输入用户信息2.2 后端进行数据库查询用户信息 三、 游戏大厅模块3.1 前端通过Ajax请求用户数据,后端从Session中拿取并从数据库中查询后返回3.2 前后端建立WebSocket连接,并进行判断&#xff0…

xxl-job登录没反应问题解决方法

最近在写一个关于xxl-job的项目,然后遇到了如下的问题,可以正常访问到xxl-job的登录界面但是点击登录按钮发现没有反应,并且没有发送任何请求。 排查步骤(使用docker) 1.重启mysql 2.重启docker 3.重写安装mysql 4.查看…

Mysql-索引结构

一.什么是索引? 索引(index)是帮助MySQL高效获取数据的数据结构(有序)。在数据之外,数据库系统还维护着满足特定查找算法的数据结构,这些数据结构以某种方式引用(指向)数据,这样就可以在这些数据结构上实现高级查找算法,这种数据结构就是索引 二.无索引的情况 找到…

【Linux】Linux的基本使用

一.Linux的背景知识. 1.1什么是Linux Linux是一种开源的类Unix操作系统内核. 和Windows是" 并列 "的关系. 1.2Linux的发行版本. Linux 严格意义来说只是一个 “操作系统内核”.一个完整的操作系统 操作系统内核 配套的应用程序. 由于 Linux 是一个完全开源免费…

基于JSP的高校二手交易平台

开头语:你好,我是专注于计算机技术的学姐码农小野,如果有任何技术需求,欢迎随时联系我。 开发语言:Java 数据库:MySQL 技术:JSP技术 JAVA MySQL 工具:常见Web浏览器&#xff0…

【开发踩坑】 MySQL不支持特殊字符(表情)插入问题

背景 线上功能报错: Cause:java.sql.SQLException:Incorrect string value:xFO\x9F\x9FxBO for column commentat row 1 uncategorized SQLException; SQL state [HY000]:error code [1366]排查 初步觉得是编码问题(utf8 — utf8mb4) 参考上…

Linux环境下dockes使用MongoDB,上传zip文件如何解压并备份恢复到MongoDB数据库中

1、准备 Docker 和 MongoDB 容器 建议主机端口改一下 docker run --name mongodb -d -p 27018:27017 mongo 2. 创建一个工作目录并将 zip 文件上传到dockers容器中 docker cp data.zip mongodb:/data.zip 3. 在 MongoDB 容器中解压 zip 文件(也可以解压完再复制…

大语言模型LLM-三种模型架构

架构:由Transformer论文衍生出来的大语言模型,主要有三种模型架构预训练目标:FLM,PLM,MLM调整:微调: Transformer transfomer可以并行地计算? transformer中encoder模块是完全并行…

深入理解Linux网络(四):TCP接收阻塞

TCP socket 接收函数 recv 发出 recvfrom 系统调用。 进⼊系统调⽤后,⽤户进程就进⼊到了内核态,通过执⾏⼀系列的内核协议层函数,然后到 socket 对象的接收队列中查看是否有数据,没有的话就把⾃⼰添加到 socket 对应的等待队列⾥…

MYSQL——库表操作

MYSQL——库表操作 1.1 SQL语句基础1.1.1. SQL简介1.1.2. SQL语句分类1.1.3. SQL语句的书写规范 1.2 数据库的操作1.2.1 数据库的登录及退出1.2.2 查看数据库1.2.3 创建数据库1.2.4 切换数据库1.2.5 查看当前用户1.2.6 删除数据库 1.3 MySQL字符集1.3.1. 字符集1.3.2. 字符序1.…

myBatis的基本操作(持续更新中。。。)

目录 1. 简介2. 简单使用3. 代理开发4. 小技巧5. 动态查询6. 注解&#xff08;待更新&#xff09;底部 1. 简介 mybatis是一款优秀的持久层框架&#xff0c;用来简化JDBC开发 持久层&#xff1a;负责将数据保存到数据库的那一层代码 2. 简单使用 依赖 <dependencies>…

LabVIEW断路器操动机构运动速度检测

开发了一种基于LabVIEW设计平台开发的断路器操动机构运动速度检测系统。通过集成高速相机和图像处理技术&#xff0c;该系统能够实时监控和分析操动机构的动态性能&#xff0c;为电力系统提供关键的技术支持。 项目背景 随着工业化的发展&#xff0c;对电力系统的稳定性和可靠…

python的tkinter、socket库开发tcp的客户端和服务端

一、tcp通讯流程和开发步骤 1、tcp客户端和服务端通讯流程图 套接字是通讯的利器&#xff0c;连接时要经过三次握手建立连接&#xff0c;断开连接要经过四次挥手断开连接。 2、客户端开发流程 1&#xff09;创建客户端套接字 2&#xff09;和服务端器端套接字建立连接 3&#x…

钡铼分布式I/O系统边缘计算Modbus,MQTT,OPC UA耦合器BL206

BL206系列耦合器是一个数据采集和控制系统&#xff0c;基于强大的32 位微处理器设计&#xff0c;采用Linux操作系统&#xff0c;支持Modbus&#xff0c;MQTT&#xff0c;OPC UA协议&#xff0c;可以快速接入现场PLC、DCS、PAS、MES、Ignition和SCADA以及ERP系统&#xff0c;同时…

习题2.21

(defn rever [a](defn item[l r](if ( nil (first l)) r(item (rest l) (cons (first l) r))))(item a nil)) 这段代码非常有助于理解什么是深度优先&#xff0c;什么是广度优先。 很久没有写习题的代码了&#xff0c;倒不是懒得做习题了&#xff0c;是私事多&#xff0c;状态…

【局域网服务器连接】如何远程连入实验室linux系统服务器?| 局域网 | 内网穿透

文章目录 前言服务器基本配置安装 ssh 服务防火墙放行 局域网内网穿透获取SN码添加映射 总结 前言 简单记录连接实验室服务器步骤。如服务器直接有公网 ip 地址&#xff0c;ssh 直接连入即可&#xff0c;无需参考本文。 与服务器连同一 wifi&#xff0c; 参考 局域网 方式连接…

Android:requestLayout、invalidate 和 postInvalidate 的区别

提醒&#xff1a;下面源码来自SDK里Android-34版本 一、requestLayout 点击查看requestLayout官网文档 1.1 requestLayout方法源码 /*** Call this when something has changed which has invalidated the* layout of this view. This will schedule a layout pass of the v…

【C++航海王:追寻罗杰的编程之路】关于空间配置器你知道多少?

目录 1 -> 什么是空间配置器 2 -> 为什么需要空间配置器 3 -> SGI-STL空间配置器的实现原理 3.1 -> 一级空间配置器 3.2 -> 二级空间配置器 3.2.1 -> 内存池 3.2.2 -> SGI-STL中二级空间配置器设计 3.2.3 -> SGI-STL二级空间配置器之空间申请 …