使用Filebeat+Kafka+Logstash+Elasticsearch构建日志分析系统

news2025/1/24 5:34:21

        随着时间的积累,日志数据会越来越多,当您需要查看并分析庞杂的日志数据时,可通过Filebeat+Kafka+Logstash+Elasticsearch采集日志数据到Elasticsearch中,并通过Kibana进行可视化展示与分析。本文介绍具体的实现方法。

一、背景信息

Kafka是一种分布式、高吞吐、可扩展的消息队列服务,广泛用于日志收集、监控数据聚合、流式数据处理、在线和离线分析等大数据领域,已成为大数据生态中不可或缺的部分。

在实际应用场景中,为了满足大数据实时检索的需求,您可以使用Filebeat采集日志数据,并输出到Kafka中。Kafka实时接收Filebeat采集的数据,并输出到Logstash中。输出到Logstash中的数据在格式或内容上可能不能满足您的需求,此时可以通过Logstash的filter插件过滤数据。最后将满足需求的数据输出到Elasticsearch中进行分布式检索,并通过Kibana进行数据分析与展示。简单流程如下。

流程图

二、操作流程

1、准备工作

完成环境准备,包括创建Elasticsearch、Logstash、ECS和消息队列 Kafka 版实例、创建Topic和Consumer Group等。

2、步骤一:安装并配置Filebeat

  安装并配置Filebeat,设置input为系统日志,output为Kafka,将日志数据采集到Kafka的指定Topic中。

3、步骤二:配置Logstash管道

配置Logstash管道的input为Kafka,output为阿里云Elasticsearch,使用Logstash消费Topic中的数据并传输到阿里云Elasticsearch中。

4、步骤三:查看日志消费状态

在消息队列Kafka中查看日志数据的消费的状态,验证日志数据是否采集成功。

5、步骤四:通过Kibana过滤日志数据

在Kibana控制台的Discover页面,通过Filter过滤出Kafka相关的日志。

三、步骤一:安装并配置Filebeat

  1. 连接ECS服务器。

  2. 安装Filebeat。

    ​本文以6.8.5版本为例,安装命令如下,详细信息请参见Install Filebeat。

    curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.8.5-linux-x86_64.tar.gz
    tar xzvf filebeat-6.8.5-linux-x86_64.tar.gz
  3. 执行以下命令,进入Filebeat安装目录,创建并配置filebeat.kafka.yml文件。

    cd filebeat-6.8.5-linux-x86_64
    vi filebeat.kafka.yml

    filebeat.kafka.yml配置如下。

    filebeat.prospectors:
      - type: log
        enabled: true
        paths:
            - /var/log/*.log
    
    output.kafka:
        hosts: ["alikafka-post-cn-zvp2n4v7****-1-vpc.alikafka.aliyuncs.com:9092"]
        topic: estest
        version: 0.10.2

    重要

    当Filebeat为7.0及以上版本时,filebeat.prospectors需要替换为filebeat.inputs。

    参数

    说明

    type

    输入类型。设置为log,表示输入源为日志。

    enabled

    设置配置是否生效:

    • true:生效

    • false:不生效

    paths

    需要监控的日志文件的路径。多个日志可在当前路径下另起一行写入日志文件路径。

    hosts

    消息队列Kafka实例的单个接入点,可在实例详情页面获取,详情请参见查看接入点。由于本文使用的是VPC实例,因此使用默认接入点中的任意一个接入点。

    topic

    日志输出到消息队列Kafka的Topic,请指定为您已创建的Topic。

    version

    Kafka的版本,可在消息队列Kafka的实例详情页面获取。

    重要

    • 不配置此参数会报错。

    • 由于不同版本的Filebeat支持的Kafka版本不同,例如8.2及以上版本的Filebeat支持的Kafka版本为2.2.0,因此version需要设置为Filebeat支持的Kafka版本,否则会出现类似报错:Exiting: error initializing publisher: unknown/unsupported kafka vesion '2.2.0' accessing 'output.kafka.version' (source:'filebeat.kafka.yml'),详细信息请参见version。

  4. 启动Filebeat。

    ./filebeat -e -c filebeat.kafka.yml

四、步骤二:配置Logstash管道

  1. 进入阿里云Elasticsearch控制台的Logstash页面。
  2. 进入目标实例。
    1. 在顶部菜单栏处,选择地域。
    2. Logstash实例中单击目标实例ID。
  3. 在左侧导航栏,单击管道管理

  4. 单击创建管道

  5. 创建管道任务页面,输入管道ID并配置管道。

    本文使用的管道配置如下。

    input {
      kafka {
        bootstrap_servers => ["alikafka-post-cn-zvp2n4v7****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-3-vpc.alikafka.aliyuncs.com:9092"]
        group_id => "es-test"
        topics => ["estest"]
        codec => json
     }
    }
    filter {
    
    }
    output {
      elasticsearch {
        hosts => "http://es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com:9200"
        user =>"elastic"
        password =>"<your_password>"
        index => "kafka‐%{+YYYY.MM.dd}"
     }
    }
    表 1. input参数说明

    参数

    说明

    bootstrap_servers

    消息队列Kafka实例的接入点,可在实例详情页面获取,详情请参见查看接入点。由于本文使用的是VPC实例,因此使用默认接入点。

    group_id

    指定为您已创建的Consumer Group的名称。

    topics

    指定为您已创建的Topic的名称,需要与Filebeat中配置的Topic名称保持一致。

    codec

    设置为json,表示解析JSON格式的字段,便于在Kibana中分析。

    表 2. output参数说明

    参数

    说明

    hosts

    阿里云Elasticsearch的访问地址,取值为http://<阿里云Elasticsearch实例的私网地址>:9200

    说明

    您可在阿里云Elasticsearch实例的基本信息页面获取其私网地址,详情请参见查看实例的基本信息。

    user

    访问阿里云Elasticsearch的用户名,默认为elastic。您也可以使用自建用户,详情请参见通过Elasticsearch X-Pack角色管理实现用户权限管控。

    password

    访问阿里云Elasticsearch的密码,在创建实例时设置。如果忘记密码,可进行重置,重置密码的注意事项及操作步骤请参见重置实例访问密码。

    index

    索引名称。设置为kafka‐%{+YYYY.MM.dd}表示索引名称以kafka为前缀,以日期为后缀,例如kafka-2020.05.27

    更多Config配置详情请参见Logstash配置文件说明。

    如果您有多topic的数据同步需求,需要在kafka中添加新的topic,然后在Logstash的管道配置中添加input。示例如下:

    input {
     kafka {
      bootstrap_servers => ["alikafka-post-cn-zvp2n4v7****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-3-vpc.alikafka.aliyuncs.com:9092"]
      group_id => "es-test"
      topics => ["estest"]
      codec => json
    }
    
    kafka {
      bootstrap_servers => ["alikafka-post-cn-zvp2n4v7****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-3-vpc.alikafka.aliyuncs.com:9092"]
      group_id => "es-test-2"
      topics => ["estest_2"]
      codec => json
    }
    }
  6. 单击下一步,配置管道参数。

    管道参数配置

    参数

    说明

    管道工作线程

    并行执行管道的Filter和Output的工作线程数量。当事件出现积压或CPU未饱和时,请考虑增大线程数,更好地使用CPU处理能力。默认值:实例的CPU核数。

    管道批大小

    单个工作线程在尝试执行Filter和Output前,可以从Input收集的最大事件数目。较大的管道批大小可能会带来较大的内存开销。您可以设置LS_HEAP_SIZE变量,来增大JVM堆大小,从而有效使用该值。默认值:125。

    管道批延迟

    创建管道事件批时,将过小的批分派给管道工作线程之前,要等候每个事件的时长,单位为毫秒。默认值:50ms。

    队列类型

    用于事件缓冲的内部排队模型。可选值:

    • MEMORY:默认值。基于内存的传统队列。

    • PERSISTED:基于磁盘的ACKed队列(持久队列)。

    队列最大字节数

    请确保该值小于您的磁盘总容量。默认值:1024 MB。

    队列检查点写入数

    启用持久性队列时,在强制执行检查点之前已写入事件的最大数目。设置为0,表示无限制。默认值:1024。

    警告

    配置完成后,需要保存并部署才能生效。保存并部署操作会触发实例重启,请在不影响业务的前提下,继续执行以下步骤。

  7. 单击保存或者保存并部署

    • 保存:将管道信息保存在Logstash里并触发实例变更,配置不会生效。保存后,系统会返回管道管理页面。可在管道列表区域,单击操作列下的立即部署,触发实例重启,使配置生效。

    • 保存并部署:保存并且部署后,会触发实例重启,使配置生效。

五、步骤三:查看日志消费状态

  1. ​进入消息队列Kafka控制台。

  2. 参见查看消费状态,查看详细消费状态。

    预期结果如下:

    查看消费详情

六、步骤四:通过Kibana过滤日志数据

  1. 登录目标阿里云Elasticsearch实例的Kibana控制台,根据页面提示进入Kibana主页。

  2. 创建一个索引模式。

    1. 在左侧导航栏,单击Management

    2. 在Kibana区域,单击Index Patterns

    3. 单击Create index pattern

    4. 输入Index pattern(本文使用kafka-*),单击Next step

      创建索引模式

    5. 选择Time Filter field name(本文选择@timestamp),单击Create index pattern

      Time Filter field name

  3. 在左侧导航栏,单击Discover

  4. 从页面左侧的下拉列表中,选择您已创建的索引模式(kafka-*)。

  5. 在页面右上角,选择一段时间,查看对应时间段内的Filebeat采集的日志数据。

    查看日志数据

  6. 单击Add a filter,在Add filter页面中设置过滤条件,查看符合对应过滤条件的日志数据。

    过滤日志数据

七、常见问题

Q:同步日志数据出现问题,管道一直在生效中,无法将数据导入Elasticsearch,如何解决?

A:查看Logstash实例的主日志是否有报错,根据报错判断原因,具体操作请参见查询日志。常见的原因及解决方法如下。

原因

解决方法

Kafka的接入点不正确。

参见查看接入点获取正确的接入点。完成后,修改管道配置替换错误接入点。

Logstash与Kafka不在同一VPC下。

重新购买同一VPC下的实例。购买后,修改现有管道配置。

说明

VPC实例只能通过专有网络VPC访问

云消息队列 Kafka 版

Kafka或Logstash集群的配置太低,例如使用了测试版集群。

升级集群规格,完成后,刷新实例,观察变更进度。升级Logstash实例规格的具体操作,请参见升配集群;升级Kafka实例规格的具体操作,请参见升级实例配置。

管道配置中包含了file_extend,但没有安裝logstash-output-file_extend插件。

选择以下任意一种方式处理:

  • 安装logstash-output-file_extend插件。具体操作,请参见 安装或卸载插件。

  • 中断变更,等到实例处于变更中断状态后,在管道配置中,去掉file_extend配置,触发重启恢复。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1209729.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

软件工程-第7章 面向对象方法基础

第7章 面向对象方法基础 面向对象的基本概念 面向对象方法的世界观&#xff1a;一切系统都是由对象构成的&#xff0c;他们的相互作用、相互影响&#xff0c;构成了大千世界的各式各样系统。面向对象方法是一种以对象、对象关系等来构造软件系统模型的系统化方法。 面向对象 …

浅谈:Flutter现状、与为什么选择Flutter——其实大家都在用只是你不知道罢了

浅谈&#xff1a;谁将会动那些抵制学习还装懂的人的蛋糕 开发环境现状与为什么选择Flutter 我本从不屑于写这种技术外的技术文章&#xff0c;但是今天刷某应用优点上头&#xff0c;想发唯一一篇。这篇文章可能会得罪一些就喜欢地址学新架构的&#xff0c;以及还不了解就开始起哄…

MySQL是如何进行排序的,ORDER BY是如何执行的

MySQL 会给每个线程分配一块内存用于排序&#xff0c;称为 sort_buffer。 假设找出在杭州居住的人&#xff0c;按名字排序前1000个人&#xff08;假设city有索引&#xff0c;那么非常舒服&#xff0c;不用全表扫描&#xff09; select city,name,age from t where city杭州 or…

git简明指南

目录 安装 创建新仓库 检出仓库 工作流 安装 下载 git OSX 版 下载 git Windows 版 下载 git Linux 版 创建新仓库 创建新文件夹&#xff0c;打开&#xff0c;然后执行 git init 以创建新的 git 仓库。 检出仓库 执行如下命令以创建一个本地仓库的克隆版本&…

第3章:搜索与图论【AcWing】

文章目录 图的概念图的概念图的分类有向图和无向图 连通性连通块重边和自环稠密图和稀疏图参考资料 图的存储方式邻接表代码 邻接矩阵 DFS全排列问题题目描述思路回溯标记剪枝代码时间复杂度 [N 皇后问题](https://www.luogu.com.cn/problem/P1219)题目描述全排列思路 O ( n ! …

【数据分享】2015-2023年我国地级市逐月房价数据(Excel格式/Shp格式)

房价是一个城市发展程度的重要体现&#xff0c;一个城市的房价越高通常代表这个城市越发达&#xff0c;对于人口的吸引力越大&#xff01;因此&#xff0c;房价数据是我们在各项城市研究中都非常常用的数据&#xff01;之前我们分享过我国主要城市2023年房价数据&#xff08;可…

无标题栏的Qt子窗体在父窗体中停靠时,如何做到严丝合缝

目录 1. 问题的提出 2. 一般实现 3. 加强版 1. 问题的提出 由于业务的要求&#xff0c;需要从父窗体弹出一个子窗体&#xff0c;该子窗体无标题栏&#xff0c;且该子窗体要停靠到父窗体右下角。这个看似很容易的问题&#xff0c;细研起来其实不容易&#xff01; 2. 一般实现…

维基百科是非营利性机构 词条内容具有中立性、准确性、可靠性

维基百科对一些企业很有神秘性&#xff0c;自行操作很多次也没有成功建立维基百科&#xff0c;这一定是没有按照维基百科的规则和流程去操作。小马识途营销顾问提醒企业&#xff0c;维基百科是一种基于协作的在线百科全书&#xff0c;由维基媒体基金会运营。维基百科的创建流程…

内涝积水设备推荐,城市易涝点怎么监测?

随着城市化逐步发展&#xff0c;城市内涝问题越来越多且越来越严重&#xff0c;给人们的出行和生活带来很多的不便。为了解决这一问题&#xff0c;内涝积水监测仪受到越来越受到关注。 内涝积水设备能够实时监测道路积水情况&#xff0c;包括积水深度等信息&#xff0c;为相关人…

nodejs+vue公益帮学网站的设计与实现-微信小程序-安卓-python-PHP-计算机毕业设计

在当今高度发达的信息中&#xff0c;信息管理改革已成为一种更加广泛和全面的趋势。为确保中国经济的持续发展&#xff0c; 如何用方便快捷的方式使管理者在广阔的数据海洋里面查询、存储、管理和共享有效的数据信息&#xff0c;对我们的学习&#xff0c;工作和生活具有重要的现…

μC/OS-II---计时器管理2(os_tmr.c)

目录 获取计时器的名字获取计时器到期前剩余的时间查看计时器的状态 计时器是倒计时器&#xff0c;当计数器达到零时执行某个动作。用户通过回调函数提供这个动作。回调函数是用户声明的函数&#xff0c;在计时器到期时被调用。在回调函数中绝对不能进行阻塞调用&#xff08;例…

vue 事件总线 非父子组件之间的简单信息传递

如果两个组件不是父子关系&#xff0c;那么传递信息就不能通过props了。 此时可以使用vue的事件总线来传递信息。 1.创建非父子组件都能访问的事件总线&#xff08;也就是空的vue实例&#xff09; 1.创建一个EventBus.js 2.引入vue并且创建一个vue实例 import Vue from vuec…

微信小程序 解决tab页切换过快 数据出错问题

具体问题&#xff1a;切换tab页切换过快时,上一个列表接口未响应完和当前列表数据冲突 出现数据错误 具体效果如下&#xff1a; 解决方式&#xff1a;原理 通过判断是否存在request 存在中断 并发送新请求 不存在新请求 let shouldAbort false; // 添加一个中断标志 let re…

STM32 LED编程 GPIO的初始化(标准库)

实验的电路图介绍 实验的电路图类似于开漏接法 要初始化GPIOC接口 标准库的模板 GPIO的标准库编程接口 GPIO引脚的初始化 GPIO作为片上外设 每一个片上外设使用前一定要使能时钟 为什么要使能时钟&#xff1f;时钟是啥 时钟的使能 stm32的每一个片上外设都是时序电路 时序…

计算机视觉基础(6)——光流估计

前言 本章我们来学习一下图像处理基础中的运动估计。主要内容包括运动场估计和光流估计两个部分。在运动场估计中&#xff0c;我们将学习到运动场、光流、光流和运动场的区别&#xff1b;在光流估计中&#xff0c;我们将学习到光流估计任务、孔径问题&#xff0c;以及光流估计两…

高质量实时渲染笔记

文章目录 Real-time shadows1 自遮挡问题2 解决阴影detach问题&#xff1f;3 Aliasing4 近似积分5 percentage closer soft shadows(PCSS)percenta closer filtering(PCF)PCSS的思想 6 Variance Soft Shadow Mapping (VSSM)步骤Moment Shadow Mapping 7 Distance field shadow …

MongoDB入门级别教程全(Windows版,保姆级教程)

下载mongodb 进入官网&#xff1a; Download MongoDB Community Server | MongoDB 选择msi&#xff0c;Windows版本 下载完后直接双击&#xff1a; 选择complete 这里建议改地方&#xff1a; 我这里直接改成d盘&#xff1a;work目录下面&#xff1a; 点击next&#xff1a; 因…

C 语言实现 UDP

广播 发送广播信息&#xff0c;局域网中的客户端都可以接受该信息 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h>int main() {// 1.创建一个通信的socketint fd socket(PF_INET, …

MySQL 社区开源备份工具 Xtrabackup 详解

文章目录 前言1. Xtrabackup 介绍1.1 物理备份与逻辑备份区别1.2 Xtrabackup 系列版本 2. Xtrabackup 部署2.1 下载安装包2.2 二进制部署2.3 程序文件介绍2.4 备份需要的权限 3. Xtrabackup 使用场景3.1 本地全量备份3.2 本地压缩备份3.3 全量流式备份3.3.1 备份到远程主机3.3.…

技术架构 - 应用数据分离,应用服务集群架构

前言 上一篇文章介绍了单机架构&#xff0c;由于性能瓶颈&#xff0c;满足不了高访问量&#xff0c;所以演化出了数据分离架构。 这种架构也很简单只是将应用服务和数据库服务分离开来&#xff0c;避免单一架构的资源争夺的情况。 一、 应用数据分离架构 1. 简介 应用服务和…