Preface: this blog is kept as personal study notes only. Some images come from the Internet; if any of them infringe on your rights, please contact me for removal.
Notes from a Bilibili tutorial:
ElasticStack (ELK) self-study course: ElasticSearch + Logstash + Kibana for Linux operations — https://www.bilibili.com/video/BV1VMW3e6Ezk/?spm_id_from=333.1007.tianma.1-1-1.click&vd_source=e539f90574cdb0bc2bc30a8b5cb3fc00
I. Deploying the Logstash environment
# Official download page (past releases)
https://www.elastic.co/cn/downloads/past-releases#elasticsearch
# Download and install on the master node with wget
wget https://artifacts.elastic.co/downloads/logstash/logstash-7.17.3-x86_64.rpm
yum -y localinstall logstash-7.17.3-x86_64.rpm
Create a symlink:
[root@k8s-master ~]# systemctl cat logstash
...
ExecStart=/usr/share/logstash/bin/logstash "--path.settings" "/etc/logstash"
...
# Create a symlink for convenience
ln -sv /usr/share/logstash/bin/logstash /usr/local/bin
logstash -h   # verify
# Create a directory to hold the config files
[root@k8s-master ~]# mkdir config-logstash
II. Input plugins
1. Collecting standard input
# (1) Write the config file
# vim config-logstash/01-stdin-to-stdout.conf
input { stdin {} }
output { stdout {} }
# (2) Check the config file syntax
logstash -tf config-logstash/01-stdin-to-stdout.conf
# (3) Start the Logstash instance
logstash -f config-logstash/01-stdin-to-stdout.conf
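A quick way to verify: pipe a line into the instance and check that the stdout plugin prints an event (rubydebug-style output with message, @version, @timestamp and host fields; the exact values depend on your environment):
echo "hello logstash" | logstash -f config-logstash/01-stdin-to-stdout.conf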
2. File input plugin
input {
  file {
    # path(s) of the files to read
    path => ["/tmp/test/*.txt"]
    # where to start reading; only takes effect when there is no record in the ".sincedb*" file
    # the default is to read from the end of the file
    start_position => "beginning"   # or "end"
  }
}
output { stdout {} }
start_position chooses where Logstash initially starts reading a file: at the beginning or at the end. The default behavior treats files as live streams and therefore starts at the end.
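A minimal sketch for testing, assuming the same /tmp/test/*.txt path as above: pointing sincedb_path at /dev/null stops Logstash from persisting read offsets, so every restart re-reads the files from the beginning.
input {
  file {
    path => ["/tmp/test/*.txt"]
    start_position => "beginning"
    sincedb_path => "/dev/null"   # do not persist read offsets (testing only)
  }
}
output { stdout {} }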
3. TCP input plugin for log aggregation
input { tcp { port => 8888 }}
output { stdout {} }
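A quick test, assuming Logstash runs on 192.168.1.10 as in the rest of these notes:
echo "tcp test message" | nc 192.168.1.10 8888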
4. HTTP-based example
input {
http { port => 8888 }
http { port => 9999 }
}
output { stdout {} }
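A quick test with curl (again assuming the Logstash host is 192.168.1.10); one request per configured port:
curl -s -XPOST http://192.168.1.10:8888 -H 'Content-Type: application/json' -d '{"msg":"http test on 8888"}'
curl -s -XPOST http://192.168.1.10:9999 -H 'Content-Type: application/json' -d '{"msg":"http test on 9999"}'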
5. Redis-based input example
input {
  redis {
    data_type => "list"       # type of the Redis key to read from
    db => 5                   # database number; the default is 0
    host => "192.168.1.10"    # Redis host; the default is localhost
    port => 6379              # Redis port; the default is 6379
    password => "cluster"     # Redis auth password
    key => "xxxx"             # the Redis key to read data from
  }
  http { port => 8888 }
}
output { stdout {} }
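To feed this pipeline, an element can be pushed onto the configured list key with redis-cli (key name xxxx and db 5 as in the config above):
redis-cli -h 192.168.1.10 -a cluster -n 5 LPUSH xxxx '{"msg":"redis input test"}'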
6. Beats-based input example
# Filebeat configuration:
filebeat.inputs:
- type: tcp
host: "0.0.0.0:9000"
output.logstash:
hosts: ["192.168.1.10:5044"]
# Logstash configuration:
input { beats { port => 5044 } }
output { stdout {} }
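A quick end-to-end test, assuming Filebeat runs on 192.168.1.10 as well: send a line to Filebeat's TCP input on port 9000; it should be forwarded to Logstash on port 5044 and printed to stdout.
echo "beats test message" | nc 192.168.1.10 9000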
III. Output plugins
1. Sending Logstash output to Redis
(1) Write the config file
input { tcp { port => 9999 } }
output {
  stdout {}
  redis {
    host => "192.168.1.10"            # Redis host
    port => "6379"                    # Redis port
    db => 10                          # Redis database number
    password => "cluster"             # Redis password
    data_type => "list"               # type of key to write to
    key => "cluster-linux-logstash"   # name of the key to write to
  }
}
(2) Start and test
logstash -f config-logstash/tcp-to-redis.conf
# Send test data
echo 111111 | nc 192.168.1.10 9999
# Check the data in Redis
192.168.1.10:6379[10]> KEYS *
1) "cluster-linux-logstash"
192.168.1.10:6379[10]> type cluster-linux-logstash
list
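Each list element is a JSON-encoded event; the stored events themselves can be inspected with LRANGE:
192.168.1.10:6379[10]> LRANGE cluster-linux-logstash 0 -1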
2. File output plugin example
(1) Write the config file
input { tcp { port => 9999 } }
output {
stdout{}
file {
path => "/tmp/cluster-linux-logstash.log" # 指定磁盘的落地位置
}
}
(2) Run and test
logstash -f config-logstash/tcp-to-file.conf
# Send test data to port 9999 in the same way
echo 老男孩教育 | nc 192.168.1.10 9999
cat /tmp/cluster-linux-logstash.log
...
3. Elasticsearch output plugin example
(1) Write the config file
input { tcp { port => 9999 } }
output {
stdout{}
elasticsearch {
hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]
index => "cluster-linux-logstash-%{+yyyy.MM.dd}"
}
}
(2) Start and test
logstash -rf config-logstash/tcp-to-es.conf
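To verify, send a line to port 9999 and check that the daily index shows up on the cluster (any of the three ES nodes can be queried):
echo "es output test" | nc 192.168.1.10 9999
curl -s "http://192.168.1.10:9200/_cat/indices?v" | grep cluster-linux-logstash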
IV. Comprehensive Logstash examples
1. Collecting logs according to the architecture diagram
# many-to-es.conf
input {
tcp {
type => "tcp"
port => 6666
}
beats {
type => "beat"
port => 7777
}
redis {
type => "redis"
data_type => "list"
db => 5
host => "192.168.1.10"
port => "6379"
password => "cluster"
key => "cluster-linux-filebeat"
}
}
output {
# stdout {}   # prints events to the screen for debugging; comment out in production
if [type] == "tcp" {
elasticsearch {
hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]
index => "cluster-linux-tcp-%{+yyyy.MM.dd}"
}
}else if [type] == "beat" {
elasticsearch {
hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]
index => "cluster-linux-beat-%{+yyyy.MM.dd}"
}
}else if [type] == "redis" {
elasticsearch {
hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]
index => "cluster-linux-redis-%{+yyyy.MM.dd}"
}
} else {
elasticsearch {
hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]
index => "cluster-linux-other-%{+yyyy.MM.dd}"
}
}
}
# Collect logs from the service on port 9999: tcp-to-logstash.yml
filebeat.inputs:
- type: tcp
host: "0.0.0.0:9999"
output.logstash:
host: ["192.168.1.10:7777"]
# Collect logs from port 8888 and write them to Redis: tcp-to-redis.yml
filebeat.inputs:
- type: tcp
host: "0.0.0.0:8888"
output.redis:
hosts: ["192.168.1.10:6379"]
password: "cluster"
db: 5
key: "cluster-linux-filebeat"
timeout: 3
Start and test:
filebeat -e -c tcp-to-logstash.yml
filebeat -e -c tcp-to-redis.yml --path.data /tmp/filebeat/
logstash -rf many-to-es.conf
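Test data can then be pushed along each path (ports as configured above, assuming the Filebeat instances also listen on 192.168.1.10):
echo "tcp 6666 test" | nc 192.168.1.10 6666      # directly into Logstash's tcp input
echo "filebeat 9999 test" | nc 192.168.1.10 9999 # Filebeat -> Logstash (port 7777)
echo "filebeat 8888 test" | nc 192.168.1.10 8888 # Filebeat -> Redis -> Logstash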
2. Collecting Nginx and Tomcat logs
(1) Collect Nginx logs into Redis:
# 1. nginx-to-redis.yml
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/access.log*
json.keys_under_root: true
output.redis:
hosts: ["192.168.1.10:6379"]
password: "cluster"
db: 8
key: "cluster-linux-filebeat"
timeout: 3
# 2. Run and test
[root@k8s-node1 config]# rm -rf /var/lib/filebeat/*
[root@k8s-node1 config]# filebeat -e -c /etc/filebeat/config/32-nginx-to-redis.yml
[root@k8s-master ~]# redis-cli -a cluster -n 8
127.0.0.1:6379[8]> keys *
1) "cluster-linux-filebeat"
127.0.0.1:6379[8]> lrange cluster-linux-filebeat 0 -1
1) "{\"@timestamp\":\"202... # 发现数据已经送达redis
(2) Ship the logs from Logstash to ES
# many-to-es.conf
input {
beats {
port => 8888
}
redis {
data_type => "list"
db => 8
host => "192.168.1.10"
port => 6379
password => "cluster"
key => "cluster-linux-filebeat"
}
}
output {
stdout{}
elasticsearch {
hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]
index => "cluster-linux-logstash-%{+yyyy.MM.dd}"
}
}
# Run and test
[root@k8s-master ~]# logstash -f config-logstash/11-many-to-es.conf
(3) Collect Tomcat logs into Logstash:
# 1. tomcat-to-logstash.yml
filebeat.inputs:
- type: log
paths:
- /root/software/apache-tomcat-10.1.28/logs/*.txt
json.keys_under_root: true
output.logstash:
hosts: ["192.168.1.10:8888"]
# 2. Run and test
[root@k8s-node1 config]# filebeat -e -c /etc/filebeat/config/33-tomcat-to-logstash.yml --path.data /tmp/filebeat/
V. Filter plugins
1. grok
1.1 Basic grok usage
Grok is a good way to parse unstructured log data into something structured and queryable. It works well for syslog, Apache and other web server logs, MySQL logs, and in general any log format written for humans rather than machines. It ships with about 120 built-in patterns, and custom patterns can be defined as well.
(1) Restore the default Nginx log format so the raw (plain-text) log is used
vim /etc/nginx/nginx.conf
(2) Write a Filebeat config that collects the raw Nginx log and ships it to Logstash on port 8888:
# nginx-to-logstash.yml:
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/access.log
output.logstash:
hosts: ["192.168.1.10:8888"]
(3) Write a Logstash config that reads from port 8888 and sends the events to the ES cluster,
using a grok filter to split the raw log lines into fields:
# beat-to-es.conf:
input {
beats {
port => 8888
}
}
filter {
grok {
match => {
# "message" => "%{COMBINEDAPACHELOG}" # 官方GitHub已经废弃,建议使用下面的匹配模式
"message" => "%{HTTPD_COMMONLOG}"
}
}
}
output {
stdout {}
elasticsearch {
hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]
index => "cluster-linux-logstash-%{-yyyy.MM.dd}"
}
}
(4) Run and test
filebeat -e -c /etc/config/nginx-to-logstash.yml
logstash -rf config-logstash/beat-to-es.conf
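A quick way to generate access-log entries to parse (assuming Nginx is running on 192.168.1.10 and listening on the default port 80):
for i in $(seq 1 5); do curl -s http://192.168.1.10/ > /dev/null; done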
1.2 Custom grok patterns (background knowledge)
Sometimes Logstash does not ship with the pattern we need; we can define our own in two ways.
First, Oniguruma syntax supports named captures, which let us match a piece of text and save it as a field:
(?<field_name>the pattern here)
# For example, Postfix logs contain a queue id, a 10- or 11-character hexadecimal value, which can be captured with
(?<queue_id>[0-9A-F]{10,11})
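A minimal sketch of using such an inline named capture directly inside a grok match (assuming the queue id appears somewhere in the "message" field):
filter {
  grok {
    # capture a 10-11 character hex queue id from the message into the "queue_id" field
    match => { "message" => "(?<queue_id>[0-9A-F]{10,11})" }
  }
}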
Alternatively, create a custom patterns file:
- Create a directory called patterns with a file in it called extra (the file name does not matter, but name it something meaningful).
- In that file, write the pattern you need as the pattern name, a space, then the regexp for that pattern. The queue id above becomes:
# contents of ./patterns/postfix:
POSTFIX_QUEUEID [0-9A-F]{10,11}
Then use the grok plugin's patterns_dir setting to tell Logstash where your custom patterns directory is. A complete example, given this sample log line:
Jan 1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>
filter {
grok {
patterns_dir => ["./patterns"]
match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
}
}
This matches and produces the following fields:
timestamp: Jan 1 06:25:43
logsource: mailserver14
program: postfix/cleanup
pid: 21403
queue_id: BEF25A72965
syslog_message: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>
1.3 The remove_field and add_field options
If the filter succeeds, arbitrary fields can be removed from (or added to) the event. Field names can be dynamic and include parts of the event using %{field}. Example:
filter {
  grok {
    # remove the listed fields
    remove_field => [ "field_to_remove_1", "field_to_remove_2" ]
    # add the listed fields
    add_field => {
      "oldboyedu-clientip" => "clientip ---> %{clientip}"
      "school" => "北京大学"
    }
  }
}
1.4 The add_tag and remove_tag options
Add and remove tags:
filter {
  grok {
    # add tags
    add_tag => [ "linux","zookeeper","kafka","elk" ]
    # remove tags
    remove_tag => [ "zookeeper","kafka" ]
  }
}
1.5 The id option
id adds a unique ID to the plugin configuration. It is recommended when there are two or more plugins of the same type.
filter {
grok {
...
id => "nginx"
}
}
2. The date plugin: adjusting the timestamp written to ES
filter {
  grok {...}
  date {
    # either form works (use one of the two)
    match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"]
    match => ["timestamp","dd/MMM/yyyy:HH:mm:ss +0800"]
    # optional
    timezone => "Asia/Shanghai"
    # store the parsed time in this field; if omitted, it goes to the "@timestamp" field
    target => "cluster-linux-access-time"
  }
}
3. The geoip plugin: resolving the geographic location of the source address
The geoip filter analyzes an IP address field:
filter {
  grok {..}
  date {..}
  geoip {
    # which IP field to analyze
    source => "clientip"
    # show only these fields; if omitted, all lookup fields are returned
    fields => ["city_name","country_name","ip"]
    # field under which the geoip result is stored
    target => "cluster-linux-geoip"
  }
}
4. The useragent plugin: identifying the client device type
filter {
  date {..}
  geoip {..}
  useragent {
    source => "http_user_agent"         # field that holds the client device information
    target => "cluster-linux-useragent" # store the parsed data under this field; if omitted, the parsed fields are added at the top level of the event
  }
}
5. The mutate plugin: preparing test data
(1) Generate test data with a Python script
cat > generate_log.py <<EOF
#!/usr/bin/env python
import datetime
import random
import logging
import time
import sys

LOG_FORMAT = "%(levelname)s %(asctime)s [com.oldboyedu.%(module)s] - %(message)s "
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

# Basic configuration of the root logging.Logger instance
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT, datefmt=DATE_FORMAT, filename=sys.argv[1], filemode='a')

actions = ["浏览页面","评论商品","加入收藏","加入购物车","提交订单","使用优惠券","领取优惠券","搜索","查看订单","付款","清空购物车"]

while True:
    time.sleep(random.randint(1, 5))
    user_id = random.randint(1, 10000)
    price = round(random.uniform(15000, 30000), 2)
    action = random.choice(actions)
    svip = random.choice([0, 1])
    logging.info("DAU|{0}|{1}|{2}|{3}".format(user_id, action, svip, price))
EOF
# Run it to generate data
nohup python generate_log.py /tmp/app.log &>/dev/null &
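Each generated line follows the LOG_FORMAT above; tailing the file should show lines roughly like the commented example below (the values are only illustrative):
tail -f /tmp/app.log
# INFO 2024-01-01 12:00:00 [com.oldboyedu.generate_log] - DAU|6688|浏览页面|1|23999.99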
(2) Write a Filebeat config that collects the log and ships it to port 8888 on the Logstash server:
filebeat.inputs:
- type: log
enabled: true
paths:
- /tmp/app.log*
output.logstash:
hosts: ["192.168.1.10:8888"]
(3) On the Logstash server, write a config that receives the logs and sends them to ES
input {
beats {
port => 8888
}
}
filter {
  mutate {
    add_field => { "school" => "B站大学老男孩IT教育" }
    remove_field => [ "@timestamp","agent","host","@version","ecs","tags","input","log" ]
  }
  # split the "message" field on "|"
  mutate { split => { "message" => "|" } }
  mutate {
    # add fields that reference the split parts
    add_field => {
      "user_id" => "%{[message][1]}"
      "action" => "%{[message][2]}"
      "svip" => "%{[message][3]}"
      "price" => "%{[message][4]}"
    }
  }
  mutate {
    # convert the listed fields to the corresponding data types
    convert => {
      "user_id" => "integer"
      "svip" => "boolean"
      "price" => "float"
    }
  }
  mutate { strip => ["svip"] }                                        # strip whitespace
  mutate { copy => { "price" => "cluster-linux-price" } }             # copy a field
  mutate { rename => { "message" => "cluster-ssvip" } }               # rename a field
  mutate { replace => { "message" => "%{message}:My new message" } }  # replace the field value
  mutate { uppercase => [ "message" ] }                               # convert to uppercase
}
output {
stdout {}
elasticsearch {
hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]
}
}
6. Multiple if branches
(1) The architecture is as follows:
(2) Collect Nginx's access.log with Filebeat:
filebeat.inputs:
- type: log
paths:
- /var/log/nginx/access.log*
json.keys_under_root: true
output.logstash:
hosts: ["192.168.1.10:8888"]
(3) The Logstash config that collects from these ports is as follows:
input {
  beats {
    type => "cluster-beats"
    port => 8888
  }
  tcp {
    type => "cluster-tcp"
    port => 9999
  }
  tcp {
    type => "cluster-tcp-new"
    port => 7777
  }
  tcp {
    type => "cluster-http"
    port => 6666
  }
  file {
    type => "cluster-file"
    path => "/tmp/apps.log"
  }
}
filter {
  mutate {
    add_field => {
      "school" => "B站大学老男孩IT"
    }
  }
  if [type] in ["cluster-beats","cluster-tcp-new","cluster-http"] {
    mutate {
      remove_field => ["agent","host","@version","ecs","tags","input","log"]
    }
    geoip {
      source => "clientip"
      target => "cluster-linux-geoip"
    }
    useragent {
      source => "http_user_agent"
      target => "cluster-linux-useragent"
    }
  } else if [type] == "cluster-file" {
    mutate {
      add_field => {
        "class" => "cluster-linux80"
        "address" => "xxx"
        "hobby" => ["LOL","王者荣耀"]
      }
      remove_field => ["host","@version","school"]
    }
  } else {
    mutate {
      remove_field => ["port","@version","host"]
    }
    mutate { split => { "message" => "|" } }
    mutate {
      add_field => {
        "user_id" => "%{[message][1]}"
        "action" => "%{[message][2]}"
        "svip" => "%{[message][3]}"
        "price" => "%{[message][4]}"
      }
    }
    mutate {
      remove_field => ["message"]
      strip => ["svip"]
    }
    mutate {
      # convert the listed fields to the corresponding data types
      convert => {
        "user_id" => "integer"
        "svip" => "boolean"
        "price" => "float"
      }
    }
  }
}
output {
stdout {}
if [type] == "cluster-beats" {
elasticsearch {
hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]
index => "cluster-linux-logstash-beats-%{+YYYY.MM.dd}"
}
} else {
elasticsearch {
hosts => ["192.168.1.10:9200","192.168.1.11:9200","192.168.1.12:9200"]
index => "cluster-linux-logstash-tcp-%{+YYYY.MM.dd}"
}
}
}
(4) Collect app.log with Filebeat:
filebeat.inputs:
- type: log
enabled: true
paths:
- /tmp/app.log*
output.logstash:
hosts: ["192.168.1.10:9999"]
VI. Adding Kibana sample data
...