ES8生产实践——日志清洗过滤(vector方案)

news2025/4/28 3:07:54

前言

什么是vector

以下描述摘自官方文档:https://vector.dev/docs/about/what-is-vector/

Vector 是一个高性能的可观测性数据管道,可帮助企业控制其可观测性数据。收集、转换和路由所有日志、度量指标和跟踪数据,并将其提供给今天需要的任何供应商和明天可能需要的任何其他供应商。Vector 可在您需要的地方,而不是在供应商最方便的地方,大幅降低成本、丰富新颖的数据并确保数据安全。开放源代码,速度比其他任何替代方案快 10 倍。

简单来说由于logstash使用java语言开发,在处理海量数据时存在性能低下,占用资源过高的问题。而vector使用Rust语言编写,除了使用极少的资源实现logstash数据处理能力外,还具备配置文件简单、处理函数强大、智能均衡kafka分区消费、自适应并发请求等特色功能。

vector架构图

与logstash管道处理类似,vector主要包含数据输入、数据处理、数据输出三部分。
image.png

vector优势

  • 超级快速可靠:Vector采用Rust构建,速度极快,内存效率高,旨在处理最苛刻的工作负载
  • 端到端:Vector 致力于成为从 A 到 B 获取可观测性数据所需的唯一工具,并作为守护程序、边车或聚合器进行部署
  • 统一:Vector 支持日志和指标,使您可以轻松收集和处理所有可观测性数据
  • 供应商中立:Vector 不偏向任何特定的供应商平台,并以您的最佳利益为出发点,培育公平、开放的生态系统。免锁定且面向未来
  • 可编程转换:Vector 的高度可配置转换为您提供可编程运行时的全部功能。无限制地处理复杂的用例

对比测试

性能对比

下图是 Vector 与其它日志收集器的性能测试结果对比,可以看到,Vector 的各项性能指标都优于 Logstash,综合性能也不错,加上其丰富的功能,完全可以满足我们的日志处理需求。

TestVectorFilebeatFluentBitFluentDLogstashSplunkUFSplunkHF
TCP to Blackhole86mib/sn/a64.4mib/s27.7mib/s40.6mib/sn/an/a
File to TCP76.7mib/s7.8mib/s35mib/s26.1mib/s3.1mib/s40.1mib/s39mib/s
Regex Parsing13.2mib/sn/a20.5mib/s2.6mib/s4.6mib/sn/a7.8mib/s
TCP to HTTP26.7mib/sn/a19.6mib/s<1mib/s2.7mib/sn/an/a
TCP to TCP69.9mib/s5mib/s67.1mib/s3.9mib/s10mib/s70.4mib/s7.6mib/s

可靠性对比

TestVectorFilebeatFluentBitFluentDLogstashSplunk UFSplunk HF
Disk Buffer Persistence
Disk Buffer Persistence
File Rotate (copytruncate)
File Truncation
Process (SIGHUP)
JSON (wrapped)

功能性对比

VectorBeatsFluentbitFluentdLogstashSplunk UFSplunk HFTelegraf
End-to-end
Agent
Aggregator
Unified
Logs
Metrics
Open
Open-source
Vendor-neutral
Reliability
Memory-safe
Delivery guarantees
Multi-core

快速上手

安装部署

官方为我们提供了安装包、docker等多种安装方式,下载地址:https://vector.dev/download/,此处以rpm包部署为例。

[root@tiaoban ~]# wget https://packages.timber.io/vector/0.34.0/vector-0.34.0-1.x86_64.rpm
[root@tiaoban ~]# rpm -ivh vector-0.34.0-1.x86_64.rpm
[root@tiaoban ~]# systemctl start vector --now

配置测试

与logstash类似,在调试数据处理规则时,通常会从文件中读取数据,经过一系列处理后最后输出至控制台。其作用是读取 /var/log/messages日志文件,然后把 syslog 格式的日志转换成 json 格式,最后输出到标准输出:

[root@tiaoban ~]# cd /etc/vector/
[root@tiaoban vector]# ls
examples  vector.yaml  vector.yaml.back
[root@tiaoban vector]# cat vector.yaml
sources:
  in:
    type: "stdin"

sinks:
  print:
    type: "console"
    inputs: ["in"]
    encoding:
      codec: "json"
[root@tiaoban vector]# vector -c vector.yaml
2023-11-12T13:40:06.995872Z  INFO vector::app: Log level is enabled. level="vector=info,codec=info,vrl=info,file_source=info,tower_limit=info,rdkafka=info,buffers=info,lapin=info,kube=info"
2023-11-12T13:40:06.996328Z  INFO vector::app: Loading configs. paths=["vector.yaml"]
2023-11-12T13:40:06.998137Z  INFO vector::topology::running: Running healthchecks.
2023-11-12T13:40:06.998206Z  INFO vector: Vector has started. debug="false" version="0.34.0" arch="x86_64" revision="c909b66 2023-11-07 15:07:26.748571656"
2023-11-12T13:40:06.998219Z  INFO vector::app: API is disabled, enable by setting `api.enabled` to `true` and use commands like `vector top`.
2023-11-12T13:40:06.998590Z  INFO vector::topology::builder: Healthcheck passed.
2023-11-12T13:40:06.998766Z  INFO vector::sources::file_descriptors: Capturing stdin.
hello vector
{"host":"tiaoban","message":"hello vector","source_type":"stdin","timestamp":"2023-11-12T13:40:13.368669601Z"}

当控制台开始打印日志,就说明正常采集到了数据,而且转换成了 json 并打印到了控制台,实验成功。接下来详细介绍vector各个配置段具体内容。

多配置文件

当项目、环境、规则等十分庞杂时,推荐对配置文件根据需要进行拆分,而默认的 vector.toml 文件中可以写入一些全局配置。
修改systemctl 命令管理相关配置,让 Vector 读取指定目录下所有配置文件,而非默认的 vector.toml 文件。
将 Vector systemd 配置文件第 12 行由 ExecStart=/usr/bin/vector 修改为:ExecStart=/usr/bin/vector “–config-dir” “/etc/vector”:

# /usr/lib/systemd/system/vector.service
[Unit]
Description=Vector
Documentation=https://vector.dev
After=network-online.target
Requires=network-online.target


[Service]
User=vector
Group=vector
ExecStartPre=/usr/bin/vector validate
ExecStart=/usr/bin/vector "--config-dir" "/etc/vector"
ExecReload=/usr/bin/vector validate
ExecReload=/bin/kill -HUP $MAINPID
Restart=always
AmbientCapabilities=CAP_NET_BIND_SERVICE
EnvironmentFile=-/etc/default/vector
# Since systemd 229, should be in [Unit] but in order to support systemd <229,
# it is also supported to have it here.
StartLimitInterval=10
StartLimitBurst=5
[Install]
WantedBy=multi-user.target

修改 vector 配置目录环境变量,将 VECTOR_CONFIG_DIR="/etc/vector/"追加到 /etc/default/vector 文件最后:

# /etc/default/vector
# This file can theoretically contain a bunch of environment variables
# for Vector.  See https://vector.dev/docs/setup/configuration/#environment-variables
# for details.
VECTOR_CONFIG_DIR="/etc/vector/"

重新加载 systemd 配置并重启 Vector:

systemctl daemon-reload
systemctl restart vector

配置文件详解

配置文件构成

根据架构图可知,Vector由 Sources、Transforms和Sinks三个部分构成,Vector作为一款管道处理工具,日志数据可以从多个源头(source)流入管道,比如 HTTP、Syslog、File、Kafka 等等,当日志数据流入管道后,就可以被进行一系列处理,处理的过程就是转换(Transform),比如增减日志中的一些字段,对日志进行重新格式化等操作,日志被处理成想要的样子后,就可以传输给接收器(Sink)处理,也就是日志最终流向何处,可以是 Elasticsearch、ClickHouse、AWS S3 等等,配置文件基本格式如下

sources:	# 源
  my_source_id:   # 数据源名称
    type: "***" # 数据源类型
    
transforms: # 转换
  my_transform_id: # 转换名称
    type: remap # 转换类型
    inputs: # 转换操作的源
      - my_source_id

源(source)

Vector 提供了丰富的 Sources 供使用,常用的有控制台、模拟数据、File、http、Kafka等,并且支持的种类还在不断增加,详情参考官方文档https://vector.dev/docs/reference/configuration/sources/。

  • 控制台输入
sources:
  my_source_id:   # 数据源名称
    type: "stdin"
  • 模拟日志数据输入
sources:
  my_source_id:  # 数据源名称
    type: "demo_logs"
    format: "apache_common"  # 模拟数据类型
    count:  10 # 模拟数据条数
  • file输入示例:
sources:
  my_source_id:   # 数据源名称
    type: "file"
    include:			# 采集路径
      - /var/log/**/*.log
  • http输入示例:
sources:
  my_source_id:  # 数据源名称
    type: "http_server"
    address: "0.0.0.0:80" # 监听地址
  • kafka输入示例:
sources:
  my_source_id: # 数据源名称
    type: "kafka"
    bootstrap_servers: "10.14.22.123:9092,10.14.23.332:9092" # kafka地址
    group_id: "consumer-group-name"	# 消费组id
    topics: # 消费主题,支持多个topic使用正则匹配
      - ^(prefix1|prefix2)-.+
    decoding: # 编码格式
      codec: "json"
    auto_offset_reset: "latest" # 消费偏移

转换(transforms)-VRL

在Vector传输数据时,可能涉及数据解析、过滤、新增等操作,跟Logstash组件类似,Vector 提供了诸多 Transforms插件来对日志进行处理。 更多transforms插件参考文档:https://vector.dev/docs/reference/configuration/transforms/
Vector推荐使用remap插件处理数据,它使用VRL语言对日志进行处理,它提供了非常丰富的函数,可以拿来即用。在线调试地址:https://playground.vrl.dev/,VRL语言快速入门参考文档:https://vector.dev/docs/reference/vrl/
接下来列举几个常用的转换案例

  • 字段增删改
# 配置文件
sources:
  my_source:   # 数据源名称
    type: "stdin"

transforms:
  my_transform:
    type: remap
    inputs:    # 匹配输入源
      - my_source
    source: |
      . = parse_json!(.message) # 解析json数据
      .@timestamp = now() # 新增字段
      del(.user) # 删除字段
      .age = del(.value) # 重命名字段

sinks:
  my_print:
    type: "console"
    inputs: ["my_transform"] # 匹配过滤项
    encoding:
      codec: "json"
# 执行结果
{"hello":"world","user":"张三","value":18}
{"@timestamp":"2023-11-19T03:02:07.060420199Z","age":18,"hello":"world"}
  • 字段值操作
# 配置文件
sources:
  my_source:   # 数据源名称
    type: "stdin"

transforms:
  my_transform:
    type: remap
    inputs:    # 匹配输入源
      - my_source
    source: |
      . = parse_json!(.message) # 解析json数据
      .msg = downcase(string!(.msg)) # 转小写
      .user = replace(string!(.user), "三", "四") # 字符串替换
      .value = floor(float!(.value), precision: 2) # 小数保留指定长度

sinks:
  my_print:
    type: "console"
    inputs: ["my_transform"] # 匹配过滤项
    encoding:
      codec: "json"
# 执行结果
{"msg":"Hello, World!","user":"张三","value":3.1415926}
{"msg":"hello, world!","user":"张四","value":3.14}
  • 正则解析
# 配置文件
sources:
  my_source:   # 数据源名称
    type: "stdin"

transforms:
  my_transform:
    type: remap
    inputs:    # 匹配输入源
      - my_source
    source: |
      . = parse_regex!(.message, r'^\[(?<logtime>[^\]]*)\] (?<name>[^ ]*) (?<title>[^ ]*) (?<id>\d*)$')

sinks:
  my_print:
    type: "console"
    inputs: ["my_transform"] # 匹配过滤项
    encoding:
      codec: "json"
# 执行结果
[2023-11-11 12:00:00 +0800] alice engineer 1
{"id":"1","logtime":"2023-11-11 12:00:00 +0800","name":"alice","title":"engineer"}
  • 时间格式化
# 配置文件
sources:
  my_source:   # 数据源名称
    type: "stdin"

transforms:
  my_transform:
    type: remap
    inputs:    # 匹配输入源
      - my_source
    source: |
      . = parse_regex!(.message, r'^\[(?<logtime>[^\]]*)\] (?<name>[^ ]*) (?<title>[^ ]*) (?<id>\d*)$')
      .logtime = parse_timestamp!((.logtime), format:"%Y-%m-%d %H:%M:%S %:z")
sinks:
  my_print:
    type: "console"
    inputs: ["my_transform"] # 匹配过滤项
    encoding:
      codec: "json"
# 执行结果
[2023-11-11 12:00:00 +0800] alice engineer 1
{"id":"1","logtime":"2023-11-11T04:00:00Z","name":"alice","title":"engineer"}
  • geoip解析
# 配置文件
sources:
  my_source:   # 数据源名称
    type: "stdin"

transforms:
  my_transform:
    type: remap
    inputs:    # 匹配输入源
      - my_source
    source: |
      . = parse_json!(.message) # 解析json数据
      .geoip = get_enrichment_table_record!("geoip_table",
        {
          "ip": .ip_address
        })

sinks:
  my_print:
    type: "console"
    inputs: ["my_transform"] # 匹配过滤项
    encoding:
      codec: "json"

enrichment_tables:
  geoip_table: # 指定geoip数据库文件
    path: "/root/GeoLite2-City.mmdb"
    type: geoip
# 执行结果
{"ip_address":"185.14.47.131"}
{"geoip":{"city_name":"Hong Kong","continent_code":"AS","country_code":"HK","country_name":"Hong Kong","latitude":22.2842,"longitude":114.1759,"metro_code":null,"postal_code":null,"region_code":"HCW","region_name":"Central and Western District","timezone":"Asia/Hong_Kong"},"ip_address":"185.14.47.131"}

转换(transforms)-Lua

如果 VRL 不能满足用户对日志的处理需求,Vector 也支持嵌入 Lua 语言对日志进行处理,但是这种方式要比 VRL 慢将近 60 %。具体内容可参考文档:https://vector.dev/docs/reference/configuration/transforms/lua/,此处不再做过多介绍,推荐优先使用VRL语言转换。

转换(transforms)-过滤

很多时候从数据源采集过来的数据我们并不是全部都需要,filter顾名思义便是用来解决这一问题的,例如删除debug等级的日志信息。

# 配置文件
sources:
  my_source:   # 数据源名称
    type: "stdin"

transforms:
  transform_json:
    type: remap
    inputs:    # 匹配输入源
      - my_source
    source: |
      . = parse_json!(.message) # 解析json数据
    
  transform_filter:
    type: filter
    inputs:
      - transform_json
    condition: |
      .level != "debug"
  

sinks:
  my_print:
    type: "console"
    inputs: ["transform_filter"] # 匹配过滤项
    encoding:
      codec: "json"
# 执行结果
{"level":"debug","msg":"hello"}
{"level":"waring","msg":"hello"}
{"level":"waring","msg":"hello"}

接收器(sinks)

接收器是事件的目的地,Vector同样提供了很多 Sinks 类型,其中有些和 Sources 是重合的,比如 Kafka、AWS S3 等,更多支持的接收器类型可参考文档:https://vector.dev/docs/reference/configuration/sinks/

  • 输出到控制台
sinks:
  print:
    type: "console"
    inputs: ["in"]
    encoding:
      codec: "json" # 输出为json格式,也可设置为text文件
  • 输出到文件
sinks:
  my_sink_id:
    type: file
    inputs:
      - my-source-or-transform-id
    path: /tmp/vector-%Y-%m-%d.log
    encoding:
      codec: "text"
  • 输出到kafka
sinks:
  my_sink_id:
    type: kafka
    inputs:
      - my-source-or-transform-id
    bootstrap_servers: 10.14.22.123:9092,10.14.23.332:9092
    topic: topic-1234
  • 输出到elasticsearch
sinks:
  my_es:
    type: elasticsearch
    inputs: ["my_transform"] # 匹配转换配置
    api_version: "v8"        # ES版本,非必填
    mode: "data_stream" # 数据流方式写入
    auth: # es认证信息 
      strategy: "basic"
      user: "elastic"
      password: "WbZN3xfa5M4uy+UcxJeH"
    data_stream: # 数据流名称配置
      type: "logs"
      dataset: "vector"
      namespace: "default"
    endpoints: ["https://192.168.10.50:9200"] # es连接地址
    tls: # tls证书配置
      verify_certificate: false # 跳过证书验证
      # ca_file: "XXXXX" # ca证书路径

写入es中的数据流信息如下所示:
image.png

全局配置

  • 数据目录

用于持久化 Vector 状态的目录,例如 作为磁盘缓冲区、文件检查点等功能。

data_dir: "/var/lib/vector"
  • API

vector为我们提供了常用的API接口,可以很方便的进行监控检查与状态信息获取。
首先打开 Vector 的 api 功能,在 vector.toml 配置文件中加入以下内容即可:

api:
  enabled: true
  address: "0.0.0.0:8686"

重启 Vector,获取 Vector 的健康状态:

$ curl localhost:8686/health
{"ok":true}

开启api后,我们还可以通过命令行vector top命令获取各个任务的性能信息
image.png

监控

指标

  • prometheus_remote_write

将 Vector 内部指标 Sink 到 Prometheus,据此建立更为详细的 Dashboard 和告警。
在 vector.toml 配置文件中加入以下内容即可,vector 内置的指标通过远程写入的方式写入指定的 Prometheus中。

sources:
  vector_metrics:
    type: internal_metrics
sinks:
  prometheus:
    type:
      - prometheus_remote_write
    endpoint:
      - https://<prometheus_ip_address>:8087/
    inputs:
      - vector_metrics
  • prometheus_exporter

除了远程写入外,vector也支持通过exporter方式保留指标数据供Prometheus抓取,配置文件如下

sources:
  metrics:
    type: internal_metrics
    namespace: vector
    scrape_interval_secs: 30

sinks:
  prometheus:
    type: prometheus_exporter
    inputs:
      - metrics
    address: 0.0.0.0:9598
    default_namespace: service

然后在Prometheus的job中配置地址为http://IP:9598/metrics即可。

日志

将vector的运行日志写入本地文件或者elasticsearch中存储,以本地存储为例:

sources:
  logs:
    type: "internal_logs"

sinks:
  files:
    type: file
    inputs:
      - logs
    path: /tmp/vector-%Y-%m-%d.log
    encoding:
      codec: text

特色功能

vector自动均衡kafka消费

在之前使用logstash消费kafka数据时,需要根据topic数据量大小配置kafka partition数、Logstash副本数、每个logstash线程数,而这些数量只能根据性能监控图和数据量逐个调整至合适的大小。例如有6台logstash机器,其中5台机器专门用于消费数据量大的topic,其他机器消费小数据量的topic,经常存在logstash节点负载不均衡的问题。
使用vector后,我们只需要让所有机器使用相同的配置,借助Kafka的Consumer Group技术,不同配置文件通过同一个group_id即可一起消费所有的topic,vector在消费的过程中会自动均衡kafka消费速率。

自适应并发请求

在0.11.0版本后默认启用了自适应并发,这是一个智能、强大的功能,官方介绍https://vector.dev/blog/adaptive-request-concurrency/#rate-limiting-problem
在之前的版本中,为了保障数据正常写入下游Elasticsearch或Clickhouse时,需要进行速率限制。但限制速率值设定存在以下问题

  • 将限制设置得太高,从而使服务不堪重负,从而损害系统可靠性。
  • 设置的限制太低,浪费资源。

为了解决这个问题,vector推出了自适应并发的功能,它会重点观察两件事:请求的往返时间 (RTT) 和 HTTP 响应代码(失败与成功),从而决策出一个最佳的速率!
image.png
在写入Elasticsearch或Clickhouse时,默认已将其设为启用, 不需要进一步的配置。

vector解析日志实践

调试解析配置

假设线上应用原始日志格式如下,接下来我们通过vector解析日志内容。

2023-07-23 09:35:18.987 | INFO     | __main__:debug_log:49 - {'access_status': 200, 'request_method': 'GET', 'request_uri': '/account/', 'request_length': 67, 'remote_address': '185.14.47.131', 'server_name': 'cu-36.cn', 'time_start': '2023-07-23T09:35:18.879+08:00', 'time_finish': '2023-07-23T09:35:19.638+08:00', 'http_user_agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.2999.0 Safari/537.36'}
  1. 修改vector配置文件,添加sources配置项,从控制台读取数据。并新增sinks配置项,输出到控制台,vector配置文件如下所示:
sources:
  my_source:   
    type: "stdin"
  
sinks:
  my_print:
    type: "console"
    inputs: ["my_source"]
    encoding:
      codec: "json" 

观察控制台输出内容,已经将控制台输出的日志数据添加到了message字段中

2023-07-23 09:35:18.987 | INFO     | __main__:debug_log:49 - {'access_status': 200, 'request_method': 'GET', 'request_uri': '/account/', 'request_length': 67, 'remote_address': '185.14.47.131', 'server_name': 'cu-36.cn', 'time_start': '2023-07-23T09:35:18.879+08:00', 'time_finish': '2023-07-23T09:35:19.638+08:00', 'http_user_agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.2999.0 Safari/537.36'}
{"host":"huanbao","message":"2023-07-23 09:35:18.987 | INFO     | __main__:debug_log:49 - {'access_status': 200, 'request_method': 'GET', 'request_uri': '/account/', 'request_length': 67, 'remote_address': '185.14.47.131', 'server_name': 'cu-36.cn', 'time_start': '2023-07-23T09:35:18.879+08:00', 'time_finish': '2023-07-23T09:35:19.638+08:00', 'http_user_agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.2999.0 Safari/537.36'}","source_type":"stdin","timestamp":"2023-11-19T08:07:07.270947582Z"}
  1. 接下来调试VRL解析配置,推荐使用在线调试工具https://playground.vrl.dev/image.png

调试无误后,将VRL处理语句添加到vector配置中。

sources:
  my_source:   
    type: "stdin"
  
transforms:
  my_transform:
    type: remap
    inputs:    # 匹配输入源
      - my_source
    source: |
      . = parse_regex!(.message, r'^(?<logtime>[^|]+) \| (?<level>[A-Z]*) *\| __main__:(?<class>\D*:\d*) - (?<content>.*)$') # 正则提取logtime、level、class、content
      .content = replace(.content, "'", "\"") # 将content单引号替换为双引号
      .content = parse_json!(.content) # json解析content内容
      .access_status = (.content.access_status) # 将content中的子字段提取到根级
      .http_user_agent = (.content.http_user_agent)
      .remote_address = (.content.remote_address)
      .request_length = (.content.request_length)
      .request_method = (.content.request_method)
      .request_uri = (.content.request_uri)
      .server_name = (.content.server_name)
      .time_finish = (.content.time_finish)
      .access_status = (.content.access_status)
      .time_start = (.content.time_start)
      del(.content) # 删除content字段
      .logtime = parse_timestamp!((.logtime), format:"%Y-%m-%d %H:%M:%S.%3f") # 格式化时间字段
      .time_start = parse_timestamp!((.time_start), format:"%Y-%m-%dT%H:%M:%S.%3f%:z") 
      .time_finish = parse_timestamp!((.time_finish), format:"%Y-%m-%dT%H:%M:%S.%3f%:z")
      .level = downcase(.level) # 将level字段值转小写

sinks:
  my_print:
    type: "console"
    inputs: ["my_transform"] # 匹配转换配置
    encoding:
      codec: "json" 

添加VRL解析规则后,控制台输出内容如下,已成功对原始数据完成解析处理

2023-07-23 09:35:18.987 | INFO     | __main__:debug_log:49 - {'access_status': 200, 'request_method': 'GET', 'request_uri': '/account/', 'request_length': 67, 'remote_address': '185.14.47.131', 'server_name': 'cu-36.cn', 'time_start': '2023-07-23T09:35:18.879+08:00', 'time_finish': '2023-07-23T09:35:19.638+08:00', 'http_user_agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.2999.0 Safari/537.36'}
{"access_status":200,"class":"debug_log:49","http_user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.2999.0 Safari/537.36","level":"info","logtime":"2023-07-23T01:35:18.987Z","remote_address":"185.14.47.131","request_length":67,"request_method":"GET","request_uri":"/account/","server_name":"cu-36.cn","time_finish":"2023-07-23T01:35:19.638Z","time_start":"2023-07-23T01:35:18.879Z"}
  1. 我们已经成功解析到了remote_address字段,接下来从geoip数据库中查询ip的地理位置信息。
sources:
  my_source:   
    type: "stdin"
  
transforms:
  my_transform:
    type: remap
    inputs:    # 匹配输入源
      - my_source
    source: |
      . = parse_regex!(.message, r'^(?<logtime>[^|]+) \| (?<level>[A-Z]*) *\| __main__:(?<class>\D*:\d*) - (?<content>.*)$') # 正则提取logtime、level、class、content
      .content = replace(.content, "'", "\"") # 将content单引号替换为双引号
      .content = parse_json!(.content) # json解析content内容
      .access_status = (.content.access_status) # 将content中的子字段提取到根级
      .http_user_agent = (.content.http_user_agent)
      .remote_address = (.content.remote_address)
      .request_length = (.content.request_length)
      .request_method = (.content.request_method)
      .request_uri = (.content.request_uri)
      .server_name = (.content.server_name)
      .time_finish = (.content.time_finish)
      .access_status = (.content.access_status)
      .time_start = (.content.time_start)
      del(.content) # 删除content字段
      .logtime = parse_timestamp!((.logtime), format:"%Y-%m-%d %H:%M:%S.%3f") # 格式化时间字段
      .time_start = parse_timestamp!((.time_start), format:"%Y-%m-%dT%H:%M:%S.%3f%:z") 
      .time_finish = parse_timestamp!((.time_finish), format:"%Y-%m-%dT%H:%M:%S.%3f%:z")
      .level = downcase(.level) # 将level字段值转小写
      .geoip = get_enrichment_table_record!("geoip_table", # ip地理位置信息解析
        {
          "ip": .remote_address
        })

sinks:
  my_print:
    type: "console"
    inputs: ["my_transform"] # 匹配转换配置
    encoding:
      codec: "json" 

enrichment_tables:
  geoip_table: # 指定geoip数据库文件
    path: "/root/GeoLite2-City.mmdb"
    type: geoip

观察控制台输出,已经成功通过remote_address字段的ip地址获取到了地理位置信息内容。

2023-07-23 09:35:18.987 | INFO     | __main__:debug_log:49 - {'access_status': 200, 'request_method': 'GET', 'request_uri': '/account/', 'request_length': 67, 'remote_address': '185.14.47.131', 'server_name': 'cu-36.cn', 'time_start': '2023-07-23T09:35:18.879+08:00', 'time_finish': '2023-07-23T09:35:19.638+08:00', 'http_user_agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.2999.0 Safari/537.36'}
{"access_status":200,"class":"debug_log:49","geoip":{"city_name":"Hong Kong","continent_code":"AS","country_code":"HK","country_name":"Hong Kong","latitude":22.2842,"longitude":114.1759,"metro_code":null,"postal_code":null,"region_code":"HCW","region_name":"Central and Western District","timezone":"Asia/Hong_Kong"},"http_user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.2999.0 Safari/537.36","level":"info","logtime":"2023-07-23T01:35:18.987Z","remote_address":"185.14.47.131","request_length":67,"request_method":"GET","request_uri":"/account/","server_name":"cu-36.cn","time_finish":"2023-07-23T01:35:19.638Z","time_start":"2023-07-23T01:35:18.879Z"}

构建vector镜像

由于vector镜像未包含geoip数据库文件,如果需要根据IP地址解析获取地理位置信息,则需要提前构建包含geoip文件的vector镜像,并上传至harbor仓库中。

[root@tiaoban evk]# ls
Dockerfile  filebeat  GeoLite2-City.mmdb  kafka  log-demo.yaml  strimzi-kafka-operator  vector
[root@tiaoban evk]# cat Dockerfile 
FROM timberio/vector:0.34.1-debian
ADD GeoLite2-City.mmdb /etc/vector/GeoLite2-City.mmdb
[root@tiaoban evk]# docker build -t harbor.local.com/elk/vector:v0.34.1 .
[root@tiaoban evk]# docker push harbor.local.com/elk/vector:v0.34.1

k8s资源清单

  • vector-config.yaml

此配置文件为vector的全局通用配置文件,主要配置了api和监控指标。

apiVersion: v1
kind: ConfigMap
metadata:
  name: vector-config
  namespace: elk
data:
  vector.yaml: |
    data_dir: "/var/lib/vector"
    api:
      enabled: true
      address: "0.0.0.0:8686"
      
    sources:
      metrics:
        type: internal_metrics
        namespace: vector
        scrape_interval_secs: 30

    sinks:
      prometheus:
        type: prometheus_exporter
        inputs:
          - metrics
        address: 0.0.0.0:9598
        default_namespace: service
  • pod-config.yaml

此配置文件主要是从kafka中读取数据,然后移除非必要的字段信息,最后写入es中

apiVersion: v1
kind: ConfigMap
metadata:
  name: pod-config
  namespace: elk
data:
  pod.yaml: |
    sources:
      pod_kafka:
        type: "kafka"
        bootstrap_servers: "my-cluster-kafka-brokers.kafka.svc:9092"
        group_id: "pod"
        topics:
          - "pod_logs"
        decoding:
          codec: "json"
        auto_offset_reset: "latest"

    transforms:
      pod_transform:
        type: remap
        inputs:    # 匹配输入源
          - pod_kafka
        source: |
          del(.agent)
          del(.event)
          del(.ecs)
          del(.host)
          del(.input)
          del(.kubernetes.labels)
          del(.log)
          del(.orchestrator)
          del(.stream)
    
    sinks:
      pod_es:
        type: elasticsearch
        inputs: ["pod_transform"] # 匹配转换配置
        api_version: "v8"        # ES版本,非必填
        mode: "data_stream" # 数据流方式写入
        auth: # es认证信息 
          strategy: "basic"
          user: "elastic"
          password: "2zg5q6AU7xW5jY649yuEpZ47"
        data_stream: # 数据流名称配置
          type: "logs"
          dataset: "pod"
          namespace: "elk"
        endpoints: ["https://elasticsearch-es-http.elk.svc:9200"] # es连接地址
        tls: # tls证书配置
          verify_certificate: false # 跳过证书验证
  • myapp-config.yaml

此配置文件从kafka中读取pod日志数据,然后通过filter过滤出log-demo的日志数据,做进一步解析处理后写入es中

apiVersion: v1
kind: ConfigMap
metadata:
  name: myapp-config
  namespace: elk
data:
  myapp.yaml: |
    sources:
      myapp_kafka:
        type: "kafka"
        bootstrap_servers: "my-cluster-kafka-brokers.kafka.svc:9092"
        group_id: "myapp"
        topics:
          - "pod_logs"
        decoding:
          codec: "json"
        auto_offset_reset: "latest"
      
    transforms:
      myapp_filter:
        type: filter
        inputs:    # 匹配输入源
          - myapp_kafka
        condition: |
          .kubernetes.deployment.name == "log-demo"
      myapp_transform:
        type: remap
        inputs:    # 匹配输入源
          - myapp_filter
        source: |
          . = parse_regex!(.message, r'^(?<logtime>[^|]+) \| (?<level>[A-Z]*) *\| __main__:(?<class>\D*:\d*) - (?<content>.*)$') # 正则提取logtime、level、class、content
          .content = replace(.content, "'", "\"") # 将content单引号替换为双引号
          .content = parse_json!(.content) # json解析content内容
          .access_status = (.content.access_status) # 将content中的子字段提取到根级
          .http_user_agent = (.content.http_user_agent)
          .remote_address = (.content.remote_address)
          .request_length = (.content.request_length)
          .request_method = (.content.request_method)
          .request_uri = (.content.request_uri)
          .server_name = (.content.server_name)
          .time_finish = (.content.time_finish)
          .access_status = (.content.access_status)
          .time_start = (.content.time_start)
          del(.content) # 删除content字段
          .logtime = parse_timestamp!((.logtime), format:"%Y-%m-%d %H:%M:%S.%3f") # 格式化时间字段
          .time_start = parse_timestamp!((.time_start), format:"%Y-%m-%dT%H:%M:%S.%3f%:z") 
          .time_finish = parse_timestamp!((.time_finish), format:"%Y-%m-%dT%H:%M:%S.%3f%:z")
          .level = downcase(.level) # 将level字段值转小写
          .geoip = get_enrichment_table_record!("geoip_table",
          {
            "ip": .remote_address
          }) # 地理位置信息解析
        
    sinks:
      myapp_es:
        type: elasticsearch
        inputs: ["myapp_transform"] # 匹配转换配置
        api_version: "v8"        # ES版本,非必填
        mode: "data_stream" # 数据流方式写入
        auth: # es认证信息 
          strategy: "basic"
          user: "elastic"
          password: "2zg5q6AU7xW5jY649yuEpZ47"
        data_stream: # 数据流名称配置
          type: "logs"
          dataset: "myapp"
          namespace: "elk"
        endpoints: ["https://elasticsearch-es-http.elk.svc:9200"] # es连接地址
        tls: # tls证书配置
          verify_certificate: false # 跳过证书验证 
    
    enrichment_tables:
      geoip_table: # 指定geoip数据库文件
        path: "/etc/vector/GeoLite2-City.mmdb"
        type: geoip
  • deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: vector
  namespace: elk
spec:
  replicas: 2
  selector:
    matchLabels:
      app: vector
  template:
    metadata:
      labels:
        app: vector
    spec:
      securityContext:
        runAsUser: 0
      containers:
      - image: harbor.local.com/elk/vector:v0.34.1
        name: vector
        resources:
          limits:
            cpu: "1"
            memory: 1Gi
        args:
        - -c
        - /etc/vector/*.yaml
        ports:
          - name: exporter
            containerPort: 9598
          - name: api
            containerPort: 8686
        volumeMounts:
        - name: timezone
          mountPath: /etc/localtime
        - name: data
          mountPath: /var/lib/vector
        - name: vector-config
          mountPath: /etc/vector/vector.yaml
          subPath: vector.yaml
        - name: pod-config
          mountPath: /etc/vector/pod.yaml
          subPath: pod.yaml
        - name: myapp-config
          mountPath: /etc/vector/myapp.yaml
          subPath: myapp.yaml
        readinessProbe:
          httpGet:
            path: /health
            port: 8686
        livenessProbe:
          httpGet:
            path: /health
            port: 8686
      volumes:
      - name: timezone
        hostPath:
          path: /usr/share/zoneinfo/Asia/Shanghai
      - name: data
        hostPath:
          path: /data/vector
          type: DirectoryOrCreate
      - name: vector-config
        configMap:
          name: vector-config
      - name: pod-config
        configMap:
          name: pod-config
      - name: myapp-config
        configMap:
          name: myapp-config
  • service.yaml
apiVersion: v1
kind: Service
metadata:
  name: vector-exporter
  namespace: elk
spec:
  selector:
    app: vector
  ports:
  - name: exporter 
    port: 9598 
    targetPort: 9598

效果演示

查看kibana数据流信息,已成功创建myapp和pod日志的索引。
image.png
查看pod索引数据信息
image.png
查看myapp索引数据信息
image.png

注意事项

多配置文件启动

我们可以通过-c /etc/vector/*.yaml方式指定多个配置文件启动,此时vector会扫描指定路径下的所有yaml配置文件并加载启动。这样配置便于管理各个管道处理规则配置,使配置文件结构更加清晰,便于日后维护工作。

管道名称全局唯一

sources、transforms、sinks的自定义名称,全局必须唯一,尤其是多个vector配置文件时,唯一名称尤为重要。

完整资源清单

本实验案例所有yaml文件已上传至git仓库。访问地址如下:

github

https://github.com/cuiliang0302/blog-demo

gitee

https://gitee.com/cuiliang0302/blog_demo

参考文档

VRL常用函数:https://vector.dev/docs/reference/vrl/functions/
VRL常用案例:https://vector.dev/docs/reference/vrl/examples/
VRL时间格式化:https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers

查看更多

微信公众号

微信公众号同步更新,欢迎关注微信公众号《崔亮的博客》第一时间获取最近文章。

博客网站

崔亮的博客-专注devops自动化运维,传播优秀it运维技术文章。更多原创运维开发相关文章,欢迎访问https://www.cuiliangblog.cn

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1255130.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C语言做一个恶作剧关机程序

一、项目介绍 C语言实现一个简单的"流氓软件"&#xff0c;一个可以强制关机恶作剧关机程序&#xff0c;输入指定指令可以解除 二、运行截图 然后当你输入“n”才可以解锁关机。 三、完整源码 #include <stdlib.h> #include <stdio.h> #include <s…

字符画制作原理揭秘及代码复现-python

视频讲解链接&#xff1a;https://www.bilibili.com/video/BV1JC4y1d7vb/ 网站原理图像艺术画-代码讲解文字艺术画-代码讲解 字符画的原理 字符画&#xff08;ASCII art&#xff09;是一种通过使用文本字符来创建图像的艺术形式。字符画的原理基于以下概念&#xff1a; 字符…

智能优化算法应用:基于被囊群算法无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用&#xff1a;基于被囊群算法无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用&#xff1a;基于被囊群算法无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.被囊群算法4.实验参数设定5.算法结果6.参考文献7.…

玉渊谭天对电影色彩分析的“蚊香图”复现-python

视频教程链接&#xff1a;https://www.bilibili.com/video/BV1Lu4y1t7FG/ 最终的实现效果如下&#xff1a; 前几天刷抖音刷到了玉渊谭天对于电影抽取画面制作“蚊香图”&#xff0c;相关视频片段如下。 这种制作”蚊香图“的特效当时有点触动到到我&#xff0c;根据色彩来分…

jmeter测试dubbo接口

本文讲解jmeter测试dubbo接口的实现方式&#xff0c;文章以一个dubbo的接口为例子进行讲解&#xff0c;该dubbo接口实现的功能为&#xff1a; 一&#xff1a;首先我们看服务端代码 代码架构为&#xff1a; 1&#xff1a;新建一个maven工程&#xff0c;pom文件为&#xff1a; 1…

PyQt6 QLabel标签控件

​锋哥原创的PyQt6视频教程&#xff1a; 2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~共计21条视频&#xff0c;包括&#xff1a;2024版 PyQt6 Python桌面开发 视频教程(无废话…

性能测试必学教程之Jmeter:nmon性能系统监控工具

一、Nmon介绍 Nmon得名于 Nigel 的监控器&#xff0c;是IBM的员工 Nigel Griffiths 为 AIX 和 Linux 系统开发的&#xff0c;使用 Nmon 可以很轻松的监控系统的CPU、内存、网络、硬盘、文件系统、NFS、高耗进程、资源和 IBM Power 系统的微分区的信息 Nmon是一款计算机性能系…

Git远程仓库常用开发命令和理解

远程仓库 创建与合并分支 每次提交&#xff0c;Git都把它们串成一条时间线&#xff0c;这条时间线就是一个分支。截止到目前&#xff0c;只有一条时间线&#xff0c;在Git里&#xff0c;这个分支叫主分支&#xff0c;即master分支。 HEAD严格来说不是指向提交&#xff0c;而…

Python教程:DataFrame列数据类型的转换

Pandas提供了多种数据类型转换方法。可以使用astype()函数来转换数据类型。例如&#xff0c;可以将字符串类型的列转换为整数类型的列&#xff1a; # Author : 小红牛 # 微信公众号&#xff1a;wdPython import pandas as pd# 创建包含字符串类型列的DataFrame df pd.DataFra…

Postman如何使用(三):使用数据文件

数据文件是非常强大的方式使用不同的测试数据来测试我们的API&#xff0c;以检查它们是否在各种情况下都能正常运行。我们可以认为数据文件是“Collection Runner”中每个请求的参数。下面&#xff0c;我们通过一个例子来说明如何使用数据文件。 这篇文章需要结合下面两个文件进…

第二十二章 解读pycocotools的API,目标检测mAP的计算COCO的评价指标(工具)

Pycocotools介绍 为使用户更好地使用 COCO数据集, COCO 提供了各种 API。COCO是一个大型的图像数据集&#xff0c;用于目标检测、分割、人的关键点检测、素材分割和标题生成。这个包提供了Matlab、Python和luaapi&#xff0c;这些api有助于在COCO中加载、解析和可视化注释。 …

Linux环境下自动化创建大量的账号

参考《鸟哥的Linux私房菜基础篇第四版》13.7.2节微调而成&#xff1a; 下面脚本的目的是为服务器的管理员自动化创建大量的账号&#xff0c;节省生命。 #!/bin/bash # This shell script will create amount of Linux login accounts for you. # 1. check the "accounta…

ubuntu20.04打不开github网址的有效解决方案

问题描述&#xff1a;重装的ubuntu系统&#xff0c;chrome浏览器刚开始还能打开github网址&#xff0c;然后突然就打不开了&#xff0c;换网络也不行。 解决方案步骤 1&#xff0c;查询你的电脑IP对应的github网址信息 2&#xff0c;修改host文件&#xff0c;添加第1步查询到…

线程的认识,本质,和进程的区别,哪些结构是共享/独立的,切换成本,不同os下的线程

目录 再次认识进程 用户视角 内核视角 引入线程 概念 调度的基本单位 模拟出图像 思考 线程的本质 线程和进程的区别 线程哪些结构是共享的 引入 地址空间 系统资源 ​编辑 线程哪些结构是单独拥有 引入 地址空间 系统资源 线程间切换的成本更低 linu…

【强化学习】动态规划算法实践

文章目录 【强化学习】动态规划算法实践一. 实验过程1.1 Environment1.2 Policy Iteration1.3 Policy Evaluation1.4 Policy Improvement1.5 Value Iteration 二. 实验结果与分析2.1 分析Policy Iteration和Value Iteration收敛误差随着迭代次数的分布曲线 【强化学习】动态规划…

性能测试必会技能之Jmeter:利用jmeter插件收集性能测试结果汇总报告和聚合报告

利用jmeter插件收集性能测试结果 汇总报告&#xff08;Summary Report &#xff09; 用来收集性能测试过程中的请求以及事务各项指标。通过监听器--汇总报告 可以添加该元件。界面如下图所示 汇总报告界面介绍&#xff1a; 所有数据写入一个文件&#xff1a;保存测试结果到本地…

【c++模版】

在计算机科学的世界里&#xff0c;模板是一种非常强大的工具&#xff0c;它允许我们编写通用的代码&#xff0c;这些代码可以处理多种数据类型。在C中&#xff0c;模板是实现泛型编程的主要方式之一。本文将详细介绍C模板的基本概念&#xff0c;并通过一些有趣的例子和现实生活…

Unity 关于Input类的使用

Input类在我们游戏开发中需要获取外设设备&#xff08;比如键盘、鼠标、游戏手柄等&#xff09;进行交互时&#xff0c;基本都会用到。 它主要有以下一些常用的方法。 1、GetKey(KeyCode key)&#xff0c;检测按键是否被按下&#xff1b; 2、GetKeyDown(KeyCode key)&#x…

什么是CAS/CAS的应用/CAS的ABA问题

文章目录 CAS1. 什么是CAS2. CAS的应用2.1 实现原子类2.2 实现自旋锁 3. CAS的ABA问题3.1 什么是ABA问题3.2 ABA问题引来的BUG3.3 解决方案 CAS 1. 什么是CAS CAS: 全称Compare and swap, 字面意思:”比较并交换“. 操作: 设V为内存中的值, A为寄存器中的值(旧的预期值), B也…

无人零售已成为新兴趋势

无人零售已成为新兴趋势 在新零售浪潮中&#xff0c;必然会涌现新的商业形态&#xff0c;而无人零售则是其中典型代表之一。传统零售受制于人力和场地等限制&#xff0c;消费者体验较差&#xff0c;如长时间排队、缓慢结账、距离过远等问题。而无人零售解决方案&#xff0c;包括…