kafka 集群 KRaft 模式搭建

news2025/1/23 4:12:28

Apache Kafka是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序

Kafka 官网:https://kafka.apache.org/

Kafka 在2.8版本之后,移除了对Zookeeper的依赖,将依赖于ZooKeeper的控制器改造成了基于Kafka Raft的Quorm控制器,因此可以在不使用ZooKeeper的情况下实现集群

本文讲解 Kafka KRaft 模式集群搭建

笔者使用3台服务器,它们的 ip 分别是 192.168.3.232、192.168.2.90、192.168.2.11

目录

1、官网下载 Kafka

2、配置 Kafka

3、创建 KRaft 集群

4、启动 Kafka KRaft 集群

5、关闭 Kafka KRaft 集群

6、测试 KRaft 集群


1、官网下载 Kafka

这里笔者下载最新版3.6.0

下载完成

将kafka分别上传到3台linux

在3台服务器上分别创建 kafka 安装目录

mkdir /usr/local/kafka

在3台服务器上分别将 kafka 安装包解压到新创建的 kafka 目录

tar -xzf kafka_2.13-3.6.0.tgz -C /usr/local/kafka

2、配置 Kafka

进入配置目录

cd /usr/local/kafka/kafka_2.13-3.6.0/config/kraft

编辑配置文件

vi server.properties

server.properties 配置说明

node.id 是kafka的broker节点id

controller.quorum.voters 配置的是 kafka 集群中的其他节点,kafka Controller的投票者配置,定义了一组Controller节点,其中包括它们各自的 id 和网络地址

advertised.listeners 是节点自己的监听地址

192.168.3.232 节点配置

node.id = 1

192.168.2.90 节点配置

node.id = 2

192.168.2.11节点配置

node.id = 3

3、创建 KRaft 集群

生成集群id

在任意一个节点上执行就行,笔者使用 192.168.3.232 节点

进入bin 目录

cd /usr/local/kafka/kafka_2.13-3.6.0/bin

执行生成集群 id 命令

./kafka-storage.sh random-uuid

生成后保存生成的字符串    82vqfbdSTO2QzS_M0Su1Bw

然后分别在3台机器上执行下面命令

为方便执行命令,先回到 kafka安装目录

cd /usr/local/kafka/kafka_2.13-3.6.0

再执行命令,完成集群元数据配置

bin/kafka-storage.sh format -t 82vqfbdSTO2QzS_M0Su1Bw -c config/kraft/server.properties

192.168.3.232 节点

192.168.2.90 节点

192.168.2.11节点

上面命令执行完成后,开放防火墙端口

kafka 需要开放 9092 端口和 9093 端口

3台机器上分别开放 9092 和 9093 端口

查看开放端口

firewall-cmd --zone=public --list-ports

 开放9092 端口

firewall-cmd --zone=public --add-port=9092/tcp --permanent

  开放9093 端口

firewall-cmd --zone=public --add-port=9093/tcp --permanent

更新防火墙规则(无需断开连接,动态添加规则)

firewall-cmd --reload

4、启动 Kafka KRaft 集群

在3台机器上分别启动

下面2个命令均可启动

bin/kafka-server-start.sh -daemon config/kraft/server.properties

bin/kafka-server-start.sh config/kraft/server.properties

笔者使用第二个启动命令 启动,效果看下图

当 3 个节点都出现 Kafka Server started,集群启动成功

5、关闭 Kafka KRaft 集群

关闭命令

bin/kafka-server-stop.sh

在 3 个节点上分别执行关闭命令

6、测试 KRaft 集群

新建 maven 项目,添加 Kafka 依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>

笔者新建 maven项目 kafka-learn

kafka-learn 项目 pom 文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wsjzzcbq</groupId>
    <artifactId>kafka-learn</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.6.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>11</source>
                    <target>11</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

新建生产者 ProducerDemo

package com.wsjzzcbq;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Demo
 *
 * @author wsjz
 * @date 2023/11/24
 */
public class ProducerDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        //配置集群节点信息
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");
        //配置序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<>(properties);

        //topic 名称是demo_topic
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>("demo_topic", "明月别枝惊鹊");
        RecordMetadata recordMetadata = producer.send(producerRecord).get();
        System.out.println(recordMetadata.topic());
        System.out.println(recordMetadata.partition());
        System.out.println(recordMetadata.offset());

    }
}

新建消费者 ConsumerDemo

package com.wsjzzcbq;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * ConsumerDemo
 *
 * @author wsjz
 * @date 2023/11/24
 */
public class ConsumerDemo {

    public static void main(String[] args) {
        Properties properties = new Properties();
        // 配置集群节点信息
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.3.232:9092,192.168.2.90:9092,192.168.2.11:9092");

        // 消费分组名
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo_group");
        // 序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
        // 消费者订阅主题
        consumer.subscribe(Arrays.asList("demo_topic"));

        while (true) {
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String,String> record:records) {
                System.out.printf("收到消息:partition=%d, offset=%d, key=%s, value=%s%n",record.partition(),
                        record.offset(),record.key(),record.value());
            }
        }
    }
}

运行测试

效果图

消息成功发送并成功消费

至此完

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1251712.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL 库操作 | 表操作

文章目录 一.MySQL库的操作1.创建数据库2.字符集和校验规则3.操纵数据库 二.MySQL表的操作1.创建表2.操作表3.删除表 一.MySQL库的操作 1.创建数据库 创建数据库 创建数据库的SQL如下&#xff1a; CREATE DATABASE [IF NOT EXISTS] db_name [[DEFAULT] CHARSETcharset_name…

AT89S52单片机智能寻迹小车自动红外避障趋光检测发声发光设计

wx供重浩&#xff1a;创享日记 对话框发送&#xff1a;寻迹 获取完整说明报告源程序数据 小车具有以下几个功能&#xff1a;自动避障功能&#xff1b;寻迹功能&#xff08;按路面的黑色轨道行驶&#xff09;&#xff1b;趋光功能&#xff08;寻找前方的点光源并行驶到位&…

数据查询,让表单之间“联动”起来!丨三叠云

数据查询 路径 表单设计 >> 字段属性 功能简介 「数据查询」增加触发「数据联动」功能。本次对「数据查询」字段的功能进行优化&#xff0c;这次升级包含「编辑关联数据」、「导入数据」「拷贝数据」&#xff0c;以提高数据操作时的便利。 适用场景&#xff1a; 销…

.NET6 开发一个检查某些状态持续多长时间的类

📢欢迎点赞 :👍 收藏 ⭐留言 📝 如有错误敬请指正,赐人玫瑰,手留余香!📢本文作者:由webmote 原创📢作者格言:新的征程,我们面对的不仅仅是技术还有人心,人心不可测,海水不可量,唯有技术,才是深沉黑夜中的一座闪烁的灯塔 !序言 在代码的世界里,时常碰撞…

操作系统:操作系统教程第六版(骆斌、葛季栋、费翔林)习题二处理器管理

目录 前言1. 简答题2. 应用题 前言 本系列文章是针对操作系统教程第六版&#xff08;骆斌、葛季栋、费翔林&#xff09;的习题解答&#xff0c;其中简答题部分为博主自己搜索整理的&#xff0c;错漏之处在所难免。应用题部分有答案为依据。 1. 简答题 &#xff08;1&#xf…

1.前端--基本概念【2023.11.25】

1.网站与网页 网站是网页集合。 网页是网站中的一“页”&#xff0c;通常是 HTML 格式的文件&#xff0c;它要通过浏览器来阅读。 2.Web的构成 主要包括结构&#xff08;Structure&#xff09; 、表现&#xff08;Presentation&#xff09;和行为&#xff08;Behavior&#xff…

如何学习VBA:3.2.8 OnTime方法与OnKey方法

我给VBA的定义&#xff1a;VBA是个人小型自动化处理的有效工具。利用好了&#xff0c;可以大大提高自己的劳动效率&#xff0c;而且可以提高数据处理的准确度。我推出的VBA系列教程共九套和一部VBA汉英手册&#xff0c;现在已经全部完成&#xff0c;希望大家利用、学习。 如果…

[架构之路-250]:目标系统 - 设计方法 - 软件工程 - 需求工程 - 需求开发:如何用图形表达需求,面向对象需求分析OOA与UML视图

目录 一、面向对象需求分析 1.1 面向对象的基本概念 1.2 什么是面向对象的需求分析 2.3 什么是UML图 2.4 UML视图 2.4 UML图与UML视图的关系 2.5 UML图与面向对象需求分析的关系 二、需求分析相关的UML图形与视图&#xff1a;14视图 2.1 用例模型与用例图&#xff1a;…

2016年11月10日 Go生态洞察:七年的Go语言旅程

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

解决No module named ‘ultralytics‘

win10Python3.7环境运行yolov5的程序时&#xff0c;程序语句from ultralytics.utils.plotting import Annotator, colors, save_one_box报错。 报错如下图&#xff1a; Exception has occurred: ModuleNotFoundError No module named ultralytics 解决方法&#xff1a; 在c…

Guitar Pro软件8.0官方最新版本下载

Guitar Pro 8是一款由法国Arobas Music公司开发的吉他学习与MIDI音序制作辅助软件&#xff0c;它具有丰富的功能&#xff0c;包括吉他谱、六线谱、四线谱绘制、打印、查看、试听等方面&#xff0c;能够帮助音乐爱好者更方便地进行音乐学习和创作。Guitar Pro 8拥有独特的gtp格式…

JVM的小知识总结

加载时jvm做了这三件事&#xff1a; 1&#xff09;通过一个类的全限定名来获取该类的二进制字节流 什么是全限定类名&#xff1f; 就是类名全称&#xff0c;带包路径的用点隔开&#xff0c;例如: java.lang.String。 即全限定名 包名类型 非限定类名也叫短名&#xff0c;就…

C语言你爱我么?(ZZULIOJ 1205:你爱我么?)

题目描述 LCY买个n束花准备送给她暗恋的女生&#xff0c;但是他不知道这个女生是否喜欢他。这时候一个算命先生告诉他让他查花瓣数&#xff0c;第一个花瓣表示"爱"&#xff0c;第二个花瓣表示"不爱"&#xff0c;第三个花瓣表示"爱"..... 为了使最…

Unity UGUI的自动布局-LayoutGroup(水平布局)组件

Horizontal Layout Group | Unity UI | 1.0.0 1. 什么是HorizontalLayoutGroup组件&#xff1f; HorizontalLayoutGroup是Unity UGUI中的一种布局组件&#xff0c;用于在水平方向上对子物体进行排列和布局。它可以根据一定的规则自动调整子物体的位置和大小&#xff0c;使它们…

间接法加窗分析信号的功率谱

本篇文章是博主在通信等领域学习时&#xff0c;用于个人学习、研究或者欣赏使用&#xff0c;并基于博主对通信等领域的一些理解而记录的学习摘录和笔记&#xff0c;若有不当和侵权之处&#xff0c;指出后将会立即改正&#xff0c;还望谅解。文章分类在 通信领域笔记&#xff…

【图解系列】一张图带你了解 DevOps 生态工具

一张图带你了解 DevOps 生态工具 ✅ 协作&#xff08;Collaborate&#xff09;&#xff1a;JIRA、Confluence 大家肯定不陌生了&#xff0c;我之前也写过利用 Jekyll 搭建个人博客的帖子。✅ 构建&#xff08;Build&#xff09;&#xff1a;常用的 SCM&#xff08;Software Con…

每日一题--删除链表的倒数第 N 个结点

破阵子-晏殊 燕子欲归时节&#xff0c;高楼昨夜西风。 求得人间成小会&#xff0c;试把金尊傍菊丛。歌长粉面红。 斜日更穿帘幕&#xff0c;微凉渐入梧桐。 多少襟情言不尽&#xff0c;写向蛮笺曲调中。此情千万重。 目录 题目描述&#xff1a; 思路分析&#xff1a; 方法及…

01 _ 高并发系统:它的通用设计方法是什么?

我们知道&#xff0c;高并发代表着大流量&#xff0c;高并发系统设计的魅力就在于我们能够凭借自己的聪明才智设计巧妙的方案&#xff0c;从而抵抗巨大流量的冲击&#xff0c;带给用户更好的使用体验。这些方案好似能操纵流量&#xff0c;让流量更加平稳地被系统中的服务和组件…

全面(16万字)深入探索深度学习:基础原理到经典模型网络的全面解析

前言 Stacking(堆叠) 网页调试 学习率&#xff1a;它决定了模型在每一次迭代中更新参数的幅度激活函数-更加详细 激活函数的意义: 激活函数主要是让模型具有非线性数据拟合的能力&#xff0c;也就是能够对非线性数据进行分割/建模 如果没有激活函数&#xff1a; 第一个隐层: l…

php订单发起退款(余额和微信支付)

index.html <a class="btn btn-danger btn-change btn-tuikuan btn-disabled" href="javascript:;"><i class="fa fa-tuikuan"></i> 订单退款</a>-->order.js // 为表格绑定事件Table.api.bindevent(table);//退款…