基于Kafka的日志采集

news2024/12/22 14:33:59

目录

前言

架构图

资源列表

基础环境

关闭防护墙

关闭内核安全机制

修改主机名

添加hosts映射

一、部署elasticsearch

修改limit限制

部署elasticsearch

修改配置文件

启动

二、部署filebeat

部署filebeat

添加配置文件

启动

三、部署kibana

部署kibana

修改配置文件

启动

四、部署Kafka

安装java

安装kafka

配置环境变量

创建数据存储目录和日志存储目录

修改zk配置文件

修改Kafka配置文件

启动zk

启动Kafka

测试

五、部署logstash

部署logstash

添加配置文件

启动


前言

        当日志量变得非常大时,传统的日志收集平台可能会遇到性能瓶颈、单点故障或扩展性问题。在这种情况下,引入消息队列(如Kafka)可以显著增强日志收集系统的健壮性、可扩展性和实时性。

以下是当在日志收集平台中加入Kafka时,可以带来的优势和改进:

  1. 缓冲和异步处理
    Kafka作为一个消息队列,可以充当Filebeat(或其他日志收集器)和Logstash(或其他日志处理组件)之间的缓冲层。Filebeat可以将日志数据异步地发送到Kafka,而不需要等待Logstash的即时响应。这样,即使Logstash暂时无法处理数据,Kafka也可以暂时存储数据,直到Logstash恢复处理能力。

  2. 水平扩展
    随着日志量的增长,Kafka可以通过添加更多的节点(brokers)来实现水平扩展。这种扩展方式使得Kafka能够处理更多的并发写入和读取操作,而不会遇到单点故障或性能瓶颈。此外,Kafka的分布式架构还允许数据在多个节点之间进行复制,以提高数据的可靠性和容错性。

  3. 实时数据处理
    Kafka支持实时数据流处理,使得日志数据可以立即被消费和处理。这意味着一旦日志数据被写入Kafka,就可以立即被Logstash(或其他流处理工具)读取和处理,以满足实时分析、监控和告警的需求。

  4. 数据持久化
    Kafka将数据持久化到磁盘上,以确保即使在系统崩溃或重启的情况下,数据也不会丢失。这种持久化机制使得Kafka成为了一个可靠的数据传输和存储平台,特别适用于对日志数据进行长期存储和分析的场景。

  5. 多消费者支持
    Kafka允许多个消费者(如Logstash、其他数据分析工具或应用)从同一个主题(topic)中消费数据。这意味着您可以同时运行多个消费者来处理和分析日志数据,以满足不同的业务需求和数据使用场景。

  6. 可定制性和灵活性
    Kafka提供了丰富的API和工具,使得您可以轻松地定制和扩展日志收集系统。例如,您可以编写自定义的Kafka生产者来收集特定格式的日志数据,或者编写自定义的Kafka消费者来处理和分析日志数据。

  7. 与其他系统的集成
    Kafka是一个广泛使用的消息队列系统,它支持与其他各种系统和工具进行集成。这意味着您可以将Kafka轻松地集成到现有的日志收集、处理、存储和分析系统中,以构建一个更加健壮、可扩展和灵活的日志收集平台。

        综上所述,当日志量变得非常大时,在日志收集平台中加入Kafka可以显著提高系统的性能、可靠性和可扩展性。通过利用Kafka的缓冲、异步处理、水平扩展、实时数据处理、数据持久化、多消费者支持、可定制性和与其他系统的集成能力,您可以构建一个更加健壮、高效和灵活的日志收集系统。

        有需要本次实验软件包的评论区可以找我要,无偿提供。

架构图

资源列表

操作系统配置主机名IP
CentOS7.3.16112C4Ges01192.168.207.131
CentOS7.3.16112C4Gkibana192.168.207.165
CentOS7.3.16112C4Gfilebeat192.168.207.166
CentOS7.3.16112C4Gkafka192.168.207.167
CentOS7.3.16112C4Glogstash192.168.207.168

基础环境

关闭防护墙

systemctl stop firewalld
systemctl disable firewalld

关闭内核安全机制

sed -i "s/.*SELINUX=.*/SELINUX=disabled/g" /etc/selinux/config
reboot

修改主机名

hostnamectl set-hostname es01
hostnamectl set-hostname kibana
hostnamectl set-hostname filebeat
hostnamectl set-hostname kafka
hostnamectl set-hostname logstash

添加hosts映射

cat >> /etc/hosts << EOF
192.168.207.131 es01
192.168.207.165 kibana
192.168.207.166 filebeat
192.168.207.167 kafka
192.168.207.168 logstash
EOF

一、部署elasticsearch

修改limit限制

cat > /etc/security/limits.d/es.conf << EOF
* soft nproc 655360
* hard nproc 655360
* soft nofile 655360
* hard nofile 655360
EOF
​
cat >> /etc/sysctl.conf << EOF
vm.max_map_count=655360
EOF
sysctl -p

部署elasticsearch

mkdir -p /data/elasticsearch
tar zxvf elasticsearch-7.14.0-linux-x86_64.tar.gz -C /data/elasticsearch

修改配置文件

mkdir /data/elasticsearch/{data,logs}

[root@es01 elasticsearch-7.14.0]# grep -v "^#" /data/elasticsearch/elasticsearch-7.14.0/config/elasticsearch.yml
cluster.name: my-application
node.name: es01
path.data: /data/elasticsearch/data
path.logs: /data/elasticsearch/logs
bootstrap.memory_lock: false
network.host: 0.0.0.0
http.port: 9200
cluster.initial_master_nodes: ["es01"]

启动

useradd es 
chown -R es:es /data/
su - es
/data/elasticsearch/elasticsearch-7.14.0/bin/elasticsearch -d

二、部署filebeat

部署filebeat

mkdir -p /data/filebeat
tar zxvf filebeat-7.14.0-linux-x86_64.tar.gz -C /data/filebeat/

添加配置文件

这里提供了两份filebeat配置文件的参考

[root@filebeat filebeat-7.14.0-linux-x86_64]# cat filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/messages         ###要监控的日志文件
setup.template.settings:
  index.number_of_shards: 3
output.kafka:
  #version:0.10.2             ### 根据不同 CKafka 实例开源版本配置
  hosts: ["192.168.207.167:9092"]  ###接入方式所用的IP和端口
  topic: 'topic_test1'       ###topic实例名
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: none
  max_message_bytes: 10000000

[root@filebeat filebeat-7.14.0-linux-x86_64]# cat filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/httpd/access_log         ###要监控的日志文件
  fields:
    kafka_topic: httpd_access
- type: log
  enabled: true
  paths:
    - /var/log/httpd/error_log         ###要监控的日志文件
  fields:
    kafka_topic: httpd_error
setup.template.settings:
  index.number_of_shards: 3
output.kafka:
  #version:0.10.2             ### 根据不同 CKafka 实例开源版本配置
  hosts: ["192.168.207.167:9092"]  ###接入方式所用的IP和端口
  topic: '%{[fields.kafka_topic]}'       ###topic实例名
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: none
  max_message_bytes: 10000000

启动

/data/filebeat/filebeat-7.14.0-linux-x86_64/filebeat -e -c filebeat.yml

三、部署kibana

部署kibana

mkdir -p /data/kibana
tar zxvf kibana-7.14.0-linux-x86_64.tar.gz -C /data/kibana/

修改配置文件

grep -v "^#" /data/kibana/kibana-7.14.0-linux-x86_64/config/kibana.yml  | grep -v "^$"
server.port: 5601
server.host: "0.0.0.0"
elasticsearch.hosts: ["http://192.168.207.131:9200"]
kibana.index: ".kibana"

启动

useradd kibana
chown -R kibana:kibana /data 
su - kibana
/data/kibana/kibana-7.14.0-linux-x86_64/bin/kibana

四、部署Kafka

安装java

# 安装java环境
yum -y install java-1.8.0-openjdk

安装kafka

tar zxvf kafka_2.12-3.0.0.tgz
mv kafka_2.12-3.0.0 /usr/local/kafka

配置环境变量

# 配置环境变量
cat > /etc/profile.d/zookeeper.sh << 'EOF'
export ZOOKEEPER_HOME=/usr/local/kafka
export PATH=$ZOOKEEPER_HOME/bin:$PATH
EOF

cat > /etc/profile.d/kafka.sh << 'EOF'
export KAFKA_HOME=/usr/local/kafka
export PATH=$KAFKA_HOME/bin:$PATH
EOF

source /etc/profile

创建数据存储目录和日志存储目录

mkdir -p /usr/local/kafka/zookeeper
mkdir -p /usr/local/kafka/log/zookeeper
mkdir -p /usr/local/kafka/log/kafka

# 创建zk需要的myid文件
echo 0 > /usr/local/kafka/zookeeper/myid

修改zk配置文件

# 注意Kafka安装目录下的config目录里
server.properties             #是Kafka的配置文件
zookeeper.properties          #是zookeeper的配置文件
cat >> /usr/local/kafka/config/zookeeper.properties << EOF
dataLogDir=/usr/local/kafka/log/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
server.0=192.168.207.167:2888:3888
EOF

sed -i "s/dataDir\=\/tmp\/zookeeper/dataDir\=\/usr\/local\/kafka\/zookeeper/g" /usr/local/kafka/config/zookeeper.properties

修改Kafka配置文件

# /usr/local/kafka/config/server.properties修改

listeners=PLAINTEXT://192.168.207.167:9092
advertised.listeners=PLAINTEXT://192.168.207.167:9092
log.dirs=/usr/local/kafka/log/kafka
delete.topic.enable=true
zookeeper.connect=192.168.207.167:2181

启动zk

/usr/local/kafka/bin/zookeeper-server-start.sh -daemon /usr/local/kafka/config/zookeeper.properties

启动Kafka

/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties

测试

# 创建一个topic
[root@kafka kafka]# bin/kafka-topics.sh --create --bootstrap-server 192.168.207.167:9092 --replication-factor 1   --partitions 1 --topic Hello-Kafka
Created topic Hello-Kafka.

# 往topic里面输入消息
[root@kafka kafka]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 192.168.207.167:9092 --topic Hello-Kafka

# 从topic里面消费消息
[root@kafka ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.207.167:9092 --topic Hello-Kafka --from-beginning

# 查看topic列表
[root@kafka kafka]# /usr/local/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.207.167:9092 --list
Hello-Kafka

# 删除topic
[root@kafka kafka]# bin/kafka-topics.sh --delete --bootstrap-server 192.168.207.167:9092 --topic Hello-Kafka

五、部署logstash

部署logstash

mkdir -p /data/logstash
tar zxvf logstash-7.14.0-linux-x86_64.tar.gz -C /data/logstash/

添加配置文件

mkdir /data/logstash/logstash-7.14.0/conf.d

cat > /data/logstash/logstash-7.14.0/conf.d/system.conf << 'EOF'
input { 
  kafka{ 
    bootstrap_servers =>"192.168.207.167:9092" 
    topics =>"topic_test1" 
    type =>"topic_test1"
    codec =>"json" 
  } 
}
output { 
  if [type] == "topic_test1" {
  elasticsearch { 
    hosts => ["192.168.207.131:9200"] 
    index =>"kafka-system-%{+YYYY.MM.dd}" 
  } 
  }
}

EOF
cat > /data/logstash/logstash-7.14.0/conf.d/httpd.conf << 'EOF'
input { 
  kafka{ 
    bootstrap_servers =>"192.168.207.167:9092" 
    topics =>"httpd_access" 
    type =>"httpd_access"
    codec =>"json" 
  } 
  kafka{ 
    bootstrap_servers =>"192.168.207.167:9092" 
    topics =>"httpd_error" 
    type =>"httpd_error"
    codec =>"json" 
  }
}
output { 
  if [type] == "httpd_access" {
  elasticsearch { 
    hosts => ["192.168.207.131:9200"] 
    index =>"httpd-access-%{+YYYY.MM.dd}" 
  } 
  }
  if [type] == "httpd_error" {
  elasticsearch { 
    hosts => ["192.168.207.131:9200"] 
    index =>"httpd-error-%{+YYYY.MM.dd}" 
  } 
  }
}

EOF

启动

/data/logstash/logstash-7.14.0/bin/logstash -f /data/logstash/logstash-7.14.0/conf.d/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1690441.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vitis HLS 学习笔记--抽象并行编程模型-不良示例

目录 1. 简介 2. 基础 kernel 2.1 pass kernel 2.2 double_pass kernel 2.3 add_kernel 2.4 split kernel 3. 三种bypass 3.1 input_bypass 3.2 middle_bypass 3.3 output_bypass 4. 总结 1. 简介 本文展示三个在数据流水线中常见的问题&#xff1a; 输入参数绕过…

DAMA:数据治理 CDGA/CDGP 认证考试备考经验分享

一、关于DAMA中国和CDGA/CDGP考试 国际数据管理协会&#xff08;DAMA国际&#xff09;是一个全球性的专业组织&#xff0c;由数据管理和相关的专业人士组成&#xff0c;非营利性机构&#xff0c;厂商中立。协会自1980年成立以来&#xff0c;一直致力于数据管理和数字化的研究、…

计算机毕业设计hadoop+spark微博舆情大数据分析 微博爬虫可视化 微博数据分析 微博采集分析平台 机器学习(大屏+LSTM情感分析+爬虫)

电商数据建模 一、分析背景与目的 1.1 背景介绍 电商平台数据分析是最为典型的一个数据分析赛道&#xff0c;且电商数据分析有着比较成熟的数据分析模型&#xff0c;比如&#xff1a;人货场模型。此文中我将通过分析国内最大的电商平台——淘宝的用户行为&#xff0c;来巩固数…

WebRTC | 网络传输协议 RTP 和 RTCP

WebRTC | 网络传输协议 RTP 和 RTCP WebRTC | 网络传输协议 RTP 和 RTCP如何选择 TCP 与 UDPRTP概述工作机制报文结构RTP 的使用RTP 拓展头RTP 中的填充数据翻译器和混合器同步控制报文大小wireshark 抓取 RTP 报文 RTCP概述工作机制分组类型报文结构WebRTC 的反馈报文RTPFBPSF…

接口响应断言

目录 接口断言介绍接口断言方式介绍响应状态码断言 课程目标 掌握什么是接口断言。了解接口断言的多种方式。掌握如何对响应状态码完成断言。 思考 这两段代码是完整的接口自动化测试代码吗&#xff1f; …省略… when().get(“https://httpbin.ceshiren.com/get?namead&…

白鹭群优化算法,原理详解,MATLAB代码免费获取

白鹭群优化算法&#xff08;Egret Swarm Optimization Algorithm&#xff0c;ESOA&#xff09;是一种受自然启发的群智能优化算法。该算法从白鹭和白鹭的捕食行为出发&#xff0c;由三个主要部分组成:坐等策略、主动策略和判别条件。将ESOA算法与粒子群算法(PSO)、遗传算法(GA)…

5.24学习记录

[FSCTF 2023]ez_php2 比较简单的pop链 <?php highlight_file(__file__); Class Rd{public $ending;public $cl;public $poc;public function __destruct(){echo "All matters have concluded";die($this->ending);}public function __call($name, $arg){for…

揭秘Python的魔法:装饰器的超能力大揭秘 ‍♂️✨

文章目录 Python进阶之装饰器详解1. 引言装饰器的概念与意义装饰器在Python编程中的作用 2. 背景介绍2.1 函数作为对象2.2 高阶函数 3. 装饰器基础3.1 理解装饰器3.2 装饰器的工作原理 4. 带参数的装饰器4.1 为什么需要带参数4.2 实现带参数的装饰器使用函数包裹装饰器使用类实…

【ZYNQ】AXI-Quad-SPI SDK 开发记录 测试

前人工作 如前人工作&#xff0c;在Navigate to BSP Settings中找到历例程 file:///F:/Xilinx/Vitis/2019.2/data/embeddedsw/XilinxProcessorIPLib/drivers/spi_v4_5/doc/html/api/example.html使用XSpi_LowLevelExample例子&#xff0c;源代码的AI解析 int XSpi_LowLeve…

蓝桥杯Web开发【模拟题一】15届

1.动态的Tab栏 日常在使用移动端 APP 或访问 PC 端网站的时候&#xff0c;常常发现在一些有工具栏或者 Tab 栏的页面会有顶栏固定的效果。简单来说&#xff0c;在页面未开始滚动时顶栏处在其原有的位置上&#xff0c;当页面向下滚动一定区域后&#xff0c;顶栏会跟随滚动固定在…

python-数据分析与可视化基础

1、data1.csv中的B、C、D和E列数据分别是日期、权重、A企业的销售额、B企业的销售额。读取C、D、E列数据,并统计E列数据的算术平均数、加权平均值(权值为C列数据)、方差、中位数、最小值、最大值。并绘制E列数据的直方图。 &#xff08;1&#xff09;源代码&#xff1a; impo…

vue3的api风格

Vue的组件有两种不同的风格&#xff1a;组合式API 和 选项式API 选项式api 选项式API&#xff0c;可以用包含多个选项的对象来描述组件的逻辑&#xff0c;如&#xff1a;data&#xff0c;methods&#xff0c;mounted等。 组合式api setup&#xff1a;是一个标识&#xff0c;告…

ST-SLAS Technology 实验室自动化与筛查学会技术

文章目录 一、期刊简介二、征稿信息三、期刊表现四、投稿须知五、出版支持 一、期刊简介 SLAS Technology ——SLAS技术强调促进和改进生命科学研发的科学和技术进步;药物递送;诊断;生物医学和分子成像&#xff1b;以及个性化和精准医疗。这包括高通量和其他实验室自动化技术;…

Springboot项目打包:将依赖的jar包输出到指定目录

场景 公司要对springboot项目依赖的jar包进行升级&#xff0c;但是遇到一个问题&#xff0c;项目打包之后&#xff0c;没办法看到他里面依赖的jar包&#xff0c;版本到底是不是升上去了&#xff0c;没办法看到。 下面是项目打的jar包 我们通过反编译工具jdgui&#xff0c;来…

云计算架构最全方案详解

云计算架构最全详解(图文全面总结) 一、引言云计算已经成为现代企业和科技发展的重要支柱。本文将详细介绍云计算架构的组成部分及其工作原理&#xff0c;帮助读者深入理解这一复杂而强大的系统。二、云计算架构组成部分云计算架构主要包括以下几个关键组件&#xff1a;基础设施…

【软件工程】【23.10】p3

关键字&#xff1a; 软件工程定义及目的、需求规约定义及性质、模块的控制域及作用域、类和类图、调试特征、瀑布模型

LINUX系统编程:命名管道

匿名管道的通信只能在&#xff0c;有血缘关系的进程中&#xff0c;本质就是&#xff0c;子进程会拷贝一份父进程的文件描述符表&#xff0c;父子进程就可以看到操作系统的同一块资源&#xff08;文件&#xff09;&#xff0c;以这块资源为媒介进行通信。 命名管道&#xff0c;…

shell文本三剑客——awk命令【☆】

目录 一、akw原理 二、命令格式 三、常用变量 四、awk的用法 1.输出整行内容 2.按字段输出文本内容 3.按列输出文件内容 FS变量为列分隔符 4.awk的三个模式 5. awk ‘控制语句条件 {操作}’ 文件 6.awk的数组 7.awk的应用 一、akw原理 逐行读取文本&#xff0c;默认…

SpringFramework实战指南

二、SpringFramework实战指南 目录 一、技术体系结构 1.1 总体技术体系1.2 框架概念和理解 二、SpringFramework介绍 2.1 Spring 和 SpringFramework概念2.2 SpringFramework主要功能模块2.3 SpringFramework 主要优势 三、Spring IoC容器和核心概念 3.1 组件和组件管理概念3…

基于灰狼优化算法优化RBF(GWO-RBF)的数据回归预测(多输入多输出)

代码原理及流程 基于灰狼优化算法优化多输入多输出&#xff08;MIMO&#xff09;的RBF神经网络的数据回归预测&#xff0c;可以采取以下步骤&#xff1a; 1. 数据准备&#xff1a;准备包含多个输入特征和多个输出目标的数据集&#xff0c;确保数据已经经过预处理和归一化。 …