Best Practices for Collecting Business Logs from Kafka

news2025/1/16 7:55:08

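Introduction

Apache Kafka is a distributed stream-processing platform used mainly to build real-time data pipelines and streaming applications. For business log collection, Kafka serves as message middleware that receives, stores, and forwards large volumes of log data. It can also be integrated with other systems (such as Elasticsearch, Flume, and Spark Streaming) for richer log processing and analysis. This article covers integration with Guance (观测云): Guance's collector, DataKit, consumes business logs from Kafka. The examples below show how quickly the integration can be set up.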

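Environment

Prerequisites

  • A registered Guance account

Software and middleware

  • Kafka 3.2.0
  • DataKit collector
  • JDK 8

Hardware

  • One cloud server: CentOS 7.9 64-bit, 4 vCPUs, 8 GB RAM, 100 GB cloud disk.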

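Integration

Prepare the Kafka environment

Install Kafka

Download version 3.2.0 and extract the archive; it is ready to use without further installation.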

wget https://archive.apache.org/dist/kafka/3.2.0/kafka_2.13-3.2.0.tgz

Note: DataKit currently supports Kafka versions 0.8.2 through 3.2.0.
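Before picking a Kafka release, it can help to check it against the supported range stated above. The following is a minimal sketch of such a check; parse_version and is_supported are hypothetical helper names, and the range is taken only from the note above, not queried from DataKit.

```python
def parse_version(text):
    """Turn a dotted version string such as "3.2.0" into a tuple of ints."""
    return tuple(int(part) for part in text.strip().split("."))


# Range taken from the note above (0.8.2 through 3.2.0, both inclusive).
MIN_SUPPORTED = parse_version("0.8.2")
MAX_SUPPORTED = parse_version("3.2.0")


def is_supported(kafka_version):
    """Return True if the version falls inside the documented range."""
    return MIN_SUPPORTED <= parse_version(kafka_version) <= MAX_SUPPORTED


if __name__ == "__main__":
    for version in ("0.8.2", "2.8.1", "3.2.0", "3.5.0"):
        print(version, is_supported(version))
```

Comparing tuples of integers rather than raw strings avoids the pitfall where "0.10.2" would sort before "0.8.2" lexicographically.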

Start the ZooKeeper service

$ bin/zookeeper-server-start.sh config/zookeeper.properties

Start the Kafka server

$ bin/kafka-server-start.sh config/server.properties

Create a topic

Create a topic named testlog.

$ bin/kafka-topics.sh --create --topic testlog --bootstrap-server localhost:9092

Start the producer

$ bin/kafka-console-producer.sh --topic testlog --bootstrap-server localhost:9092

Install DataKit

Install the DataKit collector as described in the official documentation.

Replace <TOKEN> with the token of your Guance workspace:
DK_DATAWAY="https://openway.guance.com?token=<TOKEN>" bash -c "$(curl -L https://static.guance.com/datakit/install.sh)"

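Enable the Kafka collector

Go to the conf.d/kafkamq directory under the DataKit installation directory (default: /usr/local/datakit/), copy kafkamq.conf.sample, and name the copy kafkamq.conf.

The directory then looks like this:

-rwxr-xr-x 1 root root 2574 Apr 30 23:52 kafkamq.conf
-rwxr-xr-x 1 root root 2579 May  1 00:40 kafkamq.conf.sample

Adjust the Kafka collector configuration as follows:

  • addrs = ["localhost:9092"]: in this article DataKit and Kafka run on the same host, so localhost is sufficient.
  • kafka_version = "3.2.0": the Kafka version used in this article.
  • [inputs.kafkamq.custom]: remove the leading comment marker "#".
  • [inputs.kafkamq.custom.log_topic_map]: remove the leading comment marker "#".
  • "testlog"="log.p": testlog is the topic name, and log.p is the script for Pipeline (Guance's programmable data processor) that defines the field-extraction rules for the logs. The business logs and the contents of log.p are described in "Use Pipeline" below.

The resulting configuration file:
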
# {"version": "1.28.1", "desc": "do NOT edit this line"}

[[inputs.kafkamq]]
  addrs = ["localhost:9092"]
  # your kafka version:0.8.2 ~ 3.2.0
  kafka_version = "3.2.0"
  group_id = "datakit-group"
  # consumer group partition assignment strategy (range, roundrobin, sticky)
  assignor = "roundrobin"

  ## rate limit.
  #limit_sec = 100
  ## sample
  # sampling_rate = 1.0

  ## kafka tls config
  # tls_enable = true
  # tls_security_protocol = "SASL_PLAINTEXT"
  # tls_sasl_mechanism = "PLAIN"
  # tls_sasl_plain_username = "user"
  # tls_sasl_plain_password = "pw"

  ## -1:Offset Newest, -2:Offset Oldest
  offsets=-1

  ## skywalking custom
  #[inputs.kafkamq.skywalking]
    ## Required!send to datakit skywalking input.
    #dk_endpoint="http://localhost:9529"
    #thread = 8 
    #topics = [
    #  "skywalking-metrics",
    #  "skywalking-profilings",
    #  "skywalking-segments",
    #  "skywalking-managements",
    #  "skywalking-meters",
    #  "skywalking-logging",
    #]
    #namespace = ""

  ## Jaeger from kafka. Please make sure your Datakit Jaeger collector is open !!!
  #[inputs.kafkamq.jaeger]
    ## Required! ipv6 is "[::1]:9529"
    #dk_endpoint="http://localhost:9529"
    #thread = 8 
    #source: agent,otel,others...
    #source = "agent"
    ## Required! topics
    #topics=["jaeger-spans","jaeger-my-spans"]

  ## user custom message with PL script.
  [inputs.kafkamq.custom]
    #spilt_json_body = true
    #thread = 8 
    ## spilt_topic_map determines whether to enable log splitting for specific topic based on the values in the spilt_topic_map[topic].
    #[inputs.kafkamq.custom.spilt_topic_map]
    #  "log_topic"=true
    #  "log01"=false
    [inputs.kafkamq.custom.log_topic_map]
      "testlog"="log.p"
    #  "log01"="log_01.p"
    #[inputs.kafkamq.custom.metric_topic_map]
    #  "metric_topic"="metric.p"
    #  "metric01"="rum_apm.p"
    #[inputs.kafkamq.custom.rum_topic_map]
    #  "rum_topic"="rum_01.p"
    #  "rum_02"="rum_02.p"

  #[inputs.kafkamq.remote_handle]
    ## Required!
    #endpoint="http://localhost:8080"
    ## Required! topics
    #topics=["spans","my-spans"]
    # send_message_count = 100
    # debug = false
    # is_response_point = true
    # header_check = false
  
  ## Receive and consume OTEL data from kafka.
  #[inputs.kafkamq.otel]
    #dk_endpoint="http://localhost:9529"
    #trace_api="/otel/v1/trace"
    #metric_api="/otel/v1/metric"
    #trace_topics=["trace1","trace2"]
    #metric_topics=["otel-metric","otel-metric1"]
    #thread = 8 

  ## todo: add other input-mq

Note: after enabling or changing any DataKit configuration, restart the collector (run datakit service -R in a shell).

Use Pipeline

Contents of the log.p rule

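# Parse the raw Kafka message as JSON.
data = load_json(message)
# Extract protocol and response_code and store them as tags.
protocol = data["protocol"]
response_code = data["response_code"]
set_tag(protocol,protocol)
set_tag(response_code,response_code)
# Derive the status field from the range the response code falls in.
group_between(response_code,[200,300],"info","status")
group_between(response_code,[400,499],"warning","status")
group_between(response_code,[500,599],"error","status")

# Use start_time as the timestamp of the log entry.
time = data["start_time"]
set_tag(time,time)
default_time(time)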
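For readers unfamiliar with Pipeline syntax, the following Python sketch mimics what log.p does to a single message. It is an illustration only, not how DataKit executes the script. It assumes that group_between treats both bounds as inclusive, and status_for and extract_fields are hypothetical names.

```python
import json
from datetime import datetime


def status_for(response_code):
    """Mirror the three group_between() calls (bounds assumed inclusive)."""
    if 200 <= response_code <= 300:
        return "info"
    if 400 <= response_code <= 499:
        return "warning"
    if 500 <= response_code <= 599:
        return "error"
    return None  # no range matched; log.p leaves status untouched here


def extract_fields(message):
    """Return the fields that log.p would extract from one JSON log line."""
    data = json.loads(message)
    fields = {
        "protocol": data["protocol"],
        "response_code": data["response_code"],
        "time": data["start_time"],
        # Parsed counterpart of default_time(time); "Z" is read as UTC.
        "timestamp": datetime.strptime(
            data["start_time"], "%Y-%m-%dT%H:%M:%S.%f%z"
        ),
    }
    status = status_for(data["response_code"])
    if status is not None:
        fields["status"] = status
    return fields


if __name__ == "__main__":
    line = (
        '{"protocol":"HTTP/1.1","response_code":504,'
        '"start_time":"2024-05-01T00:39:11.230Z"}'
    )
    print(extract_fields(line))
```

Note that the first range in log.p ends at 300, so under the inclusive assumption a 300 response is also classified as info, while other 3xx codes match no range.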

Results

Send sample business logs

The sample business logs are as follows:

#info
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":204,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:37:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
#error
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":504,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:39:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
#warn
{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":404,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-05-01T00:38:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}

Send the logs

Once the producer is running, send the following three log entries one at a time: one at info level ("response_code":204), one at error level ("response_code":504), and one at warn level ("response_code":404).

>{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":204,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
>
>
>
>
>{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":504,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
>
>
>
>{"protocol":"HTTP/1.1","upstream_local_address":"172.20.32.97:33878","response_flags":"-","istio_policy_status":null,"trace_id":"5532224c1013b9ad6da1efe88778dd64","authority":"server:1338","method":"PUT","response_code":404,"duration":83,"upstream_service_time":"83","user_agent":"Jakarta Commons-HttpClient/3.1","bytes_received":103,"downstream_local_address":"172.21.2.130:1338","start_time":"2024-04-30T08:47:11.230Z","upstream_transport_failure_reason":null,"requested_server_name":null,"bytes_sent":0,"route_name":"routes","x_forwarded_for":"10.0.0.69,10.23.0.31","upstream_cluster":"outbound|1338|svc.cluster.local","request_id":"80ac7d31-a598-4dc8-bb74-1850593f61e4","downstream_remote_address":"10.23.0.31:0","path":"/api/dimensions/items","upstream_host":"172.20.9.101:1338"}
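Typing long JSON lines into the console producer by hand is error-prone. One possible alternative, sketched below, generates one compact JSON line per log entry and writes it to standard output. BASE_RECORD is a shortened, hypothetical version of the sample log above, and build_log_lines is an illustrative helper.

```python
import json
import sys

# Shortened stand-in for the sample log (assumption: only a few of the
# original fields are kept to keep the example readable).
BASE_RECORD = {
    "protocol": "HTTP/1.1",
    "method": "PUT",
    "path": "/api/dimensions/items",
    "duration": 83,
    "start_time": "2024-04-30T08:47:11.230Z",
}


def build_log_lines(response_codes):
    """Return one single-line JSON document per response code."""
    lines = []
    for code in response_codes:
        record = dict(BASE_RECORD, response_code=code)
        # Compact separators keep each record on one line without spaces.
        lines.append(json.dumps(record, separators=(",", ":")))
    return lines


if __name__ == "__main__":
    # info (204), error (504) and warn (404), as in the walkthrough above.
    for line in build_log_lines([204, 504, 404]):
        sys.stdout.write(line + "\n")
```

If saved as gen_logs.py (a hypothetical file name), the output could be piped into the producer instead of being typed at the prompt: python3 gen_logs.py | bin/kafka-console-producer.sh --topic testlog --bootstrap-server localhost:9092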

The three business logs collected from Kafka by DataKit

Field extraction from the business logs with Pipeline

In the figure below, protocol, response_code, and time are the fields extracted by Pipeline.
