ELK的安装与使用
安装部署
部署环境:Elasticsearch-7.17.3 Logstash-7.17.3 Kibana-7.17.3
一、安装部署Elasticsearch
- 解压目录,进入conf目录下
- 编辑elasticsearch.yml文件,输入以下内容并保存
network.host: 127.0.0.1
http.port: 9200
# 跨域配置
http.cors.enabled: true
http.cors.allow-origin: "*"
3.进入bin目录,点击elasticsearch.bat文件启动项目
二、安装部署Logstash
文件配置
在conf目录下新建logstash.conf
input {
stdin {
}
jdbc {
# 配置数据库信息
jdbc_connection_string => "jdbc:mysql://localhost:3306/student?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
jdbc_user => "root"
jdbc_password => "root"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
jdbc_default_timezone => "Asia/Shanghai"
# mysql驱动所在位置
jdbc_driver_library => "D:\ELK\mysql-connector-java-8.0.11.jar"
#sql执行语句
statement => "SELECT * FROM user"
# 定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
schedule => "* * * * *"
# 是否将 sql 中 column 名称转小写
lowercase_column_names => false
}
}
output {
elasticsearch {
hosts => ["127.0.0.1:9200"]
index => "goods"
# 文档_id,%{goods_id}意思是取查询出来的goods_id的值,并将其映射到shop的_id字段中
# 文档_id,%{goodsId}如果是别名,意思是取查询出来的goodsId的值,并将其映射到shop的_id字段中
document_id => "%{goodsId}"
}
stdout {
codec => json_lines
}
}
遇到问题与解决
①JAVA_HOME环境找不到,不推荐使用JAVA_HOME变量名
系统找不到指定的路径。could not find java; set JAVA_HOME or ensurejava is in PATH D: ELK logstash-?.17.3 bin>
解决方法:
新增JDK环境变量:命名为 LS_JAVA_HOME
将jdk路径复制进去,注意!!!JDK版本必须1.8以上
②报以下错误
解决办法:
在bin目录下新建文件:logstash.conf
设置logstash.conf的值,或指定路径下的conf文件
input {
stdin{
}
}
output {
stdout{
}
}
启动logstash,进入bin目录下的cmd,输入以下命令
logstash -f logstash.conf
安装部署Kibana
进入config目录将kibana.yml文件修改
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://localhost:9200"]
kibana.index: ".kibana"
SpringBoot集成Logstash收集日志
1、导入依赖
在pom.xml文件导入以下依赖
<!--集成logstash-->
<dependency>
<groupId>net.logstash.logback</groupId>
<artifactId>logstash-logback-encoder</artifactId>
<version>5.3</version>
</dependency>
2、配置文件
新建"logback-spring.xml"文件,并编写内容如下:
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<include resource="org/springframework/boot/logging/logback/base.xml" />
<appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
<!--设置指定路径下的发送地址,端口号设置为4560-->
<destination>localhost:4560</destination>
<encoder charset="UTF-8" class="net.logstash.logback.encoder.LogstashEncoder" />
</appender>
<!--配置根路径下日志级别-->
<root level="INFO">
<appender-ref ref="LOGSTASH" />
</root>
</configuration>
3、配置application.yml文件
进入application.yml文件配置日志
logging:
level:
root: info
config: classpath:logback-spring.xml
4、配置logstash下的logstash.conf文件
input{
tcp{
mode => "server"
host => "127.0.0.1"
port => 4560
codec => json_lines
}
}
output{
elasticsearch{
action => "index"
hosts => "localhost:9200"
index => "springboot_log-%{+YYYY.MM.dd}"
}
}
以上配置完启动可运行成功
5、配置Logstash日志保存到数据库
Linux配置这里就不提了,网上很多。这里讲的是windows系统下的配置,并以MySQL数据库为例。
使用的是jdbc,由于Logstash默认不带jdbc,所以需要下载logstash-output-jdbc,在线下载方式:进入logstash的bin目录下输入以下命令:
logstash-plugin install --no-verify logstash-output-jdbc
若安装失败,则去百度找一个压缩包下载(小编是在百度网盘上面下载的)
下一步是采用不联网的安装方式:进入logstash的bin目录下输入以下命令:
logstash-plugin install file://文件路径+压缩包全名,以'/'分隔
若还是安装失败,则需要更改配置文件
在logstash根目录下的Gemfile文件修改以下配置
source "https://gems.ruby-china.com"
重新进入logstash的bin目录下输入以下命令
logstash-plugin install file://文件路径+压缩包全名,以'/'分隔
网上下载mysql驱动,譬如mysql-connector-java-8.0.11.jar
下载后在logstash下的vendor目录下新建jar目录,再进入jar目录新建jdbc目录
将mysql驱动包放到jdbc目录下,全局路径如下:logstash-7.17.3目录为安装模块
logstash-7.17.3\vendor\jar\jdbc\mysql-connector-java-8.0.11.jar
譬如springboot下的日志内容为
log.info(“|”+“version内容”+“|”+“taskId内容”+“|”+“taskType内容”+“|”+“data内容”+“|”+“time内容”)
重新进入logstash.conf文件,编辑以下内容
input{
tcp{
mode => "server"
host => "127.0.0.1"
port => 4560
codec => json_lines
}
}
filter{
mutate{
split => ["message","|"]
}
if[message][2]{
mutate{
add_field =>{
"version" => "%{[message][2]}"
}
}
}
if[message][3]{
mutate{
add_field =>{
"taskId" => "%{[message][3]}"
}
}
}
if[message][4]{
mutate{
add_field =>{
"taskType" => "%{[message][4]}"
}
}
}
if[message][5]{
mutate{
add_field =>{
"data" => "%{[message][5]}"
}
}
}
if[message][6]{
mutate{
add_field =>{
"time" => "%{[message][6]}"
}
}
}
mutate{
convert => {
"version" => "string"
"taskId" => "string"
"taskType" => "string"
"data" => "string"
"time" => "string"
}
}
}
output{
driver_class => "com.mysql.cj.jdbc.Driver"
connection_string => "jdbc:mysql://127.0.0.1:3306/student?user=用户名&password=密码&serverTimezone=GMT%2B8&characterEncoding=utf8&useSSL=false&autoReconnect=true"
statement => ["insert into 表名 (version,taskId,taskType,data,time) VALUES (?,?,?,?,?)","version","taskId","taskType","data","time"]
}
6、区分业务日志和错误日志存储
add_tag 属性,在input添加,用于区分输出
具体内容就不详细介绍了,看下面小编写的内容
input {
tcp {
mode => "server"
host => "127.0.0.1"
port => 4560
codec => json_lines
}
}
filter{
mutate{
split => ["message","|"]
}
if[message][1] == "business_log"{
mutate{
add_tag =>["business_log"]
}
}
if[message][1]== "error_log"{
mutate{
add_tag => ["error_log"]
}
}
if[message][2]=="ElasticSearch"{
mutate{
add_tag => ["ElasticSearch"]
}
}
if[message][2]=="MYSQL"{
mutate{
add_tag => ["MySQL"]
}
}
if[message][2]=="PgSQL"{
mutate{
add_tag => ["PgSQL"]
}
}
if[message][2]=="ElasticSearchMySQL"{
mutate{
add_tag => ["ElasticSearchAndMySQL"]
}
}
if[message][2]=="ElasticSearchPgSQL"{
mutate{
add_tag => ["ElasticSearchAndPgSQL"]
}
}
if [level] == "INFO"{
if[message][3]{
mutate{
add_field =>{
"version" => "%{[message][3]}"
}
}
}
if[message][4]{
mutate{
add_field =>{
"taskId" => "%{[message][4]}"
}
}
}
if[message][5]{
mutate{
add_field =>{
"taskType" => "%{[message][5]}"
}
}
}
if[message][6]{
mutate{
add_field =>{
"data" => "%{[message][6]}"
}
}
}
if[message][7]{
mutate{
add_field =>{
"time" => "%{[message][7]}"
}
}
}
mutate{
convert => {
"version" => "string"
"taskId" => "string"
"taskType" => "string"
"data" => "string"
"time" => "string"
}
}
}
}
output {
if [level] == "INFO"{
if "business_log" in [tags]{
if "ElasticSearch" in [tags]{
elasticsearch {
action => "index"
hosts => "localhost:9200"
index => "business_log-%{+YYYY.MM.dd}"
}
}
else if "MySQL" in [tags]{
jdbc{
driver_class => "com.mysql.cj.jdbc.Driver"
connection_string => "jdbc:mysql://127.0.0.1:3306/student?user=root&password=root&serverTimezone=GMT%2B8&characterEncoding=utf8&useSSL=false&autoReconnect=true"
statement => ["insert into business_log (version,taskId,taskType,data,time) VALUES (?,?,?,?,?)","version","taskId","taskType","data","time"]
}
}
}
else if "error_log" in [tags]{
if "ElasticSearch" in [tags]{
elasticsearch {
action => "index"
hosts => "localhost:9200"
index => "business_error_log-%{+YYYY.MM.dd}"
}
}
}
}
else if [level] == "ERROR"{
elasticsearch {
action => "index"
hosts => "localhost:9200"
index => "sys_error_log-%{+YYYY.MM.dd}"
}
}
}
配置30天删除日志记录
需求;每天都记录日志,但是考虑到日志文件存储占用内容,所以将30天前的日志文件删除掉
通过以上的配置已经配置好每天都会创建新的日志文件
思考:
①使用Kibana配置删除策略,对日志单独配置处理
②配置创建日志同时配置日志的删除策略
http://127.0.0.1:5601 命令进入Kibana
手动选择策略
若需要自定义策略,可以创建策略,如以下操作
进入索引管理,添加策略
以上步骤后,实现了指定的索引选择对应是策略
配置自动创建策略
配置内容如下
{
"index": {
"lifecycle": {
"name": "30-days-default",
"rollover_alias": ""
},
"routing": {
"allocation": {
"include": {
"_tier_preference": "data_content"
}
}
},
"number_of_shards": "1",
"auto_expand_replicas": "0-1",
"number_of_replicas": "1"
}
}
配置完点击"Next"
选择Dynamic templates,根据自定义动态添加字段
配置内容如下
{
"index": {
"lifecycle": {
"name": "30-days-default",
"rollover_alias": ""
},
"routing": {
"allocation": {
"include": {
"_tier_preference": "data_content"
}
}
},
"number_of_shards": "1",
"auto_expand_replicas": "0-1",
"number_of_replicas": "1"
}
}
配置完点击"Next"
[外链图片转存中…(img-ujTG21Kv-1684376256531)]
选择Dynamic templates,根据自定义动态添加字段
然后点击"Next",逐个点击下一步,最后创建成功。。成功实现了以上的功能