Elk+Filebeat+Kafka实现日志收集

news2025/1/6 21:33:55

Elk+Filebeat+Kafka实现日志收集(本机nginx)

部署Zookeeper

1.实验组件

#准备3台服务器做Zookeeper集群
20.0.0.10
20.0.0.20
20.0.0.30

2.安装前准备

#关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
setenforce 0

#安装JDK
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version

#将apache-zookeeper-3.5.7-bin.tar.gz压缩包上传至/opt目录

3.安装Zookeeper

#三台服务器一齐操作
cd /opt
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /opt/zookeeper

#修改配置文件
cd /opt/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
--2--
tickTime=2000
#通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒

--5--
initLimit=10
#Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为10*2s

--8--
syncLimit=5
#Leader和Follower之间同步通信的超时时间,这里表示如果超过5*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer

--12--修改
dataDir=/opt/zookeeper/data
#指定保存Zookeeper中的数据的目录,目录需要单独创建

--添加--
dataLogDir=/opt/zookeeper/logs
#指定存放日志的目录,目录需要单独创建

--15--
clientPort=2181
#客户端连接端口

--末尾添加集群信息--
server.1=20.0.0.10:3188:3288
server.2=20.0.0.20:3188:3288
server.3=20.0.0.30:3188:3288

#在每个节点上创建数据目录和日志目录
mkdir /opt/zookeeper/data
mkdir /opt/zookeeper/logs

#在每个节点的dataDir指定的目录下创建一个 myid 的文件,不同节点分配1、2、3
echo 1 > /opt/zookeeper/data/myid
echo 2 > /opt/zookeeper/data/myid
echo 3 > /opt/zookeeper/data/myid

#配置 Zookeeper 启动脚本
vim /etc/init.d/zookeeper

#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/opt/zookeeper'
case $1 in
start)
	echo "---------- zookeeper 启动 ------------"
	$ZK_HOME/bin/zkServer.sh start
;;
stop)
	echo "---------- zookeeper 停止 ------------"
	$ZK_HOME/bin/zkServer.sh stop
;;
restart)
	echo "---------- zookeeper 重启 ------------"
	$ZK_HOME/bin/zkServer.sh restart
;;
status)
	echo "---------- zookeeper 状态 ------------"
	$ZK_HOME/bin/zkServer.sh status
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac

#设置开机自启
chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper

#分别启动 Zookeeper
service zookeeper start

#查看当前状态
service zookeeper status

部署Kafka(3.4.1版本)

1.安装Kafka

cd /opt
--上传kafka_2.13-3.4.1.tgz--
tar -xf kafka_2.13-3.4.1.tgz
mv kafka_2.13-3.4.1 kafka
cd kafka/config/
cp server.properties server.properties.bak
vim server.properties
--24--
broker.id=1
#broker的全局唯一编号,每个broker不能重复,因此要在其他机器上配置 broker.id=2、broker.id=3

--34--
listeners=PLAINTEXT://20.0.0.10:9092
#每台服务器分别为10、20、30,不用加地址映射

--62--
log.dirs=/var/log/kafka
#kafka运行日志存放的路径,也是数据存放的路径

--125--
zookeeper.connect=20.0.0.10:2181,20.0.0.20:2181,20.0.0.30:2181
#配置连接Zookeeper集群地址

#修改全局配置
vim /etc/profile
--添加--
export KAFKA_HOME=/opt/kafka
export PATH=$PATH:$KAFKA_HOME/bin

source /etc/profile
#配置Zookeeper启动脚本
vim /etc/init.d/kafka

#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka Service Control Script
KAFKA_HOME='/opt/kafka'
case $1 in
start)
	echo "---------- Kafka 启动 ------------"
	${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
	echo "---------- Kafka 停止 ------------"
	${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
	$0 stop
	$0 start
;;
status)
	echo "---------- Kafka 状态 ------------"
	count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
	if [ "$count" -eq 0 ];then
        echo "kafka is not running"
    else
        echo "kafka is running"
    fi
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac

#设置开机自启
chmod +x /etc/init.d/kafka
chkconfig --add kafka

#分别启动Kafka
service kafka start

2.命令行测试

#创建topic
kafka-topics.sh --create --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --replication-factor 2 --partitions 3 --topic test1

#查看当前服务器中的所有 topic
kafka-topics.sh --list --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092

#发布消息
kafka-console-producer.sh --broker-list 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092  --topic test1

#消费消息
kafka-console-consumer.sh --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --topic test1 --from-beginning

#修改分区数
kafka-topics.sh --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --alter --topic test1 --partitions 6

#删除 topic
kafka-topics.sh --delete --bootstrap-server 20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092 --topic test1

部署Filebeat 

 1.安装Filebeat

#10
cd /opt/
--上传filebeat-6.7.2-linux-x86_64.tar.gz--
tar -xf filebeat-6.7.2-linux-x86_64.tar.gz
mv filebeat-6.7.2-linux-x86_64 filebeat
vim /etc/logstash/logstash.yml
--64--
path.config: /opt/log

systemctl restart logstash

2.时间同步

#所有节点
yum -y install ntpdate
ntpdate ntp.aliyun.com 
date

3.配置filebeat

#给nginx日志文件赋权
cd /var/log/nginx/
chmod 777 access.log error.log

#配置filebeat
cd /usr/local/filebeat
vim filebeat.yml
filebeat.inputs:

- type: log
  enabled: true	
  paths:
    - /var/log/nginx/access.log
    - /var/log/nginx/error.log
  tags: ["nginx"]
  fields:
    service_name: 20.0.0.10_nginx
    log_type: nginx
    from: 20.0.0.10

output.kafka:
  enabled: true
  hosts: ["20.0.0.10:9092","20.0.0.20:9092","20.0.0.30:9092"]
  topic: "nginx"


--------------Elasticsearch output-------------------
(全部注释掉)

----------------Logstash output---------------------
(全部注释掉)

nohup ./filebeat -e -c filebeat.yml > filebeat.out &
#启动filebeat

 

4.配置logstash

cd /opt/log/
vim kafka.conf

input {
    kafka {
        bootstrap_servers => "20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092"
        topics  => "nginx"
        type => "nginx_kafka"
        codec => "json"
                auto_offset_reset => "earliest"
                decorate_events => true
    }
}

output {
  if "nginx" in [tags] {
    elasticsearch {
      hosts => ["20.0.0.20:9200","20.0.0.30:9200"]
      index => "nginx_access-%{+YYYY.MM.dd}"
    }
  }

  stdout { codec => rubydebug }
}

logstash -f kafka.conf --path.data /opt/test1

日志收集(远程Apache+Mysql)

部署Filebeat

1.安装配置filebeat

#收集81服务器上的mysql和apache日志
cd /opt/
--上传filebeat-6.7.2-linux-x86_64.tar.gz--
tar -xf filebeat-6.7.2-linux-x86_64.tar.gz
mv filebeat-6.7.2-linux-x86_64 filebeat
cd filebeat/
vim filebeat.yml
 
filebeat.inputs:

- type: log
  enabled: true
  paths:
    - /etc/httpd/logs/access_log
    - /etc/httpd/logs/error_log
  tags: ["httpd_81"]
  fields:
    service_name: 20.0.0.81_httpd
    log_type: httpd
    from: 20.0.0.81
 
- type: log
  enabled: true
  paths:
    - /usr/local/mysql/data/mysql_general.log
  tags: ["mysql_81"]
  fields:
    service_name: 20.0.0.81_mysql
    log_type: mysql
    from: 20.0.0.81

output.kafka:
  enabled: true
  hosts: ["20.0.0.10:9092","20.0.0.20:9092","20.0.0.30:9092"]
  topic: "httpdmysql"
 
--------------Elasticsearch output-------------------
(全部注释掉)

----------------Logstash output---------------------
(全部注释掉)

nohup ./filebeat -e -c filebeat.yml > filebeat.out &
#启动filebeat

2.配置logstash

10:
cd /opt/log/
vim 81_a+m.conf 
 
input {
    kafka {
        bootstrap_servers => "20.0.0.10:9092,20.0.0.20:9092,20.0.0.30:9092"
        topics  => "httpdmysql"
        type => "httpd+mysql_kafka"
        codec => "json"
                auto_offset_reset => "earliest"
                decorate_events => true
    }
}

output {
  if "httpd_81" in [tags] {
    elasticsearch {
      hosts => ["20.0.0.20:9200","20.0.0.30:9200"]
      index => "nginx_access-%{+YYYY.MM.dd}"
    }
  }


  if "mysql_81" in [tags] {
    elasticsearch {
      hosts => ["20.0.0.20:9200","20.0.0.30:9200"]
      index => "nginx_access-%{+YYYY.MM.dd}"
    }
  }

  stdout { codec => rubydebug }
}

logstash -f 81_a+m.conf --path.data /opt/test2

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1271369.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

井盖位移报警器安装,智能化井盖厂家推荐

当井盖发生位移或倾斜时,通常会引起所处道路的安全隐患,给过往的车辆和行人带来许多潜在的危险。为了避免潜在的安全事故频繁出现,及时发现并处理井盖位移或倾斜才能更好的保障人民的安全。因此安装井盖位移报警器是满足政府和市民需求的。 单…

抖音本地生活服务商申请条件

抖音的本地生活服务商目前有两种,一种是可以做全国的服务商,我们一般叫抖音本地生活服务商,一种是区域优待服务商,也就是后面出来的服务商,这两种服务商的申请方式大同小异。 相同的地方就是都需要给平台交保证金。抖…

用java制作飞翔的小鸟

第一步是创建项目 项目名自拟 第二步创建个包名 来规范class 再创建一个包 来存储照片 如下: package game; import java.awt.*; import javax.swing.*; import javax.imageio.ImageIO;public class Bird {Image image;int x,y;int width,height;int size;doubl…

计算机体系结构----指令系统(二)

本文仅供学习,不作任何商业用途,严禁转载。绝大部分资料来自----计算机系统结构教程(第二版)张晨曦等 计算机体系结构----指令系统(二) 2.1 指令系统结构的分类2.2 寻址方式2.3 MIPS 指令系统结构2.3.1 MIPS的寄存器2.3.2 MIPS的…

虚拟化CentOS7分区大小调整+磁盘扩容=新分区

一、适应场景 1、虚拟化环境ESXI6.7下的虚拟服务器,使用一段时间后,把空闲的存储空间腾出来,给新的分区使用。 2、Linux的版本为CentOS7 3、本例为部署minio存储业务做准备 4、虚拟化存储扩容 二、配置过程 调整分区大小,为min…

Go语言 值传递

官方说法,Go中只有值传递,没有引用传递 而Go语言中的一些让你觉得它是引用传递的原因,是因为Go语言有值类型和引用类型,但是它们都是值传递。 值类型 有int、float、bool、string、array、sturct等 引用类型有slice&#xff0c…

免费SSL证书有效果吗?

首先,我们要明确一点:无论是付费还是免费的SSL证书,它们都能实现基本的HTTPS加密功能,确保数据在客户端和服务器之间的传输过程中不会被窃取或篡改。从这个角度来看,免费SSL证书的确可以提供一定的安全保障。 然而&…

每天一点python——day81

#每天一点Python——81 #递归函数: 递归函数: 一个函数在该函数体内调用了该函数本身,这个函数称为递归函数 【释:我自己调用自己的函数】 递归函数的组成部分: 递归调用与递归终止条件。 【一定有一个跳出循环的终止条…

【MySQL】事务(事务四大特性+四种隔离级别+MVCC)

事务 前言正式开始事务的四大特性为什么会出现事务事务的版本支持事务提交方式事务常见操作方式启动事务回滚演示提交事务事务的异常autocommit 事务的隔离性隔离级别查看隔离级别修改隔离级别验证四种隔离级别读未提交(read uncommitted) —— 缩写为RU读提交(read committed)…

web:catcat-new(文件包含漏洞、flask_session伪造)

前提知识 /etc/passwd 该文件储存了该Linux系统中所有用户的一些基本信息,只有root权限才可以修改。其具体格式为 用户名:口令:用户标识号:组标识号:注释性描述:主目录:登录Shell(以冒号作为分隔符) /proc/self proc是一个伪文件系统…

C++和Python混合编程在数据采集程序中的应用

目录 一、引言 二、C和Python的特性及其在数据采集程序中的应用 1、C的特性及其在数据采集程序中的应用 2、Python的特性及其在数据采集程序中的应用 三、C和Python混合编程在数据采集程序中的实现方法 四、混合编程的优缺点以及未来发展趋势 五、代码示例 六、结论 一…

创业者如何在居家办公和固定办公场地办公中权衡利弊,选择合适的

创业者如何选择办公方式,可能要根据自己的创业项目、团队规模、资金情况、行业特点等多方面因素来考虑。一般来说,居家办公和固定办公场地各有优缺点,没有绝对的好坏,只有适合不适合。我为大家总结了一些相关的信息,希…

Spring-AOP与声明式事务

为什么要用AOP ①现有代码缺陷 针对带日志功能的实现类,我们发现有如下缺陷: 对核心业务功能有干扰,导致程序员在开发核心业务功能时分散了精力 附加功能分散在各个业务功能方法中,不利于统一维护 ②解决思路 解决这两个问题&…

原来字节跳动这么容易就能进····

“字节是大企业,是不是很难进去啊?” “在字节做软件测试,能得到很好的发展吗? 一进去就有12.5K,其实也没有想的那么难” 直到现在,心情都还是无比激动! 本人211非科班,之前在字节…

SpringBoot查询指定范围内的坐标点

使用Redis geo实现 redis geo是基于Sorted Set来实现的 Redis 3.2 版本新增了geo相关命令,用于存储和操作地理位置信息。提供的命令包括添加、计算位置之间距离、根据中心点坐标和距离范围来查询地理位置集合等,说明如下: geoadd:添加地理…

海光信息荣获ESG金牛科技引领奖!

11月24日,由中国证券报、中国国新控股有限责任公司、南通市人民政府联合主办的”2023金牛企业可持续发展论坛暨第一届国新杯ESG金牛奖颁奖典礼”在江苏南通举行,“国新杯ESG金牛奖”多项获奖名单同期揭晓。海光信息凭借卓越的技术创新实力与行业影响力&a…

Facebook的这份开源协议使React四面楚歌

如果你觉得一些科技公司看起来很美好,每天都在“改变世界”……你应该看看他们的用户条款和法律文书,藏污纳垢之严重令人震惊。 最近,百度和阿里巴巴内部的软件工程团队不约而同做了一件事——弃用 React。 解释下: React 是一个…

可爱IP挖掘方法论,打造内容界的“显眼包”

近些年,人们十分乐意为“可爱”买单。玲娜贝儿、loopy、卡皮巴拉……社交平台的萌系表情包大行其道,皆源于可爱事物抚慰心灵的奇效。小红书话题「大人也要玩玩具」近90天互动量突破13亿,一跃成为年轻人的热门趋势。 本期千瓜将从行业案例出发…

AI Agents 闭门研讨会报名丨CAMEL、AutoAgents、Humanoid agents作者参与

青源Workshop丨No.27 AI Agents主题闭门研讨会 所谓AI智能体(AI Agents),是一种能够感知环境、进行决策和执行动作的智能实体。它们拥有自主性和自适应性,可以依靠AI赋予的能力完成特定任务,并在此过程中不断对自我进行…

Apache Doris 整合 FLINK CDC 、Paimon 构建实时湖仓一体的联邦查询入门

1.概览 多源数据目录(Multi-Catalog)功能,旨在能够更方便对接外部数据目录,以增强Doris的数据湖分析和联邦数据查询能力。 在之前的 Doris 版本中,用户数据只有两个层级:Database 和 Table。当我们需要连…