【Linux】环境部署kafka集群

news2025/1/11 8:41:51

目录

一、kafka简介

1. 主要特点

2.组件介绍

3.消息中间件的对比

二、环境准备

1.Java环境

2.Zookeeper环境

3.硬件环境集群

三、Zookeeper的集群部署

1.下载zookeeper

2.部署zookeeper集群

(1)node1节点服务器

(2)node2节点服务器和node3 节点服务器重复上述node1节点安装zookeeper步骤

(3)启动node1、node2、node3节点的zookeeper

​编辑 四、搭建kafka集群

1.下载kafka安装包

2.部署kafka集群

(1)node1节点部署

(2)node2节点服务器、node3 节点服务器重复上述node1节点安装kafka步骤

(3)启动kafka集群

(4)测试集群


一、kafka简介

Apache Kafka 是一个开源的分布式事件流平台,最初由LinkedIn开发,并于2011年贡献给了Apache软件基金会。Kafka被设计为一个高吞吐量、可扩展且容错的消息系统,它能够处理大量的实时数据流。Kafka广泛应用于构建实时数据管道和流应用。

1. 主要特点

(1)发布/订阅消息模型:Kafka支持传统的发布/订阅模式,允许多个消费者订阅并处理相同的消息流。
(2)持久化存储:消息在Kafka中被持久化到磁盘,并且可以根据配置保留一段时间(例如7天)。这使得Kafka可以作为可靠的存储系统来使用。
(3)高吞吐量:Kafka设计用于支持每秒百万级别的消息吞吐量,适用于大规模的数据处理场景。
(4)水平扩展性:可以通过增加更多的broker节点来轻松扩展Kafka集群,以处理更大的流量。
(5)多租户能力:支持多个生产者和消费者共享同一集群,同时保持隔离性和安全性。
(6)零拷贝原理:使用操作系统级别的零拷贝技术来优化网络传输效率,减少CPU使用率。
(7)分区机制:每个topic可以分成多个partition,这些partition可以分布在不同的broker上,从而实现负载均衡和故障转移。

(8)消费组:一组消费者可以组成一个消费者组,共同消费一个topic下的不同partition,确保每个partition只被组内的一个消费者消费。

2.组件介绍

        Broker:Kafka集群中的单个服务器,负责存储消息和处理客户端请求。

        Topic:特定类型的消息流,是消息的分类或主题。        

        Partition:每个topic可以分为多个partition,提高并行处理能力和吞吐量。

        Producer:向Kafka发送消息的应用程序。

        Consumer:从Kafka读取消息的应用程序。

        Consumer Group:一组消费者的集合,它们共同消费一个topic的所有消息。

        Offset:每个partition中的消息都有一个唯一的序列号,称为offset。

        Zookeeper:虽然不是Kafka的核心组件,但早期版本的Kafka依赖Zookeeper来管理集群元数据。新版本中已经内置了替代方案,减少了对Zookeeper的依赖。

3.消息中间件的对比

二、环境准备

1.Java环境

需要Java 8或更高版本(JDK)。Kafka是用Scala编写的,并且运行在JVM之上。

安装步骤参考:Linux环境下离线安装jdk1.8(内置最新的jdk安装包x64)_linux 离线安装jdk1.8-CSDN博客

2.Zookeeper环境

Kafka使用Zookeeper来管理集群元数据。

3.硬件环境集群

采用三台服务器部署kafka和zookeeper集群。

node1node2node3
ZookeeperZookeeperZookeeper
KafkaKafkaKafka

三、Zookeeper的集群部署

1.下载zookeeper

地址:Apache ZooKeeper

2.部署zookeeper集群

(1)node1节点服务器

下载zookeeper包之后上传服务器

# 1.解压 zookeeper
tar -zxvf apache-zookeeper-3.8.4-bin.tar.gz

# 2.移动到/usr/local/目录下方便管理
mv apache-zookeeper-3.8.4-bin /usr/local/zookeeper-3.8.4

# 3.切换到/usr/local/zookeeper-3.8.4/conf 配置目录下
cp zoo_sample.cfg zoo.cfg

# 4.编辑配置文件
vim zoo.cfg

zoo.cfg文件新增如下内容: 数据目录和日志目录需要提前创建

说明:server.A=B:C:D:其中 A 是一个数字,表示这个是第几号服务器;B 是这个服务器的 ip 地址;C 表示的是这个服务器与集群中的 Leader 服务器交换信息的端口;D 表示的是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

节点标记

myid配置:在/data/zookeeper/data设置myid, 这个myid的数字跟配置文件里面的server id对应

echo 1 > /data/zookeeper/data/myid

配置环境变量

# 打开环境变量的文件
vim /etc/profile

# 文件末尾添加如下内容 
export ZOOKEEPER_HOME=/usr/local/zookeeper-3.8.4
export PATH=$ZOOKEEPER_HOME/bin:$PATH

# 保存并退出 刷新环境变量
source /etc/profile

到此node1节点服务器zookeeper部署完成

(2)node2节点服务器和node3 节点服务器重复上述node1节点安装zookeeper步骤

注意:记得修改节点标记 myid 改为 2 和 3,若没改启动时会报错。

(3)启动node1、node2、node3节点的zookeeper
# 启动三台机器zookeeper
zkServer.sh start
 
# 查看集群状态
zkServer.sh status

# 停止命令
zkServer.sh stop

从下面的截图就能看出来 node1节点为主节点,其他两个是从节点。

 四、搭建kafka集群

1.下载kafka安装包

地址:Apache Kafka

2.部署kafka集群

(1)node1节点部署
# 1.将下载好的kafka包上传服务器
# 2.解压
tar -zxvf kafka_2.12-3.8.0.tgz

# 3.将解压后的目录移动到 /usr/local/下
mv kafka_2.12-3.8.0 /usr/local/kafka_2.12-3.8.0

# 4.切换到目录下 编辑kafka配置文件
cd /usr/local/kafka_2.12-3.8.0/config
vim server.properties

 主要修改如下所示几个配置信息

#--------------------------------配置文件-------------------------------------------
# broker的全局唯一编号,不能重复,多个节点需要修改 0、1、2
broker.id=0
# 监听
listeners=PLAINTEXT://:9092  #开启此项
# 日志目录
log.dirs=/data/kafka/kafka_logs      #修改日志目录(需提前创建)
# 配置zookeeper的连接(如果不是本机,需要该为ip或主机名)多个地址 用,隔开
zookeeper.connect=cong11:2181,cong12:2181,cong13:2181
#--------------------------------配置文件-------------------------------------------

 配置环境变量

# 打开环境变量的文件
vim /etc/profile

# 文件末尾添加如下内容 
export KAFKA_HOME=/usr/local/kafka_2.12-3.8.0
export PATH=$KAFKA_HOME/bin:$PATH

# 保存并退出 刷新环境变量
source /etc/profile
(2)node2节点服务器、node3 节点服务器重复上述node1节点安装kafka步骤

或者 你直接将节点1的kafka目录分别复制到节点2和节点3上,修改内容如下

node2节点:

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1
#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://ip:9092

node3节点

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=2
#broker对外暴露的IP和端口 (每个节点单独配置)
advertised.listeners=PLAINTEXT://ip:9092

在添加环境变量 刷新环境变量。

(3)启动kafka集群
# 后台启动服务(3台都启动)
kafka-server-start.sh -daemon /usr/local/kafka_2.12-3.8.0/config/server.properties 
 

可以通过 jps 或者 ps -ef|grep kafka 查看进程

(4)测试集群

1)创建一个测试主题(node1)

kafka-topics.sh --create --topic test --partitions 3 --replication-factor 3 --bootstrap-server ip1:9092
  • --replication-factor:指定副本数量
  • --partitions:指定分区数量
  • --topic:主题名称

注意:在node1节点上创建之后,会同步到node2、node3节点主题信息。说明你集群搭建成功。

2)查看刚刚创建的主题信息

kafka-topics.sh --describe --topic test --bootstrap-server IP1:9092

说明

  • Partition: 0:分区编号为0。
  • Leader: 0:当前分区的leader broker是broker 0。
  • Replicas: 0,2,1:该分区的所有副本分布在broker 0、broker 2和broker 1上。
  • Isr: 0,2,1:与leader保持同步的副本列表(In-Sync Replicas),即broker 0、broker 2和broker 1。
  • Elr: N/A:选举中的leader(Elected Leader Replica),这里没有显示。
  • LastKnownElr: N/A:上次已知的选举中的leader,这里也没有显示。

 3)查看所有的topic命令和查看指定的topic命令

# 列出指定的topic
kafka-topics.sh --bootstrap-server ip:9092 --list --topic test
 
# 列出所有的topic
kafka-topics.sh --bootstrap-server ip:9092 --list 

4)在node1上创建生产者(producer)

kafka-console-producer.sh --broker-list ip:9092 --topic test

随便写一些测试消息

5) 在node2上测试消费消息

kafka-console-consumer.sh --bootstrap-server IP1:9092,IP2:9092,IP3:9092 --topic test --from-beginning
  • --bootstrap-server IP1:9092,IP2:9092,IP3:9092

    指定Kafka集群的多个broker地址。这里列出了三个broker:JClouds:9092, Book:9092, 和 Client01:9092。这些broker地址用于初始化连接。Kafka客户端会使用这些地址来发现整个集群的其他broker。
  • --from-beginning

    从主题的最早可用偏移量(offset)开始消费消息。如果没有这个选项,消费者将从最新的偏移量开始消费。

6)删除topic

kafka-topics.sh --delete --bootstrap-server IP1:9092 --topic test

到此kafka验证完毕,有兴趣可试试kafka其他相关命令

参考:Kafka 集群部署_kafka集群安装部署-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2150811.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

排序----希尔排序

void ShellSort(int* a, int n) {int gap n;while (gap > 1){// 1保证最后一个gap一定是1// gap > 1时是预排序// gap 1时是插入排序gap gap / 3 1;for (size_t i 0; i < n - gap; i){int end i;int tmp a[end gap];while (end > 0){if (tmp < a[end]){…

6.C_数据结构_查询_哈希表

概述 哈希表的查询是通过计算的方式获取数据的地址&#xff0c;而不是依次比较。在哈希表中&#xff0c;有一个键值key&#xff0c;通过一些函数转换为哈希表的索引值。 其中&#xff1a;这个函数被称为哈希函数、散列函数、杂凑函数&#xff0c;记为&#xff1a;H(key) 哈希…

MySQL 中的锁定粒度:理解与应用

在 MySQL 数据库的使用中&#xff0c;锁定粒度是一个至关重要的概念。它决定了数据库在并发控制中锁定的范围和程度&#xff0c;对数据库的性能和并发能力有着深远的影响。今天&#xff0c;我们就来深入了解一下 MySQL 中的锁定粒度是什么意思&#xff0c;并通过实际案例来更好…

鸿蒙开发之ArkUI 界面篇 十五 交叉轴对其方式

鸿蒙界面有两个容器一个是Colum、一个是Row&#xff0c;Colum主轴是垂直方向&#xff0c;交叉轴是水平方向&#xff0c;Row的主轴是水平方向&#xff0c;交叉轴是垂直方向&#xff0c;对应方向调整子控件的话&#xff0c;justifyContent调整的是主轴方向的子控件距离&#xff0…

论文阅读-《Attention is All You Need》

注意力就是一切 【要点】&#xff1a;论文提出了一种全新的网络架构——Transformer&#xff0c;完全基于注意力机制&#xff0c;无需使用循环和卷积&#xff0c;实现了在机器翻译任务上的性能提升和训练效率的显著提高。 【方法】&#xff1a;通过构建一个仅使用注意力机制的…

【高分系列卫星简介】

高分系列卫星是中国国家高分辨率对地观测系统&#xff08;简称“高分工程”&#xff09;的重要组成部分&#xff0c;旨在提供全球范围内的高分辨率遥感数据&#xff0c;广泛应用于环境监测、灾害应急、城市规划、农业估产等多个领域。以下是对高分系列卫星及其数据、相关参数和…

Java流程控制语句——条件控制语句详解(附有流程图)#Java条件控制语句有哪些?#if-else、switch

在 Java 编程中&#xff0c;条件控制语句用于控制程序的执行路径&#xff0c;决定根据某些条件来选择执行某段代码或跳过某段代码。它们是 Java 编程的重要组成部分&#xff0c;帮助开发者根据不同的输入、状态或数据流来编写更加灵活和动态的代码。在本文中&#xff0c;我们将…

利用git将项目上传到github

采用git而不是在pycharm中共享的原因&#xff1a;可能会出现上图报错 目录 1、创建github仓库2、在 git bash 中初始化Git仓库&#xff0c;添加文件&#xff0c;上传代码 1、创建github仓库 2、在 git bash 中初始化Git仓库&#xff0c;添加文件&#xff0c;上传代码

【C++】STL----list常见用法

&#x1f525;个人主页&#x1f525;&#xff1a;孤寂大仙V &#x1f308;收录专栏&#x1f308;&#xff1a;C从小白到高手 &#x1f339;往期回顾&#x1f339;&#xff1a;[C]vector常见用法 &#x1f516; 流水不争&#xff0c;争的是滔滔不息。 文章目录 一、list的介绍li…

【C++】list容器的基本使用

一、list是什么 list的底层结构是带头双向循环链表。 相较于 vector 的连续线性空间&#xff0c;list 就显得复杂很多&#xff0c;它是由一个个结点构成&#xff0c;每个结点申请的空间并不是连续的&#xff0c;它的好处是每次插入或删除一个数据&#xff0c;就配置或释放一个…

WebServer:log

超时锁的编写 这个问题处于blockqueue.h文件中&#xff0c;内容如下&#xff1a; template<class T> bool BlockDeque<T>::pop(T& item, int timeout) {std::unique_lock<std::mutex> locker(mtx_);while(deq_.empty()) {if(condConsumer_.wait_for(lo…

内存泄漏

文章目录 内存泄漏发现问题topVisualVMArthas 原因分析代码层面并发请求 诊断问题MAT原理 –支配树获取运行时快照 内存泄漏 内存泄漏&#xff08;memory leak&#xff09;&#xff1a;在Java中如果不再使用一个对象&#xff0c;但是该对象依然在GC ROOT的引用链上&#xff0c;…

12.第二阶段x86游戏实战2-CE找基地址

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 本次游戏没法给 内容参考于&#xff1a;微尘网络安全 本人写的内容纯属胡编乱造&#xff0c;全都是合成造假&#xff0c;仅仅只是为了娱乐&#xff0c;请不要…

全网最全:企业微信用户授权登录对接完整流程

Hello&#xff01;欢迎各位新老朋友来看小弟博客&#xff0c;祝大家事业顺利&#xff0c;财源广进&#xff01;&#xff01; 主题&#xff1a;企业微信用户授权与校验完整对接流程 一&#xff1a;构造第三方应用授权链接 如果第三方应用需要在打开的网页里面携带用户的身份信息…

吸尘器制造5G智能工厂物联数字孪生平台,推进制造业数字化转型

吸尘器制造行业&#xff0c;作为传统制造业的重要组成部分&#xff0c;也在积极探索如何通过先进技术实现生产模式的创新升级。5G智能工厂与物联数字孪生平台的融合应用&#xff0c;为吸尘器制造业的数字化转型铺设了一条高速通道&#xff0c;不仅极大地提升生产效率&#xff0…

华为---代理ARP简介及示例配置

目录 1. 概念 2. 前提条件 3. 使用环境 4. 工作过程 5. 优点 6. 缺点 7. 示例配置 7.1 示例场景 7.2基本配置 7.3 配置端口隔离 7.4 开启代理ARP 7.4.1 VLAN内代理ARP 7.4.2 VLAN间代理ARP 7.4.3路由式ARP代理 1. 概念 代理ARP&#xff08;Proxy ARP&#xff09;&…

GAMES202 作业1

参考&#xff1a;games202作业1 SM 首先是利用shadow map去生成尝试生成硬阴影。根据作业的要求 我们完成光源对物体的mvp矩阵 CalcLightMVP(translate, scale) {let lightMVP mat4.create();let modelMatrix mat4.create();let viewMatrix mat4.create();let projection…

Bigemap GIS Office 2024注册机 全能版地图下载软件

对于需要利用GIS信息进行编辑、设计的用户来说&#xff0c;Bigemap GIS Office占有重要地位。用户可以使用Bigemap GIS Office作为工具进行设计、分析、共享、管理和发布地理信息。Bigemap GIS Office能实现多种数据流转、嵌入、融合以及更多地为用户提供数据的增强处理及多种分…

文心一言 VS 讯飞星火 VS chatgpt (351)-- 算法导论24.1 2题

二、证明推论24.3。推论 24.3 的内容是设 G ( V , E ) G(V,E) G(V,E)是一带权重的源结点为 s s s的有向图&#xff0c;其权重函数为 ω : E → R ω:\boldsymbol{E→R} ω:E→R。假定图 G G G不包含从源结点 s s s可以到达的权重为负值的环路&#xff0c;则对于所有结点 v ∈ …

完美转发、C++11中与线程相关的std::ref

目录 模板中的万能引用 std::forward实现完美转发 C11中与线程相关的std::ref 线程函数参数 用函数指针作为线程函数 用lambda表达式作为线程函数 模板中的万能引用 void Func(int& x) {cout << "左值引用" << endl; } void Func(int&&am…