ELK安装、部署、调试(六) logstash的安装和配置

news2025/1/15 20:01:56

1.介绍

Logstash是具有实时流水线能力的开源的数据收集引擎。Logstash可以动态统一不同来源的数据,并将数据标准化到您选择的目标输出。它提供了大量插件,可帮助我们解析,丰富,转换和缓冲任何类型的数据。

管道(Logstash Pipeline)是Logstash中独立的运行单元,每个管道都包含两个必须的元素输入(input)和输出(output),和一个可选的元素过滤器(filter),事件处理管道负责协调它们的执行。 输入和输出支持编解码器,使您可以在数据进入或退出管道时对其进行编码或解码,而不必使用单独的过滤器。如:json、multiline等

inputs(输入阶段):

会生成事件。包括:file、kafka、beats等

filters(过滤器阶段):

可以将过滤器和条件语句结合使用对事件进行处理。包括:grok、mutate等

outputs(输出阶段):

将事件数据发送到特定的目的地,完成了所以输出处理,改事件就完成了执行。如:elasticsearch、file等

Codecs(解码器):

基本上是流过滤器,作为输入和输出的一部分进行操作,可以轻松地将消息的传输与序列化过程分开。

logstash为开源的数据处理pipeline管道,能接受多个源的数据,然后进行转换,再发送到指定目的地,logstash通过插件的机
制实现各种功能插件的下载地址为https://github.com/logstash-plugins
logstash 主要用于接收数据,解析过滤并转化,输出数据三个部分
对应的插件为input插件,filter插件,output插件

ingpu插件介绍,主要接收数据,支持多少数据源。
支持file : 读取一个文件,类似于tail命令,一行一行的实施读取。
支持syslog:监听系统的514端口的syslog messages,使用RFC3164格式进行解析
redis: 可以从Redis服务器读取数据,此时redis类似于一个消息缓存组件
kafka: 从kafka集群读取数据,kafka和logstash架构一般用于数据量较大的场景,kafka用于数据的换从和存储
filebeat: filebeat为一个文本的日志收集器,占系统资源小


filter插件介绍,主要进行数据的过滤,解析,格式化
grok:是logstash最重要的插件,可以解析任意数据,支持正则表达式,并提供很多内置的规则及模板
mutate 提供丰富的技术类型数据处理能力。包括类型转换,字符串处理,字段处理等
date,转化日志记录中的时间字符串格式
Geoip:更具ip地址提供对应的地域信息。包括国,省,经纬度等,对于可视化地图和区域统计非常有用。

output插件,输入日志到目的地
elasticsearch :发动到es中
file:发动数据到文件中
redis :发送日志到redis
kafka:发送日志到kafka

2.下载 

https://www.elastic.co/cn/downloads/logstash
百度云elk上有

使用向导的连接
https://www.elastic.co/guide/en/logstash/7.9/installing-logstash.html#package-repositories

3.安装

logstash
 tar  zxvf logstash-7.9.3.tar.gz -C /usr/local/
 cd /usr/local/ 
 mv logstash-7.9.3 logstash
 logstash/bin/logstash为启动文件

4.启动

cd /usr/local/logstash/bin
./logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}'

后台运行

nohup logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}' &


input 使用input插件
stdin{} 采用标准的终端输入,如键盘输入

output 使用output插件
stdout 为标准输出
codec 插件,定义输出格式
rubydebug 一般输出为jison格式的测试

5.测试

5.1 测试1:采用标准的输入和输出

root@localhost bin]# ./logstash -e 'input{stdin{}} output{stdout{codec=>rubydebug}}'

Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2023-08-29T10:22:35,442][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.9.3",
"jruby.version"=>"jruby 9.2.13.0 (2.5.7) 2020-08-03 9a89c94bcc OpenJDK 64-Bit Server VM 25.242-b08 on 1.8.0_242
-b08 +indy +jit [linux-x86_64]"}
[2023-08-29T10:22:36,612][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because
modules or command line options are specified
[2023-08-29T10:22:40,097][INFO ][org.reflections.Reflections] Reflections took 77 ms to scan 1 urls, producing
22 keys and 45 values
[2023-08-29T10:22:42,937][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main",
"pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500,
"pipeline.sources"=>["config string"], :thread=>"#<Thread:0xdd19934 run>"}
[2023-08-29T10:22:44,380][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time
{"seconds"=>1.41}
[2023-08-29T10:22:44,466][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2023-08-29T10:22:44,590][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>
[:main], :non_running_pipelines=>[]}
[2023-08-29T10:22:45,030][INFO ][logstash.agent           ] Successfully started Logstash API endpoint
{:port=>9600}
hello             #我用键盘输入的,下面的信息是logstash的输出。
{
       "message" => "hello",
      "@version" => "1",
          "host" => "localhost.localdomain",
    "@timestamp" => 2023-08-29T02:22:53.965Z
}


5.2 测试2:使用配置文件 +标准输入输出

参见logstash-1.conf
内容如下

[root@localhost logstash]# cat logstash-1.conf
input { stdin { }
}

output {
   stdout { codec => rubydebug }
}


[root@localhost logstash]#
执行
 ./logstash -f ../logstash-1.conf

[root@localhost bin]# ./logstash -f ../logstash-1.conf
Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2023-08-29T10:30:50,169][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.9.3",
"jruby.version"=>"jruby 9.2.13.0 (2.5.7) 2020-08-03 9a89c94bcc OpenJDK 64-Bit Server VM 25.242-b08 on 1.8.0_242
-b08 +indy +jit [linux-x86_64]"}
[2023-08-29T10:30:51,542][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because
modules or command line options are specified
[2023-08-29T10:30:55,077][INFO ][org.reflections.Reflections] Reflections took 82 ms to scan 1 urls, producing
22 keys and 45 values
[2023-08-29T10:30:58,205][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main",
"pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500,
"pipeline.sources"=>["/usr/local/logstash/logstash-1.conf"], :thread=>"#<Thread:0x7b58200e run>"}
[2023-08-29T10:30:59,713][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time
{"seconds"=>1.48}
[2023-08-29T10:30:59,829][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
The stdin plugin is now waiting for input:
[2023-08-29T10:31:00,060][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>
[:main], :non_running_pipelines=>[]}
[2023-08-29T10:31:00,664][INFO ][logstash.agent           ] Successfully started Logstash API endpoint
{:port=>9600}

5.3 测试三:配置文件+file输入 +标准的屏幕输出

vi logstash-2.conf

input {
  file {
    path => "/var/log/messages"
  }
}
output {
  stdout {
    codec=>rubydebug
  }
}

[root@localhost logstash]# vi logstash-2.conf

[root@localhost logstash]# ./bin/logstash -f logstash-2.conf
Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2023-08-29T10:36:25,171][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.9.3",
"jruby.version"=>"jruby 9.2.13.0 (2.5.7) 2020-08-03 9a89c94bcc OpenJDK 64-Bit Server VM 25.242-b08 on 1.8.0_242
-b08 +indy +jit [linux-x86_64]"}

[2023-08-29T10:36:37,490][INFO ][logstash.agent           ] Successfully started Logstash API endpoint
{:port=>9600}
{
    "@timestamp" => 2023-08-29T02:37:14.162Z,
          "path" => "/var/log/messages",
          "host" => "localhost.localdomain",
       "message" => "Aug 29 10:37:13 localhost systemd-logind: Removed session 992.",
      "@version" => "1"
}
{
    "@timestamp" => 2023-08-29T02:37:14.231Z,
          "path" => "/var/log/messages",
          "host" => "localhost.localdomain",
       "message" => "Aug 29 10:37:13 localhost systemd-logind: Removed session 993.",
      "@version" => "1"
}

5.4 测试4:配置文件+文件输入+kafka输出

[root@localhost logstash]# cat logstash-3.conf

input {
  file {
    path => "/var/log/messages"

  }
}
output {
  kafka {
  bootstrap_servers => "10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092"
  topic_id => "osmessages"

  }
}
[root@localhost logstash]# ./bin/logstash -f logstash-3.conf

到10.10.10.71上用kafka消费

./kafka-console-consumer.sh  --bootstrap-server 10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092 --topic
osmessages

有信息输出,

2023-08-29T02:49:42.383Z localhost.localdomain Aug 29 10:49:41 localhost systemd-logind: New session 996 of user
root.
2023-08-29T02:49:42.300Z localhost.localdomain Aug 29 10:49:41 localhost systemd-logind: New session 997 of user
root.
2023-08-29T02:49:42.380Z localhost.localdomain Aug 29 10:49:41 localhost systemd: Started Session 997 of user
root.
2023-08-29T02:49:42.384Z localhost.localdomain Aug 29 10:49:41 localhost systemd: Started Session 996 of user
root.
2023-08-29T02:49:43.465Z localhost.localdomain Aug 29 10:49:42 localhost dbus[745]: [system] Activating service
name='org.freedesktop.problems' (using servicehelper)
2023-08-29T02:49:43.467Z localhost.localdomain Aug 29 10:49:42 localhost dbus[745]: [system] Successfully
activated service 'org.freedesktop.problems'
2023-08-29T02:50:01.531Z localhost.localdomain Aug 29 10:50:01 localhost systemd: Started Session 998 of user
root.


5.5 测试5:配置文件+filebeat端口输入+标准输出

服务器产生日志(filebeat)---》logstash服务器

[root@localhost logstash]# cat logstash-4.conf
input {
  beats {
    port => 5044

  }
}
output {
  stdout {
    codec => rubydebug
  }
}
[root@localhost logstash]# ./bin/logstash -f logstash-4.conf

启动后会在本机启动一个5044端口,此段欧在配置文件中可
以任意配置。不要和系统已启动的端口冲突即可

[root@localhost logstash]# netstat -anp | grep 5044
tcp6       0      0 :::5044                 :::*                    LISTEN      10545/java
[root@localhost logstash]#

配合测试我们在filebeat服务器上修改配置文件

# ------------------------------ Logstash Output -------------------------------
output.logstash:
  hosts: ["10.10.10.74:5044"]      

   

##filebeat使用logstash作为输出,

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications 
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/ce

修改filebeat配置后,重启filebeat

nohup /usr/local/filebeat/filebeat -e -c /usr/local/filebeat/filebeat-to-logstash.yml &

经过测试在logstash下看到了filebeat的日志内容。


5.6测试6   配置文件+filebeat端口输入+输出到kafka

场景
服务器产生日志(filebeat)--->  logstash服务器---->kafka服务器

[root@localhost logstash]# cat logstash-5.conf
input {
  beats {
    port => 5044

  }
}
output {
  kafka {
  bootstrap_servers => "10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092"
  topic_id => "osmessages"

  }
}

filebeat服务器:
filebeat.yml配置同5

# ------------------------------ Logstash Output -------------------------------
output.logstash:
  hosts: ["10.10.10.74:5045"]   

 


logstash服务器:
配置

[root@localhost logstash]# cat logstash-5.conf
input {
  beats {
    port => 5044

  }
}
output {
  kafka {
#  codec ==> json                 #打开注释将会使用json格式传输,使用json感觉更乱
  bootstrap_servers => "10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092"
  topic_id => "osmessages"

  }
}
./bin/logstash -f logstash-5.conf

kafka服务器:

./kafka-console-consumer.sh  --bootstrap-server 10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092 --topic
osmessages


消费了一下日志,可以看到filebeat服务器同步过来的信息。


5.7 测试7 配置文件+kafka读取当输入+输出到es


服务器产生日志(filebeat)--->  kafka服务器__抽取数据___> logstash服务器---->ESlogstash的配置
[root@localhost logstash]# cat f-kafka-logs-es.conf

input {
    kafka {
    bootstrap_servers => "10.10.10.71:9092,10.10.10.72:9092,10.10.10.73:9092"
    topics => ["osmessages"]
    }
}
output {
    elasticsearch {
    hosts => ["10.10.10.65:9200,10.10.10.66:9200,10.10.10.67:9200"]
    index => "osmessageslog-%{+YYYY-MM-dd}"

  }
}


./bin/logstash -f f-kafka-logs-es.conf
nohup /usr/local/logstash/bin logstash -f /usr/local/logstash/f-kafka-logs-es.conf


filebeat.yml配置

# ------------------------------ KAFKA Output -------------------------------
output.kafka:
  eanbled: true
  hosts: ["10.10.10.71:9092","10.10.10.72:9092","10.10.10.73:9092"]
  version: "2.0.1"
  topic: '%{[fields][log_topic]}'
  partition.round_robin:
    reachable_only: true
  worker: 2
  required_acks: 1
  compression: gzip
  max_message_bytes: 10000000


 nohup /usr/local/filebeat/filebeat -e -c /usr/local/filebeat/filebeat.yml &
完成到kafka的输出

完成

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/952370.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

WebGIS的一些学习笔记

一、简述计算机网络的Internet 概念、网络类型分类、基本特征和功用是什么 计算机网络的Internet 概念 计算机网络是地理上分散的多台独立自主的计算机遵循约定的通讯协议&#xff0c;通过软、硬件互连以实现交互通信、资源共享、信息交换、协同工作以及在线处理等功能的系统…

基于深度学习的AI生成式人脸图像鉴别

AIGC&#xff08;AI内容生成&#xff09;技术的快速发展确实为创作者提供了高效生产力工具&#xff0c;但同时也引发了一些问题和挑战。这些技术可以生成以假乱真的图像、视频换脸等&#xff0c;给不法分子提供了滥用的机会。其中&#xff0c;一些不法分子可能利用AIGC技术制造…

JZ13 机器人的运动范围

题目描述&#xff1a; 思路&#xff1a;使用深度优先&#xff08;dfs&#xff09;搜索方法 从[0,0]开始&#xff0c;每次选择一个方向开始检查能否访问&#xff0c;如果能访问进入该节点&#xff0c;该节点作为子问题&#xff0c;继续按照这个思路访问&#xff0c;一条路走到…

verilator的安装

出现libgz安装包找不到的问题

官方推荐使用的OkHttp4网络请求库全面解析(Android篇)

作者&#xff1a;cofbro 前言 现在谈起网络请求&#xff0c;大家肯定下意识想到的就是 okhttp 或者 retrofit 这样的三方请求库。诚然&#xff0c;现在有越来越多的三方库帮助着我们快速开发&#xff0c;但是对于现在的程序员来说&#xff0c;我们不仅要学会如何去用&#xff…

(牛客周赛 9)C.小美的01串翻转

题目&#xff1a; 样例&#xff1a; 输入 10001 输出 8 思路&#xff1a; 这里是连续的找子串&#xff0c;权值的意思是 我们取反操作了多少次&#xff0c; 我们有假设长度是 5 &#xff0c;字符串是 10001 那么相邻不一样的字符串有两种情况 01010 或者 10101&#xf…

【项目】Reactor模式的服务器

目录 Reactor完整代码连接 前置知识&#xff1a; 1.普通的epoll读写有什么问题&#xff1f; 2.Connection内的回调函数是什么 3.服务器的初始化&#xff08;Connection只是使用的一个结构体&#xff09; 4.等待就绪事件&#xff1a;有事件就绪&#xff0c;对使用Connectio…

MATLAB实现AHP层次分析法——以情人节选取礼物为例

问题背景&#xff1a; 情人节来临之际&#xff0c;广大直男&#xff08;女&#xff09;同胞在给异性朋友选购礼物时会遇到难题——什么才是礼物好坏最重要的标准&#xff1f;基于层次分析法AHP进行计算&#xff0c;得出最高权重的指标&#xff0c;给出各位朋友选购礼物的一种思…

Vector<T> 动态数组(模板语法)

C数据结构与算法 目录 本文前驱课程 1 C自学精简教程 目录(必读) 2 动态数组 Vector&#xff08;难度1&#xff09; 其中&#xff0c;2 是 1 中的一个作业。2 中详细讲解了动态数组实现的基本原理。 本文目标 1 学会写基本的C类模板语法&#xff1b; 2 为以后熟练使用 S…

java.lang.classnotfoundexception: com.android.tools.lint.client.api.vendor

Unity Android studio打包报错修复 解决方式 java.lang.classnotfoundexception: com.android.tools.lint.client.api.vendor 解决方式 在 launcherTemplate 目录下找到 Android/lintOptions 选项 加上 checkReleaseBuilds false lintOptions { abortOnError false checkRelea…

以GitFlow分支模型为基准的Git版本分支管理流程

以GitFlow分支模型为基准的Git版本分支管理流程 文章目录 以GitFlow分支模型为基准的Git版本分支管理流程GitFlow分支模型中的主要概念GitFlow的分支管理流程图版本号说明借助插件Git Flow Integration Plus实现分支模型管理其他模型TBD模型阿里AoneFlow模型 GitFlow分支模型中…

设计模式的使用——建造者模式+适配器模式

项目代码地址 一、需求介绍 现公司数据库有一张表中的数据&#xff0c;需要通过外部接口将数据推送到别人的系统中。现有的问题是&#xff1a; 数据字段太多&#xff0c;而且双方系统实体字段不一致&#xff0c;每次都要通过get、set方法去对数据取值然后重新赋值。如果后期需…

使用php实现微信登录其实并不难,可以简单地分为三步进行

使用php实现微信登录其实并不难&#xff0c;可以简单地分为三步进行。 第一步&#xff1a;用户同意授权&#xff0c;获取code //微信登录public function wxlogin(){$appid "";$secret "";$str"http://***.***.com/getToken";$redirect_uriu…

家政电子邮件营销怎么做?邮件营销的方案?

家政电子邮件营销的作用&#xff1f;企业如何利用营销邮件拓客&#xff1f; 随着科技的不断发展&#xff0c;家政服务行业也逐渐融入了电子邮件营销的方式&#xff0c;这为家政企业提供了与客户更紧密互动的机会。在本文中&#xff0c;我们将探讨家政电子邮件营销的几个关键步…

OLED透明屏显示技术:未来显示科技的领航者

OLED透明屏显示技术是一种创新性的显示技术&#xff0c;它的特殊性质使其成为未来显示科技的领航者。 OLED透明屏具有高对比度、快速响应时间、广视角和低功耗等优势&#xff0c;同时&#xff0c;其透明度、柔性和薄型设计使其成为创新设计的理想选择。 本文将深入探讨OLED透…

从零做软件开发项目系列之九——项目结项

前言 一个项目&#xff0c;经过前期的需求调研分析&#xff0c;软件设计&#xff0c;程序开发&#xff0c;软件测试、系统部署、试运行系统调试等过程&#xff0c;最后到了项目的验收阶段&#xff0c;也就是项目生命周期的最后一个阶段&#xff0c;即项目结项&#xff0c;它涉…

什么是浏览器缓存(browser caching)?如何使用HTTP头来控制缓存?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 浏览器缓存和HTTP头控制缓存⭐ HTTP头控制缓存1. Cache-Control2. Expires3. Last-Modified 和 If-Modified-Since4. ETag 和 If-None-Match ⭐ 缓存策略⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击…

什么是同源策略(same-origin policy)?它对AJAX有什么影响?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 同源策略&#xff08;Same-Origin Policy&#xff09;与 AJAX 影响⭐ 同源策略的限制⭐ AJAX 请求受同源策略影响⭐ 跨域资源共享&#xff08;CORS&#xff09;⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记…

rrpc:实现熔断与限流

一、服务端的自我保护&#xff08;实现限流&#xff09; 为什么需要限流器&#xff1f; 我们先看服务端&#xff0c;举个例子&#xff0c;假如我们要发布一个 Rrpc 服务&#xff0c;作为服务端接收调用端发送过来的请求&#xff0c;这时服务端的某个节点负载压力过高了&#x…

ARM DIY(五)摄像头调试

前言 今天&#xff0c;就着摄像头的调试&#xff0c;从嵌入式工程师的角度&#xff0c;介绍如何从无到有&#xff0c;一步一步地调出一款设备。 摄像头型号&#xff1a;OV2640 开发步骤 分为 2 个阶段 5 个步骤 阶段一&#xff1a; 设备树、驱动、硬件 阶段二&#xff1a; 应…