文章目录
- 一、什么是filebeat
- 二、Filebeat的工作原理
- 2.1 filebeat的构成
- 2.1.1 Prospector 组件
- 2.1.2 Harvester 组件
- 2.2 filebeat如何保存文件的状态
- 2.3 filebeat何如保证至少一次数据消费
- 三、Filebeat配置文件
- 四、filebeat对比fluented
- 五、Filebeat的部署安装
- 5.1裸金属安装
- 5.1.1 压缩包方式安装
- 5.1.2 yum方式安装
- 5.2 K8s安装
- 5.3 基本命令
- 六、Filebeat实践
- 6.1 日志收集架构
- 6.2 收集nginx日志
- 6.3 收集tomcat日志
- 七、常见问题及解决
- 7.1 配置文件问题
- 7.2 Too many open file handlers
- 7.3 Filebeat使用了太多的CPU?
- 7.4 重新收集日志
一、什么是filebeat
Filebeat是用于转发和集中日志数据的轻量级传送程序。作为服务器上的代理安装,Filebeat监视您指定的日志文件或位置,收集日志事件,并将它们转发到Elasticsearch或Logstash等进行索引。
工作的流程图如下:
二、Filebeat的工作原理
2.1 filebeat的构成
Filebeat由两个主要组件组成:prospector和harvester。
这些组件一起工作来读取文件(tail file)并将事件数据发送到您指定的输出启动Filebeat时,它会启动一个或多个查找器,查看您为日志文件指定的本地路径。对于prospector所在的每个日志文件,prospector启动harvester。每个harvester都会为新内容读取单个日志文件,并将新日志数据发送到libbeat,后者将聚合事件并将聚合数据发送到您为Filebeat配置的输出。
2.1.1 Prospector 组件
harvester:负责读取单个文件的内容。读取每个文件,并将内容发送到the output
每个文件启动一个harvester,harvester负责打开和关闭文件,这意味着在运行时文件描述符保持打开状态
如果文件在读取时被删除或重命名,Filebeat将继续读取文件。
这有副作用,即在harvester关闭之前,磁盘上的空间被保留。默认情况下,Filebeat将文件保持打开状态,直到达到close_inactive状态
关闭harvester会产生以下结果:
1)如果在harvester仍在读取文件时文件被删除,则关闭文件句柄,释放底层资源。
2)文件的采集只会在scan_frequency过后重新开始。
3)如果在harvester关闭的情况下移动或移除文件,则不会继续处理文件。
要控制收割机何时关闭,请使用close_*配置选项
2.1.2 Harvester 组件
prospector负责管理harvester并找到所有要读取的文件来源。
如果输入类型为日志,则查找器将查找路径匹配的所有文件,并为每个文件启动一个harvester。
每个prospector都在自己的Go协程中运行。
Filebeat目前支持两种prospector类型:log和stdin。
每个prospector类型可以定义多次。
日志prospector检查每个文件以查看harvester是否需要启动,是否已经运行,
或者该文件是否可以被忽略(请参阅ignore_older)。
只有在harvester关闭后文件的大小发生了变化,才会读取到新行。
2.2 filebeat如何保存文件的状态
Filebeat保留每个文件的状态,并经常将状态刷新到磁盘中的注册表文件中。该状态用于记住harvester读取的最后一个偏移量,并确保发送所有日志行。如果无法访问输出(如Elasticsearch或Logstash),Filebeat将跟踪最后发送的行,并在输出再次可用时继续读取文件。当Filebeat运行时,每个输入的状态信息也保存在内存中。当Filebeat重新启动时,来自注册表文件的数据用于重建状态,Filebeat在最后一个已知位置继续每个harvester。对于每个输入,Filebeat都会保留它找到的每个文件的状态。由于文件可以重命名或移动,文件名和路径不足以标识文件。对于每个文件,Filebeat存储唯一的标识符,以检测文件是否以前被捕获。
2.3 filebeat何如保证至少一次数据消费
Filebeat保证事件将至少传递到配置的输出一次,并且不会丢失数据。是因为它将每个事件的传递状态存储在注册表文件中。在已定义的输出被阻止且未确认所有事件的情况下,Filebeat将继续尝试发送事件,直到输出确认已接收到事件为止。如果Filebeat在发送事件的过程中关闭,它不会等待输出确认所有事件后再关闭。当Filebeat重新启动时,将再次将Filebeat关闭前未确认的所有事件发送到输出。这样可以确保每个事件至少发送一次,但最终可能会有重复的事件发送到输出。通过设置shutdown_timeout选项,可以将Filebeat配置为在关机前等待特定时间。
三、Filebeat配置文件
Filebeat配置文件名称默认为filebeat.yml,也可以启动时,手动指定配置文件名称。
配置文件参数及解释:
type: log #input类型为log
enable: true #表示是该log类型配置生效
paths: #指定要监控的日志,目前按照Go语言的glob函数处理。没有对配置目录做递归处理,比如配置的如果是:
- /var/log/* /.log #则只会去/var/log目录的所有子目录中寻找以".log"结尾的文件,而不会寻找/var/log目录下以".log"结尾的文件。
recursive_glob.enabled: #启用全局递归模式,例如/foo/**包括/foo, /foo/, /foo//
encoding: #指定被监控的文件的编码类型,使用plain和utf-8都是可以处理中文日志的
exclude_lines: [‘^DBG’] #不包含匹配正则的行
include_lines: [‘^ERR’, ‘^WARN’] #包含匹配正则的行
harvester_buffer_size: 16384 #每个harvester在获取文件时使用的缓冲区的字节大小
max_bytes: 10485760 #单个日志消息可以拥有的最大字节数。max_bytes之后的所有字节都被丢弃而不发送。默认值为10MB (10485760)
exclude_files: [‘.gz$’] #用于匹配希望Filebeat忽略的文件的正则表达式列表
ingore_older: 0 #默认为0,表示禁用,可以配置2h,2m等,注意ignore_older必须大于close_inactive的值.表示忽略超过设置值未更新的文件或者文件从来没有被harvester收集
close_* #close_ *配置选项用于在特定标准或时间之后关闭harvester。 关闭harvester意味着关闭文件处理程序。 如果在harvester关闭后文件被更新,则在scan_frequency过后,文件将被重新拾取。 但是,如果在harvester关闭时移动或删除文件,Filebeat将无法再次接收文件,并且harvester未读取的任何数据都将丢失。
close_inactive #启动选项时,如果在制定时间没有被读取,将关闭文件句柄
读取的最后一条日志定义为下一次读取的起始点,而不是基于文件的修改时间
如果关闭的文件发生变化,一个新的harverster将在scan_frequency运行后被启动
建议至少设置一个大于读取日志频率的值,配置多个prospector来实现针对不同更新速度的日志文件
使用内部时间戳机制,来反映记录日志的读取,每次读取到最后一行日志时开始倒计时使用2h 5m 来表示
close_rename #当选项启动,如果文件被重命名和移动,filebeat关闭文件的处理读取
close_removed #当选项启动,文件被删除时,filebeat关闭文件的处理读取这个选项启动后,必须启动clean_removed
close_eof #适合只写一次日志的文件,然后filebeat关闭文件的处理读取
close_timeout #当选项启动时,filebeat会给每个harvester设置预定义时间,不管这个文件是否被读取,达到设定时间后,将被关闭
close_timeout #不能等于ignore_older,会导致文件更新时,不会被读取如果output一直没有输出日志事件,这个timeout是不会被启动的,至少要要有一个事件发送,然后haverter将被关闭设置0 表示不启动
clean_inactived #从注册表文件中删除先前收获的文件的状态设置必须大于ignore_older+scan_frequency,以确保在文件仍在收集时没有删除任何状态配置选项有助于减小注册表文件的大小,特别是如果每天都生成大量的新文件此配置选项也可用于防止在Linux上重用inode的Filebeat问题
clean_removed #启动选项后,如果文件在磁盘上找不到,将从注册表中清除filebeat如果关闭close removed 必须关闭clean removed
scan_frequency #prospector检查指定用于收获的路径中的新文件的频率,默认10s
tail_files: #如果设置为true,Filebeat从文件尾开始监控文件新增内容,把新增的每一行文件作为一个事件依次发送,而不是从文件开始处重新发送所有内容。
symlinks: #符号链接选项允许Filebeat除常规文件外,可以收集符号链接。收集符号链接时,即使报告了符号链接的路径,Filebeat也会打开并读取原始文件。
backoff: #backoff选项指定Filebeat如何积极地抓取新文件进行更新。默认1s,backoff选项定义Filebeat在达到EOF之后再次检查文件之间等待的时间。
max_backoff: #在达到EOF之后再次检查文件之前Filebeat等待的最长时间
backoff_factor: #指定backoff尝试等待时间几次,默认是2
harvester_limit: #harvester_limit选项限制一个prospector并行启动的harvester数量,直接影响文件打开数
tags #列表中添加标签,用过过滤,例如:tags: [“json”]
Fields #可选字段,选择额外的字段进行输出可以是标量值,元组,字典等嵌套类型
默认在sub-dictionary位置
filebeat.inputs:
fields:
app_id: query_engine_12
fields_under_root #如果值为ture,那么fields存储在输出文档的顶级位置
multiline.pattern #必须匹配的regexp模式
multiline.negate #定义上面的模式匹配条件的动作是 否定的,默认是false假如模式匹配条件’^b’,默认是false模式,表示讲按照模式匹配进行匹配 将不是以b开头的日志行进行合并如果是true,表示将不以b开头的日志行进行合并
multiline.match # 指定Filebeat如何将匹配行组合成事件,在之前或者之后,取决于上面所指定的negate
multiline.max_lines #可以组合成一个事件的最大行数,超过将丢弃,默认500
multiline.timeout #定义超时时间,如果开始一个新的事件在超时时间内没有发现匹配,也将发送日志,默认是5smax_procs #设置可以同时执行的最大CPU数。默认值为系统中可用的逻辑CPU的数量。
Name #为该filebeat指定名字,默认为主机的hostname
四、filebeat对比fluented
Filebeat
在版本 5.x 中,Elasticsearch 具有解析的能力(像 Logstash 过滤器)— Ingest。这也就意味着可以将数据直接用 Filebeat 推送到 Elasticsearch,并让 Elasticsearch 既做解析的事情,又做存储的事情。也不需要使用缓冲,因为 Filebeat 也会和 Logstash 一样记住上次读取的偏移,如果需要缓冲(例如,不希望将日志服务器的文件系统填满),可以使用 Redis/Kafka,因为 Filebeat 可以与它们进行通信。
优势
Filebeat 只是一个二进制文件没有任何依赖。它占用资源极少,尽管它还十分年轻,正式因为它简单,所以几乎没有什么可以出错的地方,所以它的可靠性还是很高的。它也为我们提供了很多可以调节的点,例如:它以何种方式搜索新的文件,以及当文件有一段时间没有发生变化时,何时选择关闭文件句柄。
劣势
Filebeat 的应用范围十分有限,所以在某些场景下我们会碰到问题。例如,如果使用 Logstash 作为下游管道,我们同样会遇到性能问题。正因为如此,Filebeat 的范围在扩大。开始时,它只能将日志发送到 Logstash 和 Elasticsearch,而现在它可以将日志发送给 Kafka 和 Redis,在 5.x 版本中,它还具备过滤的能力。
典型应用场景
Filebeat 在解决某些特定的问题时:日志存于文件,我们希望将日志直接传输存储到 Elasticsearch。这仅在我们只是抓去(grep)它们或者日志是存于 JSON 格式(Filebeat 可以解析 JSON)。或者如果打算使用 Elasticsearch 的 Ingest 功能对日志进行解析和丰富。
将日志发送到 Kafka/Redis。所以另外一个传输工具(例如,Logstash 或自定义的 Kafka 消费者)可以进一步丰富和转发。这里假设选择的下游传输工具能够满足我们对功能和性能的要求。
Fluentd
Fluentd 创建的初衷主要是尽可能的使用 JSON 作为日志输出,所以传输工具及其下游的传输线不需要猜测子字符串里面各个字段的类型。这样,它为几乎所有的语言都提供库,这也意味着,我们可以将它插入到我们自定义的程序中。
优势
和多数 Logstash 插件一样,Fluentd 插件是用 Ruby 语言开发的非常易于编写维护。所以它数量很多,几乎所有的源和目标存储都有插件(各个插件的成熟度也不太一样)。这也意味这我们可以用 Fluentd 来串联所有的东西。
劣势
因为在多数应用场景下,我们会通过 Fluentd 得到结构化的数据,它的灵活性并不好。但是我们仍然可以通过正则表达式,来解析非结构化的数据。尽管,性能在大多数场景下都很好,但它并不是***的,和 syslog-ng 一样,它的缓冲只存在与输出端,单线程核心以及 Ruby GIL 实现的插件意味着它大的节点下性能是受限的,不过,它的资源消耗在大多数场景下是可以接受的。对于小的或者嵌入式的设备,可能需要看看 Fluent Bit,它和 Fluentd 的关系与 Filebeat 和 Logstash 之间的关系类似。
典型应用场景
Fluentd 在日志的数据源和目标存储各种各样时非常合适,因为它有很多插件。而且,如果大多数数据源都是自定义的应用,所以可以发现用 fluentd 的库要比将日志库与其他传输工具结合起来要容易很多。特别是在我们的应用是多种语言编写的时候,即我们使用了多种日志库,日志的行为也不太一样。
五、Filebeat的部署安装
5.1裸金属安装
注:以linux系统为例,版本为7.17
5.1.1 压缩包方式安装
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.17.5-linux-x86_64.tar.gz
tar xzvf filebeat-7.17.5-linux-x86_64.tar.gz -C /root/
mv /root/filebeat-7.17.5 /usr/local/filebeat
启动
/usr/local/filebeat/bin/filebeat -e -c filebeat.yml
5.1.2 yum方式安装
导入KEY
sudo rpm --import https://packages.elastic.co/GPG-KEY-elasticsearch
创建elastic.repo文件
cd /etc/yum.repo.d/
vim elastic.repo
[elastic-7.x]
name=Elastic repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
安装
sudo yum install filebeat
启动
systemctl start filebeat
5.2 K8s安装
下载manifest file
curl -L -O https://raw.githubusercontent.com/elastic/beats/7.17/deploy/kubernetes/filebeat-kubernetes.yaml
修改文件
Filebeat的输出为es,将以下es地址修改为你的es集群地址
- name: ELASTICSEARCH_HOST
value: elasticsearch
- name: ELASTICSEARCH_PORT
value: "9200"
- name: ELASTICSEARCH_USERNAME
value: elastic
- name: ELASTICSEARCH_PASSWORD
value: changeme
以root用户运行容器
securityContext:
runAsUser: 0
privileged: true
Filebeat账户授权
oc adm policy add-scc-to-user privileged system:serviceaccount:kube-system:filebeat
部署
kubectl create -f filebeat-kubernetes.yaml
查看状态
$ kubectl --namespace=kube-system get ds/filebeat
NAME DESIRED CURRENT READY UP-TO-DATE AVAILABLE NODE-SELECTOR AGE
filebeat 32 32 0 32 0 <none>
5.3 基本命令
export #导出
run #执行(默认执行)
test #测试配置
keystore #秘钥存储
modules #模块配置管理
setup #设置初始环境
例如: ./filebeat test config #用来测试配置文件是否正确
export命令
将配置或索引模板导出到stdout。您可以使用此命令快速查看配置或索引模板的内容。
filebeat export SUBCOMMAND [FLAGS]
SUBCOMMANDS
config
将当前配置导出到stdout。如果使用该-c标志,则此命令将导出在指定文件中定义的配置。
template
将索引模板导出到stdout。您可以指定--es.version和 --index标志以进一步定义导出的内容。
FLAGE
--es.version VERSION 与其一起指定时template,导出与指定版本兼容的索引模板。
-h, --help 显示export命令的帮助。
--index BASE_NAME 与其一起指定时template,设置用于索引模板的基本名称。如果未指定此标志,则默认基本名称为filebeat。
例子:
filebeat export config
filebeat export template --es.version 6.2.4 --index myindexname
keystore命令
管理秘钥库
filebeat keystore SUBCOMMAND [ FLAGS ]
SUBCOMMAND
add KEY 将指定的密钥添加到密钥库。使用该--force标志覆盖现有密钥。使用--stdin标志传递值stdin
create 创建一个密钥库来保存秘密。使用该--force标志覆盖现有密钥库。
list 列出密钥库中的密钥。
remove KEY 从密钥库中删除指定的密钥。
FLAGS
--force 适用于add和create子命令。使用时add,会覆盖指定的密钥。与之一起使用时create,会覆盖密钥库。
--stdin 与之一起使用时add,使用stdin作为键值的来源。
-h,--help 显示keystore命令的帮助。
例子:
filebeat keystore create
filebeat keystore add ES_PWD
filebeat keystore remove ES_PWD
filebeat keystore list
modules命令
管理配置的模块。您可以使用此命令启用和禁用modules.d目录中定义的特定模块配置。使用此命令所做的更改将保留,并用于后续运行的Filebeat。
filebeat modules SUBCOMMAND [FLAGS]
SUBCOMMANDS
disable MODULE_LIST 禁用以空格分隔的列表中指定的模块。
enable MODULE_LIST 启用以空格分隔的列表中指定的模块。
list 列出当前启用和禁用的模块。
FLAGS
-h,--help 显示export命令的帮助。
例子:
filebeat modules list
filebeat modules enable apache2 auditd mysql
run命令
运行Filebeat。如果在未指定命令的情况下启动Filebeat,则默认使用此命令。
filebeat run [FLAGS] or filebeat [FLAGS]
FLAGS
-N, --N 禁用将事件发布到已定义的输出。此选项仅用于测试Filebeat。
--cpuprofile FILE 将CPU配置文件数据写入指定的文件。此选项对于Filebeat故障排除很有用。
-h, --help 显示run命令的帮助。
--httpprof [HOST]:PORT 启动http服务器以进行性能分析。此选项对于故障排除和分析Filebeat很有用。
--memprofile FILE 将内存配置文件数据写入指定的输出文件。此选项对于Filebeat故障排除很有用。
--modules MODULE_LIST 指定要运行的以逗号分隔的模块列表。例如:
filebeat run --modules nginx,mysql,system
您可以使用该modules命令启用和禁用特定模块,而不是每次运行Filebeat时指定模块列表。然后,当您运行Filebeat时,它将运行任何已启用的模块。
--once 使用该once标志时,Filebeat将启动所有已配置的收集器和探测器,并运行每个探测器,直到收割机关闭。如果设置了--once标志,则还应设置close_eof为在达到文件末尾时关闭收割机。默认情况下,收割机在close_inactive到达后关闭。
--setup 加载示例Kibana仪表板。如果要在不运行Filebeat的情况下加载仪表板,请改用setup命令。
例子:filebeat run -e --setup
setup命令
设置初始环境,包括索引模板,Kibana仪表板(如果可用)和机器学习作业(如果可用)。
索引模板可确保在Elasticsearch中正确映射字段。
Kibana仪表板使您可以更轻松地在Kibana中可视化Filebeat数据。
机器学习作业包含分析异常数据所需的配置信息和元数据。
使用此命令而不是run --setup在不实际运行Filebeat和摄取数据的情况下设置环境。
filebeat setup [FLAGS]
FLAGS
--dashboards 仅设置Kibana仪表板。此选项从Filebeat包加载仪表板。有关更多选项,例如加载自定义仪表板,请参阅Beats开发人员指南 中的导入现有节拍仪表板。
-h, --help 显示setup命令的帮助。
--machine-learning 仅设置机器学习作业配置。
--modules MODULE_LIST 指定以逗号分隔的模块列表。当filebeat.yml文件中没有定义模块时,使用此标志可以避免错误。
--template 仅设置索引模板。
例子:filebeat setup --dashboards
test命令
测试配置
filebeat test SUBCOMMAND [FLAGS]
SUBCOMMAND
config 测试配置设置
output 测试Filebeat可以使用当前设置连接到输出。
FLAGS
-h, --help 显示test命令的帮助。
例子:filebeat test config
version命令
显示有关当前版本的信息。
filebeat version [FLAGS]
-h, --help 显示version命令的帮助。
全局标志
运行Filebeat时,可以使用这些全局标志。
-E, --E "SETTING_NAME=VALUE" 覆盖特定的配置设置。您可以指定多个覆盖。例如:filebeat -E "name=mybeat" -E "output.elasticsearch.hosts=["http://myhost:9200"]" 此设置适用于当前运行的Filebeat进程。Filebeat配置文件未更改。
-M, --M "VAR_NAME=VALUE" 覆盖Filebeat模块的默认配置。您可以指定多个变量覆盖。例如:filebeat -modules=nginx -M "nginx.access.var.paths=[/var/log/nginx/access.log*]" -M "nginx.access.var.pipeline=no_plugins"
-c, --c FILE 指定用于Filebeat的配置文件。您在此处指定的文件是相对于path.config。如果-c未指定标志filebeat.yml,则使用默认配置文件。
-d, --d SELECTORS 启用指定选择器的调试。对于选择器,您可以指定以逗号分隔的组件列表,也可以使用它-d "*"来启用所有组件的调试。例如,-d "publish"显示所有“发布”相关消息。
-e, --e 记录到stderr并禁用syslog/file输出。
--path.config 设置配置文件的路径
--path.data 设置数据文件的路径
--path.home 设置其他文件的路径
--path.logs 设置日志文件的路径
--strict.perms 设置对配置文件的严格权限检查。默认是 -strict.perms=true
-v, --v 记录INFO级别的消息。
更多命令参考:
链接: link
六、Filebeat实践
6.1 日志收集架构
我们在收集应用日志过程中,首先经过filebeat收集器,到kafka消息队列,在经过logstash进行日志切割过滤,输出到elasticsearch中。
注:
- filebeat需要对应用日志有可读权限。
- Logstash 应用单独部署 一个应用对用一个logstash服务。
6.2 收集nginx日志
修改nginx配置文件,修改默认日志格式
log_format sun '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for" $request_time $upstream_response_time $upstream_addr $upstream_status $upstream_cache_status';
重启nginx服务
systemctl restart nginx
修改filebeat配置文件
[root@nginx01 filebeat]# cat nginx.yml
#=========================== Filebeat inputs =============================
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/nginx/access.log
fields:
name: nginx-access
fields_under_root: false
tail_files: false
#=============================== Processors ===============================
processors:
- drop_fields:
fields: ["beat", "input_type", "source", "offset"]
output.kafka:
enabled: true
hosts: ["1.10.10.103:9092","1.10.10.104:9092","1.10.10.105:9092"]
# message topic selection + partitioning
topic: 'nginx-access'
partition.round_robin:
reachable_only: true
# The number of concurrent load-balanced Kafka output workers(default 1)
worker: 2
required_acks: 1
compression: gzip
max_message_bytes: 1000000
#================================ Logging ======================================
# Available log levels are: critical, error, warning, info, debug
logging.level: info
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat
rotateeverybytes: 52428800 # 50MB
keepfiles: 5
启动filebeat
/usr/share/filebeat/bin/filebeat -path.home /usr/share/filebeat -path.config /etc/filebeat
Logstash配置
input {
kafka {
bootstrap_servers => ["1.10.10.105:9092,1.10.10.104:9092,1.10.10.103:9092"]
group_id => "logstash"
client_id => "nginx"
topics => "nginx-access"
codec => "json"
consumer_threads =>5
decorate_events =>true
type => "nginx-logs"
}
}
filter {
grok {
patterns_dir => ["/usr/local/logstash/patterns.d"]
match => { "message" => "%{NGINX_ACCESS}"}
remove_field => [tags, message, offset, source, input_type, "@version","[beat][name]","[beat][version]"]
}
geoip {
source => "remote_addr"
target => "geoip"
database => "/usr/local/logstash/GeoLite2-City.mmdb"
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
remove_field => ["[geoip][country_code3]", "[geoip][dma_code]", "[geoip][postal_code]", "[geoip][continent_code]", "[geoip][timezone]", "[geoip][region_code]"]
}
if [user_agent] != "-" {
useragent {
target => "ua"
source => "user_agent"
}
}
mutate {
remove_field => ["host"]
convert => [ "[geoip][coordinates]", "float" ]
}
}
output {
if [type] == "nginx-logs" {
elasticsearch {
hosts => ["1.10.10.60:9200","1.10.10.61:9200","1.10.10.62:9200"]
index => "nginx-access-%{+YYYY-MM-dd}"
}
}
}
修改logstash的nginx日志分割
cat /usr/local/logstash/patterns.d/nginx
NGINX_ACCESS %{IPORHOST:remote_addr} - %{NOTSPACE} \[%{HTTPDATE:time_local}\] "%{WORD:method} %{DATA:request} HTTP/%{NUMBER:http_version}" %{NUMBER:http_status} (%{NUMBER:response_size}|-) (%{QS:http_referer}|-) (%{QS:user_agent}|-) (%{QS:x_forwarded_for}|-) (%{NUMBER:request_time:float}|-) (%{NUMBER:upstream_response_time:float}|-) (%{HOSTPORT:upstream_addr}|-) (%{NUMBER:upstream_status}|-)
启动logstash
nohup /usr/local/logstash/bin/logstash -f /usr/local/logstash/conf.d/ --config.reload.automatic &
6.3 收集tomcat日志
日志格式如下
Info log
Error log
2022-08-04 03:00:00,213 [scheduling-1] ERROR com.zijin.sys.trigger.BalanceTask [72] - 发送邮件失败
java.lang.NullPointerException: null
at java.math.BigDecimal.compareTo(BigDecimal.java:2628)
at com.zijincheng.www.cyadmin.sys.trigger.BalanceTask.run(BalanceTask.java:61)
at sun.reflect.GeneratedMethodAccessor494.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
修改filebeat配置文件
cat /etc/filebeat/eureka.yml
#=========================== Filebeat inputs =============================
filebeat.inputs:
- type: log
enabled: true
paths:
- /data/logs/admin/error.log
exclude_lines: ['^$']
multiline:
pattern: '^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}\,\d{3}'
negate: true
match: after
max_lines: 1000
timeout: 2s
fields_under_root: true
fields:
env: prod
name: eureka-error
log_topic: application-logs
serverip: 172.17.153.87
tail_files: false
- type: log
enabled: true
paths:
- /data/logs/admin/info.log
exclude_lines: ['^$']
multiline:
pattern: '^\d{4}-\d{1,2}-\d{1,2}\s\d{1,2}:\d{1,2}:\d{1,2}\,\d{3}'
negate: true
match: after
max_lines: 1000
timeout: 2s
fields_under_root: true
fields:
env: prod
name: eureka-info
log_topic: application-logs
serverip: 172.17.153.87
tail_files: false
#=============================== Processors ===============================
processors:
- drop_fields:
fields: ["beat", "input_type", "source", "offset"]
#output.console:
# codec.json:
# pretty: true
output.kafka:
enabled: true
hosts: ["1.10.10.103:9092","1.10.10.104:9092","1.10.10.105:9092"]
# message topic selection + partitioning
topic: '%{[log_topic]}'
partition.round_robin:
reachable_only: true
# The number of concurrent load-balanced Kafka output workers(default 1)
worker: 2
required_acks: 1
compression: gzip
max_message_bytes: 1000000
#================================ Logging ======================================
# Available log levels are: critical, error, warning, info, debug
logging.level: info
logging.to_files: true
logging.files:
path: /var/log/filebeat
name: filebeat
rotateeverybytes: 52428800 # 50MB
keepfiles: 5
修改logstash文件
cat /usr/local/logstash/conf.d/application.conf
input{
kafka {
bootstrap_servers => ["1.10.10.105:9092,1.10.10.104:9092,1.10.10.103:9092"]
group_id => "application"
client_id => "apisinfo"
topics => "application-logs"
consumer_threads =>5
codec => "json"
decorate_events =>true
type => "prod-logs"
}
}
filter {
mutate {
remove_field => ["@version","prospector","input","beat","source","offset"]
}
grok {
match => {
"message" => "%{TIMESTAMP_ISO8601:access_time} \[%{DATA:profunc_id}\] %{LOGLEVEL:loglevel} %{DATA:class} \[%{NUMBER:exception_info}\] - %{MESSAGE:message}"
}
pattern_definitions => {
"MESSAGE" => "[\s\S]*"
}
}
date {
match => ["access_time", "yyyy.MM.dd HH.mm.ss","UNIX_MS"]
target => "@timestamp"
}
ruby {
code => "event.set('timestamp', event.get('@timestamp').time.localtime + 8*60*60)"
}
ruby {
code => "event.set('@timestamp',event.get('timestamp'))"
}
mutate {
remove_field => ["timestamp"]
remove_field => ["[message][0]"]
}
}
output {
if [type] == "prod-logs" {
elasticsearch {
hosts => ["1.10.10.60:9200","1.10.10.61:9200","1.10.10.62:9200"]
index => "application-logs-%{+YYYY-MM-dd}"
}
}
}
启动logstash
nohup /usr/local/logstash/bin/logstash -f /usr/local/logstash/conf.d/ --config.reload.automatic &
七、常见问题及解决
7.1 配置文件问题
配置文件的结构有问题,或者使用了YAML分析程序无法解析的路径或表达式,因为配置文件包含未正确转义的字符。
另请参阅YAML提示和疑难解答中的一般建议:
链接: link
7.2 Too many open file handlers
Filebeat保持文件处理程序打开,直到文件到达文件末尾,以便它可以近乎实时地读取新的日志行。 如果Filebeat正在收集大量文件,则打开文件的数量可能会成为问题。 在大多数环境中,正在更新的文件数量很少。
close_inactive配置选项应相应地设置为关闭不再处于活动状态的文件。
您可以使用其他配置选项来关闭文件处理程序,但所有这些选项都应该小心使用,因为它们可能有副作用。
选项是:
close_renamed
close_removed
close_eof
close_timeout
harvester_limit
7.3 Filebeat使用了太多的CPU?
Filebeat可能被配置为频繁扫描文件。 检查filebeat.yml配置文件中scan_frequency的设置。 将scan_frequency设置为小于1s可能会导致Filebeat在频繁的循环中扫描磁盘。
7.4 重新收集日志
若想从文件开头收集日志,需要删除filebeat保存的日志收集位置文件。
一般在/var/lib/filebeat/registry/目录下。直接删除启动filebeat即可。