elk高并发架构

news2025/1/11 4:09:21

1.前言

普通的elk架构只适合数据量小的情景,而且也不安全,在瞬时数据量大的情况下可能会导致logstash崩溃,从而导致数据的丢失,对于数据安全有较高要求,可以在架构中加入消息队列,既可以防止瞬时的大流量并发,也可以防止在logstash挂掉的情况下数据的丢失

2.架构

给每个需要收集日志的主机部署filebeat服务,filebeat统一将日志输出到kafka集群对应的topic中,再由logstash去主动消费kafka集群topic中的日志数据,由于是logstash主动去消费,就不会发生logstash顶不住大并发数据量而崩溃的问题,但是如果数据量大,logstash消费不过来会导致kafka的消息堆积,一旦kafka堆积过多导致崩溃,所以logstash的数量也是很重要的,logstash消费后的数据写入到elasticsearch集群中,elasticsearch使用集群模式,可以保证数据的安全与高可用,最后再通过kibana展示数据

对于这个架构可能会有一些疑点,因为kafka的特殊性,主题的数据被消费后是不会立刻被删除的,所以使用多个logstash节点去消费主题会不会导致重复消费,logstash单节点宕机的情况下,kafka的主题还在写入数据,logstash恢复后会不会消费挂掉期间主题产生的数据,是否是从头消费导致重复消费以前的数据

已通过试验证明,试验中auto_offset_reset 配置为latest模式,logstash单节点宕机恢复后,会根据宕机之前offset的偏移量继续去消费主题中的数据,所以即使在宕机期间topic产生的数据,在logstash恢复后会继续去消费

已通过试验证明,试验中所有logstash节点都配置相同的消费者组,logstash配置多个节点也不会产生重复消费的情况,且每个logstash节点都是有消费topic数据的,每个logstash节点消费的数据量都差不多,相差不大,类似于负载均衡效果

3.主机信息

nameipserivceport
A10.1.60.114zookeeper、kafka、logstash、elasticsearch2181、2888、3888、9092、9200、9300
B10.1.60.115zookeeper、kafka、logstash、elasticsearch、kibana2181、2888、3888、9092、9200、9300、5601
C10.1.60.118zookeeper、kafka、logstash、elasticsearch2181、2888、3888、9092、9200、9300

3.架构搭建 

zookeeper集群搭建(kafka需要使用zookeeper)

参考:zookeeper集群搭建_Apex Predator的博客-CSDN博客

kafka集群搭建 

参考:kafka集群搭建_Apex Predator的博客-CSDN博客

elasticsearch集群搭建 

参考:elasticsearch集群搭建_Apex Predator的博客-CSDN博客

kibana部署 

安装kibana需要部署java环境,7版本以上的要使用jdk 11版本以上

 参考:jdk1.8环境配置_Apex Predator的博客-CSDN博客

在官网下载kibana的安装包,本次搭建选择了7.17.10版本

Past Releases of Elastic Stack Software | Elastic

新建目录,将安装包上传到主机中并解压更改名称

mkdir /opt/kibana

cd /opt/kibana

tar -zxvf kibana-7.17.10-linux-x86_64.tar.gz

mv kibana-7.17.10-linux-x86_64 kibana

ls /opt/kibana

 编辑kibana配置文件

vi /opt/kibana/kibana/config/kibana.yml

server.port: 5601   #配置监听端口
server.host: "0.0.0.0"   #配置访问地址
server.name: "kibana"   #kibana名称
elasticsearch.hosts: ["http://10.1.60.114:9200","http://10.1.60.115:9200","http://10.1.60.118:9200"]    #配置elasticsearch集群地址
i18n.locale: "zh-CN"     #更改界面为中文模式

kibana不能使用root用户启动,所以需要创建启动kibana服务用户

groupadd kibana

useradd kibana -g kibana -p kibana

授予权限给kibana目录

chown -R kibana.kibana /opt/kibana/kibana

切换用户启动kibana服务

su kibana

nohup /opt/kibana/kibana/bin/kibana > /opt/kibana/kibana/kibana.log &

查看kibana服务是否正常

netstat -tlpn |grep 5601

logstash部署 

安装logstash服务也需要java环境,因为和es在同一台机器上,就不需要再部署java环境了

在官网下载Logstash的安装包,本次搭建选择了7.17.10版本

Past Releases of Elastic Stack Software | Elastic

新建目录,将安装包上传到主机中并解压更改名称

mkdir /opt/logstash

cd /opt/logstatsh

tar -zxvf logstash-7.17.10-linux-x86_64.tar.gz

mv logstash-7.17.10-linux-x86_64 logstatsh

ls /opt/logstash

filebeat部署

在官网下载filebeat的安装包,本次搭建选择了7.17.10版本

Past Releases of Elastic Stack Software | Elastic

新建目录,将安装包上传到主机中并解压更改名称

mkdir /opt/filebeat

cd /opt/filebeat

tar -zxvf filebeat-7.17.10-linux-x86_64.tar.gz

mv filebeat-7.17.10-linux-x86_64 filebeat

ls /opt/filebeat

 4.服务配置

先配置filebeat收集nginx日志

vi /opt/filebeat/filebeat/filebeat.yml

filebeat.inputs:
- type: log
  enabled: true
  paths:
     - /var/log/nginx/access.log    #配置收集日志的路径
  fields:                         #为该日志定义一个字段,供后面调用
     topic: nginx-access-log
  tail_files: true

- type: log

  enabled: true
  paths:
    - /var/log/nginx/error.log
  fields:
     topic: nginx-error-log
  tail_files: true

# ------------------------------ kafka Output -------------------------------
output.kafka:
  enabled: true 
  hosts: ["10.1.60.112:9092","10.1.60.114:9092","10.1.60.115:9092"]   #kafka集群地址
  topic: '%{[fields][topic]}'    #调用前面的日志字段,来自动写入对应的主题

先不启动filebeat,等启动了logstash再启动filebeat

编辑logstash配置文件

input {
  kafka {
    bootstrap_servers => "10.1.60.112:9092,10.1.60.114:9092,10.1.60.115:9092"
    client_id => "nginx"
    group_id => "nginx"    #配置消费者组,避免重复消费的关键
    auto_offset_reset => "latest"    #配置偏移策略,latest当消费者发现分区的偏移量无效或不存在时,将从分区的最新可用偏移量开始消费。这意味着消费者将只消费从订阅时刻之后发布的消息,之前的消息将被忽略,earliest当消费者发现分区的偏移量无效或不存在时,将从分区的最早可用偏移量开始消费。这意味着消费者将从分区的开头开始消费消息,无论之前是否有消费记录
    consumer_threads => 1   #配置消费线程
    decorate_events => true
    topics => ["nginx-access-log","nginx-error-log"]  #配置kafka存储数据的主题
    codec => json    #将kafka数据格式转换成json格式
  }
}

filter {
  if [fields][topic] == "nginx-access-log" {   #通过判断主题名称去将指定主题中的日志转化为结构化日志
    grok {
      match => {
        "message" => "%{IP:ip} - (%{USERNAME:remote_user}|-) \[%{HTTPDATE:timestamp}\] \"%{WORD:request_method} %{URIPATHPARAM:request_url} HTTP/%{NUMBER:http_version}\" %{NUMBER:http_status} %{NUMBER:bytes} \"(%{DATA:referrer}|-)\" \"%{DATA:http_agent}\" \"(%{DATA:forwarded}|-)\""
      }
    }
  }
  else if [fields][topic] == "nginx-error-log" {
    grok {
      match => {
        "message" => "%{DATA:timestamp} \[%{WORD:severity}\] %{NUMBER:pid}#%{NUMBER:tid}: %{GREEDYDATA:prompt}"
      }
    }
  }
}

output {
#  stdout{
#    codec => rubydebug
#  }
  elasticsearch {
    hosts => ["http://10.1.60.114:9200","http://10.1.60.115:9200","http://10.1.60.118:9200"]
    index => "%{[@metadata][kafka][topic]}-%{+YYYY.MM.dd}"   #通过主题名称和时间去规定索引的名称,使用主题的变量去自动获取不同主题的数据
  }
}

关于多节点logstash的配置,配置文件也是和以上的相同即可,只是kafka的client_id项需要配置不同的值,这里我没有配置日志过滤,需要过滤的可以参照以下链接中的文章去配置,grok的说明也可以看以下的文章去理解

mutate使用:mutate使用(日志过滤)_Apex Predator的博客-CSDN博客

grok使用: grok使用(将日志结构化)_Apex Predator的博客-CSDN博客

启动所有logstash服务

 nohup /opt/logstash/logstash/bin/logstash -f /opt/logstash/logstash/config/logstash.conf > /opt/logstash/logstash/logstash.log &

启动所有filebeat服务

 nohup /opt/filebeat/filebeat/filebeat -e -c /opt/filebeat/filebeat/filebeat.yml > /opt/filebeat/filebeat/filebeat.log &

5.生成日志查看效果

访问nginx服务,需要访问filebeat收集的nginx日志的服务

 安装压测工具,使用压测工具去测试

yum -y install httpd-tools

ab -n 20 -c 4 10.1.60.114/

 使用kibana查看是否生成对应的索引与收集的日志数量是否正确

因为我是已经使用过了,所以本身是有数据的,但是新的话,会看到创建了这个索引,并且收集到了20条日志数据

也可以观察kafka的topic数据情况

 查看日志数据也可以发现,日志是没有问题的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/734327.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

有过JVM调优经验吗【面试题】

写作目的 JVM其实比较偏理论的,日常工作中很少遇到。但是面试他问,所以需要自己mock一下场景进行准备这个问题的回复。 本次分析的场景的元空间太小导致频繁FGC的问题。 源码&启动参数 gitee下载源码 启动-调优前 nohup java -XX:MetaspaceS…

【MySQL】SQL索引失效的几种场景及优化

MySQL中提高性能的一个最有效的方式是对数据表设计合理的索引。索引提供了高效访问数据的方法,并且加快查询的速度, 因此索引对查询的速度有着至关重要的影响。 使用索引可以快速地定位表中的某条记录,从而提高数据库查询的速度,…

C++之函数模板高级用法(一百五十四)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 人生格言: 人生…

两个好用到爆的Python模块,建议收藏!

在日常开发工作中,经常会遇到这样的一个问题:要对数据中的某个字段进行匹配,但这个字段有可能会有微小的差异。比如同样是招聘岗位的数据,里面省份一栏有的写“广西”,有的写“广西壮族自治区”,甚至还有写…

基于单片机的智能鞋柜的设计与实现

功能介绍 以51单片机作为主控系统;通过DHT11温湿度采集;通过按键设置逻辑处理;通过LED紫外线消毒;通过继电器控制风扇进行换气除湿;通过继电器控制加热片进行加热;整个电路以5v供电; 电路图 PCB 源代码 #i…

nodejs 读取xlsx 文件转json 格式(包含表格时间类型)

需求概要:从xlsx 文件中读取内容转化成想要的json 格式,用于web 读取数据 newDoc.xlsx文档内容大概: 本内容主要是更新前端公告内容, const xlsx require(node-xlsx) const fs require(fs) const moment require(moment)//转换…

双非本大二上岸大厂——念念不忘,必有回响

⭐️前言⭐️ 博主就读于一所普通的学校(双非本),在大二下学期3月份开始网上投递简历,历时近百余天,投递简历500,面试近40余场,最终在6月份学期末,斩获了两个大厂offer(北…

最小栈——力扣155

方法&#xff1a;辅助栈 这些函数中只有求最小值函数需要借助辅助栈 代码如下&#xff1a; class MinStack {stack<int> x_stack;stack<int> min_stack; public:MinStack() {min_stack.push(INT_MAX);}void push(int val) {x_stack.push(val);min_stack.push(…

使用Java计算课程绩点、课程学分绩点、总绩点

1、定义实体类 实体类中包括属性表 名称释义xuefen该课程学分chengji该课程取得的成绩xuefenjidian该课程取得的学分绩点xuefen该课程取得的学分 其中有式子&#xff1a; j i d i a n ( c h e n g j i − 50 ) 10.0 jidian \frac{(chengji-50)}{10.0} jidian10.0(chengji−…

【Azure】解析 Microsoft Defender for Cloud:云安全的保护与管理

你在使用自己的电脑的时候&#xff0c;作为安全防护你可能直接装个杀毒软件&#xff0c;或者什么xx管家之类的&#xff0c;那么你是否有想过&#xff0c;如果我有一套云服务之后&#xff0c;我应该如何进行安全防护呢&#xff1f;本文带你了解在 Azure 云中的安全防护体系&…

同余最短路

同余最短路就是把每一个同余类当成一个结点&#xff0c;在同余类之间建边&#xff0c;然后跑最短路 答案统计的时候对每个同余类单独计算贡献 题意&#xff1a; 思路&#xff1a; 答案可以对模X的所有同余类计算贡献 设dis[i]为在模X意义下&#xff0c;Y和Z之后%X余数为i的…

数据库练习

数据库练习 建立三张表&#xff0c;以及表中的联系 由于学生表中存在外键&#xff0c;所以我们需要先创建课程表和班级表 课程表 mysql> create table course(-> course_id int primary key auto_increment comment 课程编号,-> course_name varchar(10) not null…

【C++初阶】C++入门——引用

文章目录 一、引用的概念二、共用同一块空间验证三、引用的特性3.1 引用在定义时必须初始化3.2 一个变量可以有多个引用3.3 引用不能改变 四、引用的使用场景4.1 做参数4.2 做返回值 五、传值、传引用效率比较六、常引用6.1 权限放大——不被允许6.2 权限平移6.3 权限缩小6.4 赋…

MYSQL索引为啥要用B+树储存数据呢

首先我们来分析一下需求 MYSQL索引需要怎样的数据结构 为了防止数据因为特(duan)殊(kai)情(dian)况(yuan)丢失,我们的数据肯定是要持久化的,也就是保存在硬件(磁盘)里面,而我们知道 磁盘相对于内存来讲 速度要慢了几万倍 甚至即使万倍 所以我们必须减少磁盘的I/O操作 再有呢…

电子时钟制作(瑞萨RA)(10)----电容触摸配置

概述 这篇文档将创建一个使用 e2 studio 集成 QE 的电容式触摸应用示例。 硬件准备 首先需要准备一个开发板&#xff0c;这里我准备的是芯片型号R7FA2E1A72DFL的开发板&#xff1a; 视频教程 https://www.bilibili.com/video/BV14h4y1E7py/ 电子时钟制作(10)----电容触摸配…

python接口自动化(二十三)--unittest断言——上(详解)

简介 在测试用例中&#xff0c;执行完测试用例后&#xff0c;最后一步是判断测试结果是 pass 还是 fail&#xff0c;自动化测试脚本里面一般把这种生成测试结果的方法称为断言&#xff08;assert&#xff09;。用 unittest 组件测试用例的时候&#xff0c;断言的方法还是很多的…

MyBatis查询数据库(1)

前言&#x1f36d; ❤️❤️❤️SSM专栏更新中&#xff0c;各位大佬觉得写得不错&#xff0c;支持一下&#xff0c;感谢了&#xff01;❤️❤️❤️ Spring Spring MVC MyBatis_冷兮雪的博客-CSDN博客 经过前⾯的学习咱们 Spring 系列的基本操作已经实现的差不多了&#xff0…

DEJA_VU3D - Cesium功能集 之 111-风场(局部)效果

前言 编写这个专栏主要目的是对工作之中基于Cesium实现过的功能进行整合,有自己琢磨实现的,也有参考其他大神后整理实现的,初步算了算现在有差不多实现小140个左右的功能,后续也会不断的追加,所以暂时打算一周2-3更的样子来更新本专栏(每篇博文都会奉上完整demo的源代码…

LeetCode-每日一题【2095.删除链表的中间节点】

题目 给你一个链表的头节点 head 。删除 链表的 中间节点 &#xff0c;并返回修改后的链表的头节点 head 。 长度为 n 链表的中间节点是从头数起第 ⌊n / 2⌋ 个节点&#xff08;下标从 0 开始&#xff09;&#xff0c;其中 ⌊x⌋ 表示小于或等于 x 的最大整数。 对于 n 1、…

event.stopPropagation()和event.preventDefault()之间的联系

目录 阻止事件冒泡&#xff0c;阻止默认事件&#xff0c;event.stopPropagation()和event.preventDefault()&#xff0c;return false的区别 今天来看看前端的冒泡和事件默认事件如何处理 1.event.stopPropagation()方法 这是阻止事件的冒泡方法&#xff0c;不让事件向documen上…