基于Filebeat+Kafka+ELK实现Nginx日志收集并采用Elastalert2实现钉钉告警

news2025/1/15 16:43:43

目录

  • 基于Filebeat+Kafka+ELK实现Nginx日志收集
  • 1.规划好项目架构图
  • 2.部署前端web集群
  • 3.部署ES集群
    • 集群介绍
    • 环境准备
    • 集群搭建
    • 分片和副本
  • 4.部署kafka集群
    • 环境准备
      • 静态IP配置
      • 重启网络服务
      • 域名解析设置
      • 安装时间同步服务
      • 关闭防火墙和selinux
    • kafka集群依赖于zookeeper
    • kafaka和zookeeper准备
      • kafka配置
      • zookeeper配置
      • 启动
      • 测试
  • 5.使用filebeat获取nginx日志数据
    • filebeat进行日志收集测试kafka集群
    • 配置
    • 测试
  • 6.在ES集群中都部署filebeat,然后将收集的日志输出到ES集群和kafka集群查看
    • ES上的filebeat
      • filebeat.yml文件---->输出到ES集群
      • elasticsearch收集日志成功
      • filebeat.yml文件---->输出到kafka集群
  • 7.使用logstash进行数据过滤
  • 8.使用kibana查看访问
  • 9.通过关键字段设置阈值,发送警告消息
    • 尝试使用elasticalert实现qq邮箱告警
    • 尝试使用elasticalert2实现钉钉告警

基于Filebeat+Kafka+ELK实现Nginx日志收集

1.规划好项目架构图

在这里插入图片描述

2.部署前端web集群

       先准备3台Nginx服务器,用做后端服务器,(由于机器有限,也直接用这三台机器来部署ES集群),然后准备2台服务器做负载均衡器(Nginx实现负载均衡具体实现操作有机会在介绍),如果是简单学习测试,可以先使用3台Nginx服务器就可以,先告一段落。

3台Nginx服务器备注:

服务器IP地址安装软件
elk-node1192.168.40.150elasticsearch、filebeat、logstash、kibana、nginx
elk-node2192.168.40.137elasticsearch、filebeat、logstash、nginx
elk-node3192.168.40.138elasticsearch、filebeat、logstash、nginx

安装nginx提供了一个脚本,可以直接运行这个脚本实现安装

#!/bin/bash
yum -y install wget zlib zlib-devel openssl openssl-devel pcre pcre-devel gcc gcc-c++ autoconf automake make 
#创建 nginx文件夹
mkdir -p /nginx
cd /nginx
#解压 下载的nginx的源码包
wget http://nginx.org/download/nginx-1.23.4.tar.gz
tar xf nginx-1.23.4.tar.gz
cd  nginx-1.23.4
#生成编译前配置工作-->Makefile,指定存放nginx1文件路径,安装相应的模块
./configure --prefix=/usr/local/nginx1  --with-threads --with-http_ssl_module --with-http_stub_status_module --with-stream
#编译
make
#编译安装-->将编译好的二进制程序安装指定目录/usr/local/nginx1
make   install

其它的软件安装过程都可以看我的博客 传送门

3.部署ES集群

集群介绍

       这里介绍集群部署和一些简单的概念,更详细的内容可以查看我的这篇文章 ElasticSearch集群8.0版本搭建、故障转移

环境准备

服务器IP地址安装软件
elk-node1192.168.40.150Elasticsearch
elk-node2192.168.40.137Elasticsearch
elk-node3192.168.40.138Elasticsearch

所有以下配置需要在配置文件中保持唯一,如果有重复的,则会报错

集群搭建

#启动3个虚拟机,分别在3台虚拟机上部署安装Elasticsearch
mkdir -p /opt/elk

#将之前单机安装的elasearch分发到其它机器
scp -r elsearch/ root@192.168.40.137:/opt/elk/
scp -r elsearch/ root@192.168.40.138:/opt/elk/

#node01的配置==>192.168.40.150
cluster.name: es-cluster
node.name: node01
network.host: 0.0.0.0
http.port: 9200
discovery.seed_hosts: ["192.168.40.137","192.168.40.138","192.168.40.150"]
cluster.initial_master_nodes: ["node01", "node02", "node03"]
# 最小节点数
node.roles: [master,data]
# 跨域专用
http.cors.enabled: true
http.cors.allow-origin: "*"

#node02的配置:
cluster.name: es-cluster
node.name: node02
network.host: 0.0.0.0
http.port: 9200
discovery.seed_hosts: ["192.168.40.137","192.168.40.138","192.168.40.150"]
cluster.initial_master_nodes: ["node01", "node02", "node03"]
# 最小节点数
node.roles: [master,data]

#node03的配置:
cluster.name: es-cluster
node.name: node03
network.host: 0.0.0.0
http.port: 9200
discovery.seed_hosts: ["192.168.40.137","192.168.40.138","192.168.40.150"]
cluster.initial_master_nodes: ["node01", "node02", "node03"]
# 最小节点数
node.roles: [master,data]

#分别启动3个节点
./elasticsearch -d

查看集群

在这里插入图片描述

创建一个索引,其中加粗的为主分片,其余为副本分片

在这里插入图片描述

查询集群状态:/_cluster/health
响应如下:在这里插入图片描述

分片和副本

       为了将数据添加到Elasticsearch,我们需要索引(index)——一个存储关联数据的地方。实际上,索引只是一个用来指向一个或多个分片(shards)的“逻辑命名空间(logical namespace)”.

  • 一个分片(shard)是一个最小级别“工作单元(worker unit)”,它只是保存了索引中所有数据的一部分。
  • 我们需要知道是分片就是一个Lucene实例,并且它本身就是一个完整的搜索引擎。应用程序不会和它直接通
    信。
  • 分片可以是主分片(primary shard)或者是复制分片(replica shard)。
  • 索引中的每个文档属于一个单独的主分片,所以主分片的数量决定了索引最多能存储多少数据。
  • 复制分片只是主分片的一个副本,它可以防止硬件故障导致的数据丢失,同时可以提供读请求,比如搜索或者从别的shard取回文档。
  • 当索引创建完成的时候,主分片的数量就固定了,但是复制分片的数量可以随时调整。

4.部署kafka集群

环境准备

服务器说明IP地址安装软件
nginx-kafka1192.168.40.170Kafka、Zookeeper、java
nginx-kafka2192.168.40.171Kafka、Zookeeper、java
nginx-kafka3192.168.40.172Kafka、Zookeeper、java

静态IP配置

nginx-kafka1

进入/etc/sysconfig/network-scripts/目录下,vim ifcfg-ens33文件

BOOTPROTO="none"
DEFROUTE="yes"
NAME="ens33"
DEVICE="ens33"
ONBOOT="yes"
IPADDR=192.168.40.170
PREFIX=24
GATEWAY=192.168.40.2
DNS1=192.168.40.2

nginx-kafka2

BOOTPROTO="none"
DEFROUTE="yes"
NAME="ens33"
DEVICE="ens33"
ONBOOT="yes"
IPADDR=192.168.40.171
PREFIX="24"
GATEWAY=192.168.40.2
DNS1=192.168.40.2

nginx-kafka3

BOOTPROTO="none"
DEFROUTE="yes"
NAME="ens33"
DEVICE="ens33"
ONBOOT="yes"
IPADDR=192.168.40.172
PREFIX=24
GATEWAY=192.168.40.2
DNS1=192.168.40.2

重启网络服务

       3台服务器都要重启网络服务,然后使用ping测试网络通信是否正常,如果可以相互ping通,则能通信

service network restart

域名解析设置

3台服务器都要设置域名解析

nginx-kafka1

root@nginx-kafaka1 ~]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
#添加对应的域名解析
192.168.40.170 nginx-kafka1
192.168.40.171 nginx-kafka2
192.168.40.172 nginx-kafka3

nginx-kafka2

root@nginx-kafaka2 ~]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
#添加对应的域名解析
192.168.40.170 nginx-kafka1
192.168.40.171 nginx-kafka2
192.168.40.172 nginx-kafka3

nginx-kafka3

root@nginx-kafaka3 ~]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6
#添加对应的域名解析
192.168.40.170 nginx-kafka1
192.168.40.171 nginx-kafka2
192.168.40.172 nginx-kafka3

安装时间同步服务

安装

yum install chrony -y

设置启动

systemctl enable chronyd
ststemctl start chronyd

设置时区

cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

关闭防火墙和selinux

防火墙关闭

systemctl enable chronyd
ststemctl start chronyd

selinux关闭

vim /etc/selinux/config 
SELINUX=disabled

kafka集群依赖于zookeeper

       Kafka集群依赖于Zookeeper主要是为了实现分布式协调和管理。Zookeeper是一个分布式协调服务,可以提供高可用的数据存储和访问服务。在Kafka集群中,Zookeeper主要用于以下几个方面:

  1. Broker注册和发现:Kafka集群中的每个Broker都会在Zookeeper中注册自己的信息,包括Broker的ID、主机名和端口等。通过Zookeeper,客户端可以发现Kafka集群中的所有Broker,并且可以动态地感知Broker的变化。
  2. Topic和Partition管理:Kafka中的Topic和Partition信息也会存储在Zookeeper中。当有新的Topic或Partition被创建时,Kafka会将这些信息写入Zookeeper中,同时在Broker上创建对应的Topic和Partition。当Topic或Partition被删除时,Kafka也会从Zookeeper中删除对应的信息。
  3. Controller选举:在Kafka集群中,每个Partition都有一个Leader和多个Follower。Leader负责处理读写请求,Follower则负责同步Leader的数据。Kafka通过Zookeeper实现了Controller选举机制,当某个Broker宕机或不可用时,Zookeeper会重新选举一个新的Controller,负责管理集群中的Partition和Replica。

       因此,Zookeeper是Kafka集群中非常重要的组件,它可以提供高可用性、一致性和可靠性的服务,保证Kafka集群的正常运行。

kafaka和zookeeper准备

kafaka下载: 传送门

zookeeper下载: 传送门

由于kafka和zookeeper都是用Java编写的,安装一下jdk,3台都要安装

yum install java -y

kafka配置

上传到三台机器的/opt目录下

nginx-kafka1已经上传,使用scp传送到其它服务器

[root@nginx-kafka1 opt]# ls
apache-zookeeper-3.8.1-bin.tar.gz  kafka_2.13-3.5.0.tgz
[root@nginx-kafka1 opt]# scp apache-zookeeper-3.8.1-bin.tar.gz kafka_2.13-3.5.0.tgz root@192.168.40.171:/opt/ 
[root@nginx-kafka1 opt]# scp apache-zookeeper-3.8.1-bin.tar.gz kafka_2.13-3.5.0.tgz root@192.168.40.172:/opt/

解压

 tar xf kafka_2.13-3.5.0.tgz

配置唯一的broker id

       broker id 其实就是kafaka集群的一个节点,为了表示集群中不同的节点,将broker.id设置为一个整数值进行标识,建议按顺序设置,进入kafaka配置目录

[root@nginx-kafka1 config]# vim server.properties 
[root@nginx-kafka1 config]# pwd
/opt/kafka_2.13-3.5.0/config
broker.id=1
[root@nginx-kafka2 config]# vim server.properties 
[root@nginx-kafka2 config]# pwd
/opt/kafka_2.13-3.5.0/config
broker.id=2
[root@nginx-kafka3 config]# vim server.properties 
[root@nginx-kafka3 config]# pwd
/opt/kafka_2.13-3.5.0/config

修改监听的主机名和端口

       在 ZooKeeper 集群中,配置 listeners 参数是为了让 ZooKeeper 能够与外部的客户端进行通信。

       例如:PLAINTEXT://nginx-kafka1:9092 是 Kafka 的监听地址,它告诉 ZooKeeper 监听来自该地址的客户端请求(这也就是为什么要设置域名解析)。这个地址包含了协议、主机名和端口号,其中 PLAINTEXT 是 Kafka 的一种传输协议,nginx-kafka1 是 Kafka 的主机名,9092 是 Kafka 的默认端口号。通过配置 listeners 参数,ZooKeeper 就可以与 Kafka 集群中的其他组件进行通信,比如消费者或生产者。

[root@nginx-kafka1 config]# vim server.properties
listeners=PLAINTEXT://nginx-kafka1:9092
[root@nginx-kafka2 config]# vim server.properties 
listeners=PLAINTEXT://nginx-kafka2:9092
[root@nginx-kafka3 config]# vim server.properties 
listeners=PLAINTEXT://nginx-kafka3:9092

修改日志存放目录和默认的分区数量

vim server.properties 
# 三台机器都配置 
log.dirs=/data
num.partitions=3

配置zookeeper连接

vim server.properties 
# 三台机器都配置
zookeeper.connect=192.168.40.170:2181,192.168.40.171:2181,192.168.40.172:2181

zookeeper配置

解压

 tar xf apache-zookeeper-3.8.1-bin.tar.gz 

进入配置文件目录,修改配置文件,3台机器都要操作,其中

  • server.1、server.2、server.3指的是服务器节点的 ID,这个 ID 必须是唯一的且在集群中必须按顺序递增
  • 192.168.40.17*:3888:4888表示这个节点的主机名或 IP 地址和端口号。其中,“3888”是用于 follower 之间通信的端口号,“4888”是用于选举 leader 的端口号。这两个端口号都是 ZooKeeper 集群中的节点之间进行通信的重要端口号。
cd apache-zookeeper-3.8.1-bin/conf/
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg 
### 新加如下内容
server.1=192.168.40.170:3888:4888
server.2=192.168.40.171:3888:4888
server.3=192.168.40.172:3888:4888

       在刚才的配置文件中,可以看到存放数据的目录为/tmp/zookeeper,但该目录是不存在的,创建该目录

mkdir -p /tmp/zookeeper

       然后创建一个myid的文件,进行Zookeeper id 宣告,宣告的id要和之前写入zoo.cfg文件内容的id对应,通过执行这个命令,为每一个 ZooKeeper 服务器节点分配 id。

nginx-kafka1

[root@nginx-kafka1 conf]# echo 1 >/tmp/zookeeper/myid

nginx-kafka2

[root@nginx-kafka2 conf]# echo 2 >/tmp/zookeeper/myid

nginx-kafka3

[root@nginx-kafka3 conf]# echo 3 >/tmp/zookeeper/myid

启动

先启动zookeeper,然后在启动kafka

修改一下PATH环境变量吧

PATH=/opt/apache-zookeeper-3.8.1-bin/bin:$PATH
# 可以将其写入/root/.bashrc文件,然后刷新
source /root/.bashrc

启动zookeeper服务端

[root@nginx-kafka1 apache-zookeeper-3.8.1-bin]# cd bin/
[root@nginx-kafka1 bin]# ./zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/apache-zookeeper-3.8.1-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@nginx-kafka2 bin]# ./zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/apache-zookeeper-3.8.1-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@nginx-kafka3 bin]# ./zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /opt/apache-zookeeper-3.8.1-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

查看状态

zkServer.sh status

如果看到有一个leader,两个follower则zookeeper启动成功

在这里插入图片描述

启动kafaka

[root@nginx-kafka1 kafka_2.13-3.5.0]# ./bin/kafka-server-start.sh -daemon ./config/server.properties 
[root@nginx-kafka2 kafka_2.13-3.5.0]# ./bin/kafka-server-start.sh -daemon ./config/server.properties
[root@nginx-kafka3 kafka_2.13-3.5.0]# ./bin/kafka-server-start.sh -daemon ./config/server.properties

启动zookeeper客户端查看kafka是否启动成功

[root@nginx-kafka1 bin]# ./zkCli.sh

启动之后,执行如下命令

[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids 
[1, 2, 3]

       它会显示zookeeper中/brokers/ids节点下的子节点列表。该节点下有三个子节点,分别是1、2和3。在Kafka中,这些子节点表示Kafka集群中不同的broker节点。每个broker节点都有一个唯一的ID,这些ID就是/brokers/ids节点下的子节点。

测试

创建一个topic

bin/kafka-topics.sh --create --bootstrap-server 192.168.40.171:9092 --replication-factor 1 --partitions 1 --topic test

如果看到topics中有test这个主题的话,则成功

在这里插入图片描述

在test主题下创建生产者,往kafaka里写入数据

[root@nginx-kafka1 kafka_2.13-3.5.0]# bin/kafka-console-producer.sh --broker-list 192.168.40.171:9092 --topic test

从kafaka读取数据

[root@nginx-kafka2 kafka_2.13-3.5.0]#  bin/kafka-console-consumer.sh --bootstrap-server 192.168.40.171:9092 --topic test --from-beginning

效果如下

在这里插入图片描述

插一段:启动kafaka的时候有个提示

OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N

编辑虚拟机设置,将内核设置为2核,就不会报错了

在这里插入图片描述

5.使用filebeat获取nginx日志数据

filebeat进行日志收集测试kafka集群

filebeat的下载可以看我之前写的文章 Filebeat详细介绍,下载和启动,日志读取和模块设置等

配置

       下载好之后,进入filebeat目录,将该目录下的filebeat.yml文件备份一遍

# 备份
[root@nginx-kafka1 filebeat]# cp filebeat.yml  filebeat.yml.bak
# 清空原来的filebeat.yml文件
[root@nginx-kafka1 filebeat]# >filebeat.yml
# 编写filebeat.yml文件
[root@nginx-kafka1 filebeat]# cat filebeat.yml
filebeat.inputs:
- type: log # 输入类型为日志类型
  enabled: true
  paths: # 存放日志的路径
    - /var/log/nginx/shengxia/*.log
setup.template.settings:
  index.number_of_shards: 3 # 分片数量
#------------------------------kafaka-------------------------
# 将采集的日志输入的kafaka中
output.kafka:
  hosts: ["192.168.40.170:9092","192.168.40.171:9092","192.168.40.172:9092"] #kafka集群所在的IP地址
  topic: nginxlog # 主题
  keep_alive: 10s

启动filebeat服务

[root@nginx-kafka1 filebeat]# nohup ./filebeat -e -c filebeat.yml &

测试

在kafka中创建名为nginxlog的主题

[root@nginx-kafka2 kafka_2.13-3.5.0]# bin/kafka-topics.sh --create --bootstrap-server 192.168.40.171:9092 --replication-factor 1 --partitions 1 --topic nginxlog

消费者查看是否采集到数据

[root@nginx-kafka2 kafka_2.13-3.5.0]#  bin/kafka-console-consumer.sh --bootstrap-server 192.168.40.171:9092 --topic nginxlog --from-beginning

在这里插入图片描述

6.在ES集群中都部署filebeat,然后将收集的日志输出到ES集群和kafka集群查看

ES上的filebeat

filebeat.yml文件---->输出到ES集群

       输出到ES集群的目的是为了测试ES集群是否能够顺利的获取filebeat采集的日志,如果觉得能成功,这一步可以不需要操作

filebeat.inputs:
- type: log
  enabled: true
  paths: # 日志存放目录,根据自己存放日志目录修改
    - /usr/local/shengxia/logs/*.log
setup.template.settings:
  index.number_of_shards: 3 #  分片数量
#将采集的日志收入elasticsearch
output.elasticsearch:
  hosts: ["192.168.40.150:9200","192.168.40.137:9200","192.168.40.138:9200"]   

filebeat都放在后台运行

[root@elk-node1 filebeat]# nohup ./filebeat -e -c filebeat.yml &
[1] 112019
[root@elk-node1 filebeat]# nohup: 忽略输入并把输出追加到"nohup.out
[root@elk-node2 filebeat]# nohup ./filebeat -e -c filebeat.yml &
[1] 112019
[root@elk-node2 filebeat]# nohup: 忽略输入并把输出追加到"nohup.out
[root@elk-node3 filebeat]# nohup ./filebeat -e -c filebeat.yml &
[1] 112019
[root@elk-node3 filebeat]# nohup: 忽略输入并把输出追加到"nohup.out

elasticsearch收集日志成功

elk-node1的日志收集

在这里插入图片描述

elk-node2的日志收集
在这里插入图片描述

elk-node3的日志收集

在这里插入图片描述

filebeat.yml文件---->输出到kafka集群

3台服务器配置filebeat.yml文件,获取nginx日志数据

elk-node1

filebeat.inputs:
- type: log
  enabled: true
  paths: # 日志存放目录
    - /usr/local/shengxia/logs/*.log
setup.template.settings:
  index.number_of_shards: 3 #  分片数量
#------------------------------kafaka-------------------------
## 将采集的日志输入的kafaka中
output.kafka:
  hosts: ["192.168.40.170:9092","192.168.40.171:9092","192.168.40.172:9092"]
  enabled: true
  topic: nginxlog
  keep_alive: 10s

elk-node2

filebeat.inputs:
- type: log
  enabled: true
  paths: # 日志存放目录
    - /usr/local/shengxia/logs/*.log
setup.template.settings:
  index.number_of_shards: 3 #  分片数量
#------------------------------kafaka-------------------------
## 将采集的日志输入的kafaka中
output.kafka:
  hosts: ["192.168.40.170:9092","192.168.40.171:9092","192.168.40.172:9092"]
  enabled: true
  topic: nginxlog
  keep_alive: 10s

elk-node3

filebeat.inputs:
- type: log
  enabled: true
  paths: # 日志存放目录
    - /usr/local/nginx1/logs/*.log
setup.template.settings:
  index.number_of_shards: 3 #  分片数量
#------------------------------kafaka-------------------------
## 将采集的日志输入的kafaka中
output.kafka:
  hosts: ["192.168.40.170:9092","192.168.40.171:9092","192.168.40.172:9092"]
  topic: nginxlog
  keep_alive: 10s

       同时,装有三台nginx服务器的机器要配置域名解析,原因如蓝色部分字体所示

192.168.40.170 nginx-kafka1
192.168.40.171 nginx-kafka2
192.168.40.172 nginx-kafka3

可能存在的问题:
       filebeat抓取nginx日志传送到kafka没有接收,可能的原因是没有在安装filebeat所在的机器设置域名解析,因为配置kafka的server.properties中的listeners是通过域名配置的,所以需要设置域名解析。

启动kafka消费者

[root@nginx-kafaka2 kafka_2.13-3.5.0]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.40.171:9092 --topic nginxlog --from-beginning

elk-node1的日志输入到kafka中

在这里插入图片描述

elk-node2的日志输入到kafka

在这里插入图片描述

elk-node3的日志输入到kafka中

在这里插入图片描述

       这三台nginx服务器的日志就都收集到kafka中,接下来使用logstash进行数据过滤

7.使用logstash进行数据过滤

先使用es这个用户启动es集群

[es@elk-node1 bin]$ pwd
/opt/elk/elsearch/bin
[es@elk-node1 bin]$ ./elasticsearch -d
[es@elk-node2 bin]$ pwd
/opt/elk/elsearch/bin
[es@elk-node2 bin]$ ./elasticsearch -d
[es@elk-node3 bin]$ pwd
/opt/elk/elsearch/bin
[es@elk-node3 bin]$ ./elasticsearch -d

logstash从kafka中获取数据到es,并将日志进行转化

[root@elk-node3 logstash]# pwd
/opt/elk/logstash
[root@elk-node3 logstash]# nohup ./bin/logstash -f nginx-logstash.conf &

nginx-logstash.conf文件

input {
  kafka {
    bootstrap_servers => "192.168.40.170:9092" # kafka服务器的IP地址
    topics => ["nginxlog"] # 消费主题
    auto_offset_reset => "latest"
    codec => "json"
  }
}

filter {
  grok { # 使用grok模式解析nginx字段
        match => { "message" => "%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] \"%{WORD:verb} %{URIPATHPARAM:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response} %{NUMBER:bytes} \"%{DATA:referrer}\" \"%{DATA:agent}\"" }
  }

  date {
    match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
    target => "@timestamp"
  }

  mutate {
    convert => {
      "response_code" => "integer"
      "response_time" => "float"
    }
  }
}

output {
  elasticsearch {
    hosts => ["192.168.40.150:9200"] #elasticsearch服务器所在IP地址
    index => "nginx-logs-%{+YYYY.MM.dd}" # 索引
  }
}

elk-node1的日志写入成功

在这里插入图片描述

elk-node2的日志写入成功

在这里插入图片描述

elk-node2的日志写入成功

在这里插入图片描述

8.使用kibana查看访问

启动kibana

[root@elk-node1 kibana-8.8]# nohup ./bin/kibana --allow-root &

创建索引

在这里插入图片描述

访问,发现显示的字段没有达到预期的要求

在这里插入图片描述

       修改一下nginx.conf日志格式试一下(主要是grok解析nginx日志字段出现了点拆错,可以百度查找相关的解析格式,上面的grok是修正过后的,应该没啥问题)

可以查看到nginx日志的相关字段了
在这里插入图片描述

       有结果后进行出图展示,可以自己设置相关的字段进行出图展示,我这里简单设置了一下

在这里插入图片描述

       因为我也安装了grafana,也可以利用grafana进行出图展示,grafana安装的过程也可以看我之前的文章 Prometheus、Grafana、cAdvisor的介绍、安装和使用

选择Elasticsearch作为数据源,连接好ES集群所在的IP地址

在这里插入图片描述

创建面板进行出图展示

在这里插入图片描述

设置告警规则,好像没啥用,可以忽略

在这里插入图片描述

9.通过关键字段设置阈值,发送警告消息

尝试使用elasticalert实现qq邮箱告警

下载pyhton3的软件包

[root@elk-node1 elk]# yum -y install gcc gcc-c++ openssl-devel python3-devel

使用git下载elasticalert

[root@elk-node1 elasticalert]# yum install -y git && git clone https://github.com/Yelp/elastalert.git

如果git拉不下来,使用pip3安装

[root@elk-node1 elk]# pip3 install elastalert

如果存在如下错误

在这里插入图片描述

执行如下命令

[root@elk-node1 elk]# pip3 install blist 

再次安装

[root@elk-node1 elk]# pip3 install elastalert

安装好后默认存放路径

[root@elk-node1 elk]# find / -name elastalert
/usr/local/bin/elastalert # 二进制文件
/usr/local/lib/python3.6/site-packages/elastalert # 安装目录

复制到elk目录下

[root@elk-node1 elk]# cp -r /usr/local/lib/python3.6/site-packages/elastalert .

踩坑,因为我的ES集群8.x版本以上的,与elastalert版本不兼容,所以…………………………失败了

尝试使用elasticalert2实现钉钉告警

安装相关依赖

yum -y groupinstall "Development tools"
yum install -y ncurses-devel gdbm-devel xz-devel sqlite-devel tk-devel uuid-devel readline-devel bzip2-devel libffi-devel
yum install -y openssl-devel openssl11 openssl11-devel
export CFLAGS=$(pkg-config --cflags openssl11)
export LDFLAGS=$(pkg-config --libs openssl11) 

下载并安装Python3.10.4,elastalert2需要最低Python3.10版本支持

[root@elk-node1 ~]# mkdir -p /opt/alert
# 下载安装包
[root@elk-node1 alert]# wget https://www.python.org/ftp/python/3.10.4/Python-3.10.4.tgz 
# 解压
[root@elk-node1 alert]# tar zxvf Python-3.10.4.tgz
[root@elk-node1 alert]# cd Python-3.10.4
# 编译安装
[root@elk-node1 Python-3.10.4]# ./configure --enable-optimizations && make altinstall 

git克隆elasalert2

[root@elk-node1 alert]# git clone https://github.com/jertel/elastalert2.git

安装模块

[root@elk-node1 elastalert2]# pip install "setuptools>=11.3"
[root@elk-node1 Python-3.10.4]# python3.10 setup.py install

安装钉钉插件

[root@elk-node1 alert]# wget https://github.com/xuyaoqiang/elastalert-dingtalk-plugin/archive/master.zip
# 解压
[root@elk-node1 alert]# unzip master.zip
[root@elk-node1 alert]# mv elastalert-dingtalk-plugin-master dingtalk
[root@elk-node1 alert]# cd dingtalk/
[root@elk-node1 dingtalk]# cp -r elastalert_modules /opt/alert/elastalert2/
[root@elk-node1 dingtalk]# cd /opt/alert/elastalert2/

修改配置文件

[root@elk-node1 elastalert2]# cd examples/
[root@elk-node1 elastalert2]# cp config.yaml.example config.yaml

查看config.yaml文件

[root@elk-node1 examples]# cat config.yaml|grep -Ev "^#|^$"
rules_folder: examples/rules # 规则文件所在的文件夹路径。
run_every: # 规则执行的频率,每隔1分钟执行一次。
  minutes: 1
buffer_time: # 缓冲时间,在触发警报之前缓冲的时间,为15分钟。
  minutes: 15
es_host: 192.168.40.150 # Elasticsearch集群的IP地址,用于存储警报数据。
es_port: 9200 # 端口
writeback_index: elastalert_status # 用于保存警报状态的索引名称。
alert_time_limit: # 警报的有效时间限制,2天内的警报将被处理。
  days: 2

进入rules目录,新建规则文件

[root@elk-node1 examples]# cd rules/
[root@elk-node1 rules]# cp example_frequency.yaml  example_Warn.yaml

查看 example_Warn.yaml文件

[root@elk-node1 rules]# cat example_Warn.yaml | grep -Ev "^#|^$"
name: Warn # 规则的名称。
type: frequency # 规则类型为频率,表示要检测的事件是在一定时间范围内出现的次数。
index: nginx-logs* # 要在哪个索引中搜索日志数据,这里是以"nginx-logs"开头的索引。
num_events: 5 # 在指定的时间范围内,触发警报所需的事件数量,这里是在最近30分钟内出现了5次404错误。
timeframe:
  minutes: 30
filter: # 过滤要检查的日志事件。这里使用了一个term(项)筛选器,表示只关注response.keyword字段等于"404"的事件。
- term:
     response.keyword: "404"
alert: # 定义触发警报时的操作。
- "elastalert_modules.dingtalk_alert.DingTalkAlerter"
# 钉钉机器人的Webhook地址,用于发送警报消息。
dingtalk_webhook: "https://oapi.dingtalk.com/robot/send?access_token=填你自己的地址"
dingtalk_msgtype: text # 钉钉消息的类型为文本。
dingtalk_text: # 钉钉消息的内容,使用了'%{变量名}'来插入动态信息,这里不完善,有兴趣可以自己修改
  content: "Nginx 404 Alert: %{num_events} 404 errors occurred in the last %{timeframe}"

钉钉机器人设置

获取webhook的token

在这里插入图片描述

设置关键词

在这里插入图片描述

获取IP地址,注意,该IP地址不是虚拟机的IP地址,使用curl ifconfig.me可以获得

在这里插入图片描述

测试

curl -H 'Content-Type: application/json' \
-d '{"msgtype": "text", "text": {"content": "This is a test message"}}' \
-X POST https://oapi.dingtalk.com/robot/send?你自己的token

如果机器人显示消息,则可以正常工作
在这里插入图片描述

启动elastalert2

[root@elk-node1 elastalert2]# python3.10 -m elastalert.elastalert --verbose --config examples/config.yaml --rule examples/rules/example_Warn.yaml

报错

在这里插入图片描述

       这段日志中信息是No mapping found for [@timestamp] in order to sort on,意思是Elasticsearch中找不到 @timestamp 字段的映射,因此不能按照 @timestamp 字段进行排序。这个问题的解决方法是在Elasticsearch中创建一个映射,将 @timestamp 字段定义为日期类型

{
  "properties": {
    "@timestamp": {
      "type": "date"
    }
  }
}

在这里插入图片描述

放在后台运行

[root@elk-node1 elastalert2]# nohub python3.10 -m elastalert.elastalert --verbose --config examples/config.yaml --rule examples/rules/example_Warn.yaml &

简单测试一下,钉钉可以接收警告消息

在这里插入图片描述

参考文章:传送门

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/689659.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

8 系统定时器(Systick)

目录 系统定时器(Systick) SysTick定时器特性介绍 SysTick定时器的功能 SysTick定时器寄存器介绍 Systick定时器的使用 系统定时器(Systick) SysTick定时器特性介绍 计数宽度: 24bit来存储数据,2^24…

HOT21-搜索二维矩阵II

leetcode原题链接: 搜索二维矩阵II 题目描述 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性: 每行的元素从左到右升序排列。每列的元素从上到下升序排列。 示例 1: 输入:matrix [[…

【CSS3系列】第八章 · 伸缩盒模型

写在前面 Hello大家好, 我是【麟-小白】,一位软件工程专业的学生,喜好计算机知识。希望大家能够一起学习进步呀!本人是一名在读大学生,专业水平有限,如发现错误或不足之处,请多多指正&#xff0…

基于matlab使用深度学习的语义分割算法实现(附源码)

一、前言 此示例演示如何使用语义分割网络对图像进行分割。 语义分割网络对图像中的每个像素进行分类,从而生成按类分割的图像。语义分割的应用包括自动驾驶的道路分割和医疗诊断的癌细胞分割。 此示例首先向您展示如何使用预训练的 Deeplab v3 [1] 网络分割图像&am…

git权限异常:fatal: Authentication failed for

本地电脑用户密码修改了,git 提交,拉取出现异常, 就会报这个错误fatal: Authentication failed for http://xxx/xxx/xxx 解决方法: git config --system --unset credential.helper 免输入用户密码方法: git config --global…

【超简单】Ubuntu 安装 bcompare 对比工具

下载 bcompare 安装包:《bcompare-4.4.6.27483_amd64.deb》 https://www.scootersoftware.com/download.php?zzdl4 安装 becompare 安装: sudo dpkg -i bcompare-4.4.6.27483_amd64.deb破解 cd /usr/lib/beyondcompare/ sudo sed -i "s/kee…

革新写作方式:ChatGPT最新版带来高质量聚合文章的批量生成

随着人工智能技术的不断发展,自然语言处理模型也在不断进步。ChatGPT最新版作为一种强大的语言模型,可以生成高质量的聚合文章,为写作方式带来了革新。本文将详细阐述ChatGPT最新版带来的革新之处,包括其应用领域、生成文章的流程…

剑指 Offer 12: 矩阵中的路径

这道题看着简直是完全没思路,看了下发现是使用回溯的方法。 下面这里要注意,newi是旧的i加上新的偏移值!newj同理,并不是加自己,别昏头! s是String类型的变量,要写成size() 下面是正确的代码&a…

计算机组成原理笔记——指令系统、中央处理器、总线、I/O系统(王道考研)(持续更新)

文章目录 前言指令系统指令系统概述指令格式扩展操作码 指令寻址方式指令寻址数据寻址基本的寻址思路偏移寻址隐含寻址 程序的机器级代码表示汇编基础AT&T格式和Intel格式C语言控制结构的汇编表示函数调用栈帧切换栈帧构造 CISC与RISC 中央处理器CPU的功能和基本结构运算器…

企业级微服务架构实战项目--xx优选-详情页面的异步调用completablefuture

一 常见的调用方式 1.1 并发 并发:同一时刻多个线程在访问同一个资源,多个线程对一个点 ​ 例子:春运抢票、微信抢红包、电商秒杀... 1.2 同步串行 代表多任务按先后顺序执行,并且都是同一个线程来执行。 1.3 异步串行 代表…

【Java面试题】Java基础语法

文章目录 JDK、JRE和JVM的关系栈和堆分别存的什么数据异步和同步线程和进程区别java的数据类型有哪些equals和HashCode重写的问题?深拷贝和浅拷贝的区别和equals的区别常见的运行时异常有哪些? JDK、JRE和JVM的关系 JDK、JRE和JVM的关系: JDK(Java De…

数据库监控与调优【一】—— 数据库调优的维度

数据库调优的维度 左边是千金良方的,右边是个人优化的 业务需求 不合理的需求,可能会造成很多问题勇敢地对不合理的需求说不拨乱反正,把不合理的需求变成合理的需求 例子 财务SaaS系统,财务领域有种叫做代账的概念&#xff0c…

uniapp+vue3+vite+ts+uviewPlus搭建项目

创建项目架构 首先使用官方提供的脚手架创建一个项目,这里我创建的 vue3 vite ts 的项目: npx degit dcloudio/uni-preset-vue#vite-ts project-name(如命令行创建失败,请直接访问 giteehttps://gitee.com/dcloud/uni-preset-vu…

2023-06-20 x-ui-服务器配置记录

基础软件: yum update -y yum install -y curl yum install -y socat 工作软件: bash <(curl -Ls https://raw.githubusercontent.com/sprov065/x-ui/master/install.sh) 设置: 参考: 使用云服务器搭建代理-腾讯云开发者社区-腾讯云

9 HAL库驱动框架简述

目录 HAL库驱动框架简述 HAL库外设设计思想 HAL库和Cube MX相结合 一、对外设的封装——句柄结构体 二、外设初始化 初始化结构体 初始化的逻辑 三、外设使用逻辑 通用接口函数 初始化函数 I/O操作函数 控制函数 状态参数 扩展接口函数 总结 补充&#xff1a;H…

算法拾遗三十四线段树

算法拾遗三十四线段树 线段树说明物理结构使用线段树落方块的问题 线段树说明 给定固定长度的数组&#xff0c;然后要在数组给定的范围内完成加法【如数组1&#xff0c;200下标元素加6】&#xff0c;更新【7&#xff0c;375范围数组元素更新为4】&#xff0c;查询操作【查询3到…

【云原生丶Docker】Docker镜像加速器:给冰山提提速!

Docker镜像加速器是一种用于加速Docker镜像下载和构建的工具。它可以提高Docker镜像的下载速度&#xff0c;从而加快应用程序的开发和部署速度。 Docker镜像加速器通常使用CDN(内容分发网络)技术来实现加速。CDN是一种分布式网络架构&#xff0c;可以将内容缓存在全球各地的服务…

命令行--git--多次commit如何合并成一个commit

参考:https://blog.csdn.net/qq_50652600/article/details/120800309 在我们平时开发中&#xff0c;我们提交代码免不了要和git打交道&#xff0c;那么我们肯定是先从预发分支上&#xff08;公司一般都用pre命名&#xff0c;这里为了方便演示用master&#xff09;上拉去最新的代…

测试工程师首用chatGPT,编写python读取xmind测试用例chatgtp+python+xmind

背景 有用xmind写测试用例的吧&#xff0c;统计一个xmind的条&#xff0c;需要花大量的时间&#xff0c;还有要统计有多少条冒烟的&#xff0c;多少条不通过的&#xff0c;还有通过的条数。 需求 快速使用python&#xff0c;写一个简单的脚本&#xff0c;统计所有xmind节点&…

FPGA_学习_09_PWM呼吸灯

PWM在三相电机控制中&#xff0c;有着非常重要的地位。 如果你需要用FPGA去实现三相电机的控制&#xff0c; PWM这一关是绕不过的。好在PWM的基本原理是比较简单的。所以原理部分本文就略过&#xff0c;本文基于PWM实现呼吸灯。 1 时序 {signal: [{name: clk, wave: p....…