【星海随笔】云解决方案学习日志篇(二) kafka、Zookeeper、Fielbeat

news2025/1/11 5:52:32

Elastic 中国社区官方博客
https://blog.csdn.net/ubuntutouch/category_9209092.html

Kafka

kafka的源代码是基于Scala语言编写的,运行在Java虚拟机(即:JVM)上。因此,在安装kafka之前需要先安装JDK

Kafka 为什么依赖 Zookeeper

  • 1.协调分布式系统:Kafka是一个分布式系统,各个节点之间需要进行协调和同步,而Zookeeper正是为分布式系统提供协调和同步的服务的。
  • 2.元数据管理:Kafka的元数据包括了集群的配置、broker的状态等信息,而这些信息需要被所有的Kafka节点共享和维护。Zookeeper提供了一个分布式的文件系统,可以方便地存储和管理这些元数据信息。
  • 3.领导选举:Kafka的一个分区只会分配给一个broker进行读写,而这个broker就是该分区的leader。当leader宕机后,需要从剩余的broker中选举一个新的leader。而Zookeeper可以提供分布式锁和选举的功能,因此Kafka可以利用Zookeeper来实现leader选举。

综上所述,Kafka依赖Zookeeper主要是为了协调分布式系统、元数据管理和领导选举。


ZK安装
来源于apache

1.下载
下载地址:https://zookeeper.apache.org/releases.html

2.解压安装包

 tar -zxf apache-zookeeper-3.7.1-bin.tar.gz -C /usr/local/
/usr/local/apache-zookeeper-3.7.1-bin/ /usr/local/zookeeper-3.7.1/

3.拷贝配置文件,

cp /usr/local/zookeeper-3.7.1/conf/zoo_sample.cfg /usr/local/zookeeper-3.7.1/conf/zoo.cfg

4.修改配置文件

#在配置文件中加一行监听本机 IP 即可
clientPortAddress=10.0.5.163

zookeeper默认会占用8080端口,如果你本机已有服务在使用8080,可以把下面参数添加到zoo.cfg 文件里,自定义端口
admin.serverPort=8001

5.启动zk

/usr/local/zookeeper-3.7.1/bin/zkServer.sh start

6.查看端口是否监听

netstat -lntp |grep 2181

如果服务未监听,请查看日志排查问题
more zookeeper-root-server-VM-5-163-centos.out


kafka 部署

1.下载

下载地址:https://kafka.apache.org/downloads

2.解压安装包

tar -zxf kafka_2.12-3.4.0.tgz -C /usr/local/

3.修改kafka配置

vim /usr/local/kafka_2.12-3.4.0/config/server.properties 
#修改 zk 的IP
zookeeper.connect=10.0.5.163:2181
 
#修改监听地址
listeners=PLAINTEXT://10.0.5.163:9092

4.启动kafka

nohup /usr/local/kafka_2.12-3.4.0/bin/kafka-server-start.sh /usr/local/kafka_2.12-3.4.0/config/server.properties >/tmp/kafka.log 2>&1 &

5.查看端口是否监听

netstat -lntp |grep 9092

Flebeat部署

原理流程如下:
首先是input输入,可以指定多个数据输入源,然后通过通配符进行日志文件的匹配
匹配到日志后,就会使用Harvester(收割机),将日志源源不断的读取到来
然后收割机收割到的日志,就传递到Spooler(卷轴),然后卷轴就在将他们传到对应的地方

1.下载

wget https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.16.1-linux-x86_64.tar.gz

2.解压二进制包

tar zxf filebeat-7.16.1-linux-x86_64.tar.gz -C /usr/local/
mv /usr/local/filebeat-7.16.1-linux-x86_64/ /usr/local/filebeat-7.16.1

3.创建 Filebeat 配置文件

#备份模板文件
mv /usr/local/filebeat-7.16.1/filebeat.yml /usr/local/filebeat-7.16.1/filebeat.yml.bak
#创建配置文件
cat > /usr/local/filebeat-7.16.1/filebeat.yml << "EOF"
filebeat.inputs:
- type: log
  tail_files: true
  backoff: "1s"
  paths:
      - /var/log/nginx/access.json.log
  fields:
    type: access
  fields_under_root: true
- type: log
  tail_files: true
  backoff: "1s"
  paths:
      - /var/log/messages
  fields:
    type: messages
  fields_under_root: true
output:
  kafka:
    hosts: ["10.0.5.163:9092"]
    topic: hosts_10-0-5-163
EOF

4.启动Fielbeat

#查看是否已存在进程,将其停止
ps -ef |grep filebeat |grep -v grep |awk '{print $2}' |xargs kill -9
 
#启动Filebeat
nohup /usr/local/filebeat-7.16.1/filebeat  -e -c /usr/local/filebeat-7.16.1/filebeat.yml >/tmp/filebeat.log 2>&1 &
 
#查看进程
ps -ef |grep filebeat
 
#查看是否与ZK建立连接
netstat -ntp |egrep -w '9092|filebeat'

Fielbeat使用

启动

./filebeat -e -c shengxia.yml

yaml文件介绍

filebeat.inputs: # filebeat input输入
- type: stdin    # 标准输入
  enabled: true  # 启用标准输入
setup.template.settings: 
  index.number_of_shards: 3 # 指定下载数
output.console:  # 控制台输出
  pretty: true   # 启用美化功能
  enable: true

输送至ElasticSearch或者Logstash,在Kibana中实现可视化
然后我们在控制台输入hello,就能看到我们会有一个json的输出,是通过读取到我们控制台的内容后输出的,内容如下

{
  "@timestamp": "2023-05-31T22:57:58.700Z",
  "@metadata": {#元数据信息
    "beat": "filebeat",
    "type": "_doc",
    "version": "8.8.1"
  },
  "log": {
    "offset": 0,
    "file": {
      "path": ""
    }
  },
  "message": "hello",#元数据信息
  "input": {#控制台标准输入
    "type": "stdin"#元数据信息
  },
  "ecs": {
    "version": "8.0.0"
  },
  "host": {
    "name": "elk-node1"
  },
  "agent": {#版本以及主机信息
    "id": "5d5e4b99-8ee3-42f5-aae3-b0492d723730",
    "name": "elk-node1",
    "type": "filebeat",
    "version": "8.8.1",
    "ephemeral_id": "24b4fd16-5466-4d7e-b4b8-b73d41f77de0"
  }
}
参考文档:https://blog.csdn.net/qq_52589631/article/details/131216188

再次创建一个文件,叫 shengxia-log.yml,然后在文件里添加如下内容

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /opt/elk/logs/*.log
setup.template.settings:
  index.number_of_shards: 3
output.console:
  pretty: true
  enable: true

添加完成后,我们在到下面目录创建一个日志文件

# 创建文件夹
mkdir -p /opt/elk/logs

# 进入文件夹
cd /opt/elk/logs

# 追加内容
echo "hello world" >> test.log

然后再次启动filebeat

 ./filebeat -e -c shengxia-log.yml

能够发现,它已经成功加载到了我们的日志文件 test.log
同时我们还可以继续往文件中追加内容
追加后,我们再次查看filebeat,也能看到刚刚我们追加的内容
检测到日志文件有更新,立刻就会读取到更新的内容,并且输出到控制台。

自定义字段
   当我们的元数据没办法支撑我们的业务时,我们还可以自定义添加一些字段
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /opt/elk/logs/*.log
  tags: ["web", "test"]  #添加自定义tag,便于后续的处理
  fields:  #添加自定义字段
    from: web-test
  fields_under_root: true #true为添加到根节点,false为添加到子节点中
setup.template.settings:
  index.number_of_shards: 3
output.console:
  pretty: true
  enable: true

添加完成后,重启 filebeat

./filebeat -e -c shengxia-log.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /opt/elk/logs/*.log
  tags: ["web", "test"]
  fields:
    from: web-test
  fields_under_root: false 
setup.template.settings:
  index.number_of_shards: 1
output.elasticsearch:
  hosts: ["192.168.40.150:9200","192.168.40.137:9200","192.168.40.138:9200"]
Logstash 配置

1.修改Logstash 配置文件(下面 output 将日志打印到本地,观察日志是否采集到,日志格式是否正确)

cat > /usr/local/logstash-7.16.1/config/logstash.conf << "EOF"
input {
  kafka {
    bootstrap_servers => "10.0.5.163:9092"
    topics => ["hosts_10-0-5-163"]
    group_id => "test"
    codec => "json"
  }
}
 
filter {
  if [type] == "access" {
    json {
      source => "message"
      remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]
    }
  }
}
 
output {
  stdout {
    codec=>rubydebug
  }
}
EOF

2.执行前台启动命令

#查看是否已存在进程,将其停止
ps -ef |grep logstash |grep -v grep |awk '{print $2}' |xargs kill -9
 
#启动 Logstash
logstash -f /usr/local/logstash-7.16.1/config/logstash.conf 

3.查看kafka Group 和队列信息

#进入kafka 安装目录
cd /usr/local/kafka_2.12-3.4.0/bin
#查看所有topic
./kafka-topics.sh  --bootstrap-server 10.0.5.163:9092 --lis
#查看Group
./kafka-consumer-groups.sh  --bootstrap-server 10.0.5.163:9092 --list
#查看队列
./kafka-consumer-groups.sh  --bootstrap-server 10.0.5.163:9092 --group test --describe

在这里插入图片描述
4.修改配置文件,将output 将日志写入elasticsearch

cat > /usr/local/logstash-7.16.1/config/logstash.conf << "EOF"
input {
  kafka {
    bootstrap_servers => "10.0.5.163:9092"
    topics => ["hosts_10-0-5-163"]
    group_id => "test"
    codec => "json"
  }
}
filter {
  if [type] == "access" {
    json {
      source => "message"
      remove_field => ["message","@version","path","beat","input","log","offset","prospector","source","tags"]
    }
  }
}
 
output{
  if [type] == "access" {
    elasticsearch {
      hosts => ["http://127.0.0.1:9200"]
      user => "elastic"
      password => "elk@2023"
      index => "access-%{+YYYY.MM.dd}"
    }
  }
  else if [type] == "messages" {
    elasticsearch {
      hosts => ["http://127.0.0.1:9200"]
      user => "elastic"
      password => "elk@2023"
      index => "messages-%{+YYYY.MM.dd}"
    }
  }
}
EOF

4.后台启动 Logstash

#查看是否已存在进程,将其停止
ps -ef |grep logstash |grep -v grep |awk '{print $2}' |xargs kill -9
 
#启动 Logstash
nohup logstash -f /usr/local/logstash-7.16.1/config/logstash.conf  >/tmp/logstash.log 2>&1 &

查看服务日志是否正常

查看日志是否有 ERROR 持续输出
tailf /tmp/logstash.log
 
#查看logstash 端口是否监听
netstat -lntp |grep 9600

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1828351.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

WebSocket 入门教程

什么是 WebSocket&#xff1f; WebSocket 是一种通信协议&#xff0c;它在单个 TCP 连接上提供全双工通信。与传统的 HTTP 不同&#xff0c;WebSocket 允许服务器主动向客户端推送数据&#xff0c;而不仅仅是客户端请求数据。这使得 WebSocket 非常适用于需要低延迟和实时通信…

【星座运势】本周财运分析,巨蟹座财富潜力大开!

大家好&#xff01;今天我们来谈谈巨蟹座本周的财富运势。经过调查和数据分析&#xff0c;我发现巨蟹座这周的财运潜力很大&#xff01;接下来&#xff0c;我将用通俗易懂的语言&#xff0c;通过代码说明&#xff0c;向大家展示巨蟹座的财富运势。 首先&#xff0c;我们需要通…

通俗范畴论2 有向图与准范畴

退一步海阔天空&#xff0c;在正式进入范畴论之前&#xff0c;我们可以重新审视一下我们是如何认识世界的&#xff0c;有了这个对人类认识世界过程的底层理解&#xff0c;可以帮助我们更好地理解范畴论。 对于人类认识世界&#xff0c;最神奇的一点就是这个世界居然是可以认识…

【SpringBoot】SpringBoot:实现文件上传和下载功能

文章目录 引言项目初始化添加依赖 配置文件存储位置实现文件上传功能创建文件上传控制器创建上传页面 实现文件下载功能创建文件下载控制器 安全性和最佳实践文件大小限制文件类型验证文件名和路径验证文件下载时的安全性 测试与部署示例&#xff1a;编写单元测试 部署结论 引言…

C++的异常捕获

目录 C语言的异常处理方式 C的异常处理方式 异常的抛出与捕获 抛出与捕获原则 自定义异常体系 异常安全 异常规范 异常的优缺点 优点 缺点 C语言的异常处理方式 1、终止程序 常见形式&#xff1a;assert 缺陷&#xff1a;太过强硬&#xff0c;如果发生内存错误&#…

文心一言 VS 讯飞星火 VS chatgpt (282)-- 算法导论20.4 3题

三、在 CONNECTED-COMPONENTS 作用于一个有 k 个连通分量的无向图 G(V&#xff0c;E) 的过程中&#xff0c;FIND-SET 需要调用多少次&#xff1f; UNION 需要调用多少次&#xff1f;用 |V| 、 |E| 和 k 来表示你的答案。如果要写代码&#xff0c;请用go语言。 文心一言&#x…

【Java】过滤器/拦截器

文章目录 两者区别request链路全过程 在实际开发中&#xff0c;过滤器和拦截器都是经常使用的技术&#xff0c;但一被提及到其区别时&#xff0c;整个人就愣住了&#xff0c;好像没有认真地对两者进行区别和总结&#xff0c;这两者之间也确实很容易混淆&#xff0c;因此结合了很…

C++ 54 之 继承中同名的静态成员处理

#include <iostream> using namespace std;// 父类 class Base07{ public:static int m_a; // 静态成员&#xff0c;类内定义static void fun(){cout << "Base中的fun"<< endl;}static void fun(int a){cout << "Base中的fun(int a)&qu…

53. QT插件开发--插件(动态库so)的调用与加载

1. 说明 在使用QT进行插件库的开发之后,还需要将这个插件库程序生成的so动态链接库加载到主程序框架中进行使用,才能达到主程序的模块化开发的效果。在前一篇文章插件创建中介绍了如何在QT中开发插件库,并提供外部接口调用。本篇博客的主要作用是模拟在主程序框架中加载动态…

EasyRecovery2024数据恢复神器#电脑必备良品

EasyRecovery数据恢复软件&#xff0c;让你的数据重见天日&#xff01; 大家好&#xff01;今天我要给大家种草一个非常实用的软件——EasyRecovery数据恢复软件&#xff01;你是不是也曾经遇到过不小心删除了重要的文件&#xff0c;或者电脑突然崩溃导致数据丢失的尴尬情况呢&…

element-ui input输入框和多行文字输入框字体不一样

页面中未作样式修改&#xff0c;但是在项目中使用element-ui input输入框和多行文字输入框字体不一样&#xff0c;如下图所示&#xff1a; 这是因为字体不一致引起的&#xff0c;如果想要为Element UI的输入框设置特定的字体&#xff0c;你可以在你的样式表中添加以下CSS代码…

快速UDP网络连接之QUIC协议介绍

文章目录 一、QUIC协议历史1.1 问题&#xff1a;QUIC为什么在应用层实现1.2 QUIC协议相关术语1.3 QUIC和TCP对比1.4 QUIC报文格式1.4.1 QUIC报文格式-Stream帧11.4.2 QUIC报文格式-Stream帧2 二、QUIC的特点2.1 连接建立低时延&#xff0c;2.2 多路复用流复用-HTTP1.1流复用-HT…

SpringBoot整合H2数据库并将其打包成jar包、转换成exe文件二(补充)

SpringBoot整合H2数据库并将其打包成jar包、转换成exe文件二&#xff08;补充&#xff09; 如果你想在cmd命令窗口内看到程序运行&#xff0c;即点开弹出运行窗口&#xff0c;关闭时exe自动关闭。 需要再launch4j上进行如下操作&#xff1a; 这样转换好的exe就可以有控制台了…

springboot + Vue前后端项目(第十六记)

项目实战第十六记 写在前面1 第一个bug1.1 完整的Role.vue 2 第二个bug2.1 修改路由router下面的index.js 总结写在最后 写在前面 发现bug&#xff0c;修复bug 1 第一个bug 分配菜单时未加入父id&#xff0c;导致分配菜单失效 <!-- :check-strictly"true" 默…

人工智能对零售业的影响

机器人、人工智能相关领域 news/events &#xff08;专栏目录&#xff09; 本文目录 一、人工智能如何改变零售格局二、利用人工智能实现购物体验自动化三、利用人工智能改善库存管理四、通过人工智能解决方案增强客户服务五、利用人工智能分析消费者行为六、利用 AI 打造个性化…

C++前期概念(重)

目录 命名空间 命名空间定义 1. 正常的命名空间定义 2. 命名空间可以嵌套 3.头文件中的合并 命名空间使用 命名空间的使用有三种方式&#xff1a; 1:加命名空间名称及作用域限定符&#xff08;::&#xff09; 2:用using将命名空间中某个成员引入 3:使用using namespa…

TCP协议报头详解

目录 前言 TCP特点 TCP报头 1.源端口和目的端口 2.序号 3.确认号 4.数据偏移 5.保留 6.控制位 ① 紧急URG&#xff08;URGent&#xff09; ② 确认ACK&#xff08;ACKnowledgment&#xff09; ③ 推送PSH&#xff08;PuSH&#xff09; ④复位RST&#xff08;ReSeT&…

【数据结构】初识集合深入剖析顺序表(Arraylist)

【数据结构】初识集合&深入剖析顺序表&#xff08;Arraylist&#xff09; 集合体系结构集合的遍历迭代器增强for遍历lambda表达式 List接口中的增删查改List的5种遍历ArrayList详解ArrayList的创建ArrayList的增删查改ArrayList的遍历ArrayList的底层原理 &#x1f680;所属…

UnityAPI学习之 播放游戏音频的类(AudioSource)

播放游戏音频的类&#xff08;AudioSource&#xff09; using System.Collections; using System.Collections.Generic; using UnityEngine;public class NO17AudioSource : MonoBehaviour {private AudioSource audioSource;//音频组件public AudioClip clip;//音频文件public…

预编译、函数变量提升

函数声明会覆盖变量的声明&#xff0c;也就是会提升到最前面。 形参传进来相当于变量声明&#xff0c;所以当有函数声明时&#xff0c;会被覆盖。