Linux Kafka 3.5 KRaft模式集群部署

news2025/1/16 15:51:44

这里是weihubeats,觉得文章不错可以关注公众号小奏技术,文章首发。拒绝营销号,拒绝标题党

背景

kafkaKIP-500引入了KRaft替代Zookeeper来实现自我管理元数据

详细信息可以看原文链接

  • KIP-500

KRaft简介

KRaft是kafka用来取代zookeeper的分布式协调管理组件。

架构改变

原先依赖于Zookeeper选举出一个controller
现在由KRaft集群中自己选举,产生一个controller

优点

  • Kafka不用再依赖外部框架,能够做到独立运行
  • Kafka集群扩展时不用再受到Zookeeper读写能力的限制

更多优点和缺点这里暂时不太多讨论主要以部署为主

部署3节点kafaka集群

KRaft部署方式支持controllerbroker在同一进程。也支持分开部署
线上推荐分开部署。这里由于是测试集群,打算controllerbroker在同一进程部署

记得所有机器90929093端口打开

下载Kafka

wget https://dlcdn.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz

这里第一次下载报错说证书已过期,添加证书忽略下载

wget --no-check-certificate https://dlcdn.apache.org/kafka/3.5.0/kafka_2.13-3.5.0.tgz

发现国内服务器下载国外软件还是非常慢。最终决定找国内镜像。

  • 阿里云Kafka镜像:http://mirrors.aliyun.com/apache/kafka/3.5.0/?spm=a2c6h.25603864.0.0.3c7d126emg02YS

使用国内镜像下载

wget http://mirrors.aliyun.com/apache/kafka/3.5.0/kafka_2.13-3.5.0.tgz

三台机器都执行

解压

tar -xzf kafka_2.13-3.5.0.tgz

三台机器都执行

给集群生成一个UUID

我们进入到解压的bin目录,我这里是/data/kafka_2.13-3.5.0/bin
然后执行如下命令

kafka_2.13-3.0.0/bin/kafka-storage.sh random-uuid

单台机器生成即可

执行完会生产一个字符串,类似这样xgK3spReSO7ijVK4rEbbbQ

格式化存储路径

sh kafka-storage.sh format -t xgK3spReSO7ijVK4rEbbbQ  -c ../config/kraft/server.properties

三台机器都执行

修改配置

我们这里要修改三台机器的server.properties配置
我这里的路径是在/data/kafka_2.13-3.5.0/config/kraft/server.properties

  • node1
node.id = 1
controller.quorum.voters = 1@192.168.1.1:9093,2@92.168.1.2:9093,3@92.168.1.3:9093
process.roles = broker,controller
listeners=PLAINTEXT://192.168.1.1:9092,CONTROLLER://92.168.1.1:9093
log.dirs=/data/kakfa01/logs
  • node2
node.id = 2
controller.quorum.voters = 1@192.168.1.1:9093,2@92.168.1.2:9093,3@92.168.1.3:9093
process.roles = broker,controller
listeners=PLAINTEXT://192.168.1.2:9092,CONTROLLER://92.168.1.2:9093
log.dirs=/data/kakfa02/logs
  • node3
node.id = 3
controller.quorum.voters = 1@192.168.1.1:9093,2@92.168.1.2:9093,3@92.168.1.3:9093
process.roles = broker,controller
listeners=PLAINTEXT://192.168.1.3:9092,CONTROLLER://92.168.1.3:9093
log.dirs=/data/kakfa03/logs

启动集群

export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G"&&nohup sh /data/kafka_2.13-3.5.0/bin/kafka-server-start.sh /data/kafka_2.13-3.5.0/config/kraft/server.properties &

三台机器都执行

启动完我们就有了一个三节点的kafka集群

测试

创建topic

sh kafka-topics.sh --create --topic xiaozou --partitions 1 --replication-factor 1 --bootstrap-server 192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092

查看topic

sh kafka-topics.sh --list --bootstrap-server 192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092

代码测试

  • 生产消息
public class KafkaProducer {

    private static final String TOPIC = "xiaozou";
    private static final String BOOTSTRAP_SERVERS = "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092";

    public static void main(String[] args) {
        // 生产消息
        produceMessage();
    }

    private static void produceMessage() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        Producer<String, String> producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);

        try {
            for (int i = 0; i < 10; i++) {
                String message = "小奏message " + i;
                System.out.println("开始发送消息");
                Future<RecordMetadata> send = producer.send(new ProducerRecord<>(TOPIC, message));
                RecordMetadata recordMetadata = send.get();
                System.out.println("Produced message: " + message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }

}

  • 消费消息
public class KafkaConsumerExample {

    private static final String TOPIC_NAME = "xiaozou";

    private static final String GROUP_ID = "xiaozou_gid";

    private static final String BOOTSTRAP_SERVERS = "192.168.1.1:9092,192.168.1.2:9092,192.168.1.3:9092";


    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);



        // 创建消费者实例
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // 订阅主题
        consumer.subscribe(Collections.singletonList(TOPIC_NAME));

        // 消费消息
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("接收到消息:key = " + record.key() + ", value = " + record.value() +
                        ", partition = " + record.partition() + ", offset = " + record.offset());
                }
                consumer.commitSync(); // 手动提交偏移量
            }
        } finally {
            consumer.close();
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/983429.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

号外号外,桓峰基因单细胞生信分析免费培训课程即将开始,快来报名吧!

单细胞生信分析教程 桓峰基因公众号推出单细胞生信分析教程并配有视频在线教程&#xff0c;目前整理出来的相关教程目录如下&#xff1a; Topic 6. 克隆进化之 Canopy Topic 7. 克隆进化之 Cardelino Topic 8. 克隆进化之 RobustClone SCS【1】今天开启单细胞之旅&#xff0c;述…

FPGA实战小项目3

基于FPGA的波形发生器 基于FPGA的波形发生器 基于FPGA的beep音乐播放器设计 基于FPGA的beep音乐播放器设计 基于FPGA的cordic算法实现DDS sin和cosine波形的产生 基于FPGA的cordic算法实现DDS sin和cosine波形的产生

PY32F003F18端口复用功能映射

PY32F003F18端口复用功能映射&#xff0c;GPIO引脚可配置为"输入&#xff0c;输出,模拟或复用功能。 一、端口A复用功能映射 端口A复用功能映射表里&#xff0c;每个引脚都有AF0~AF15&#xff0c;修改AF0~AF15的值&#xff0c;就可以将对应复用用能引脚映射到CPU引脚上。…

【pthreads】支持vs2022构建

新增了一个vc16的目录 https://github.com/BrianGladman/pthreads/commit/60179353ef753ca171dee5199ec5fa54580835b0 官方已经支持直接支持v143 下载最新版,直接打开build.vs目录的sln vs 会提示安装python调试环境 安装过程自己失败取消了 再次打开就没弹出了。 直接构建静态…

2023全国大学生数学建模A题B题C题D题E题竞赛选题建议,思路模型

目录 国赛数学建模思路模型代码&#xff1a;9.7开赛后第一时间更新&#xff0c;完整思路获取见文末名片 一、题目选择 二、国赛摘要及论文写作技巧 1、国赛摘要 2、论文写作技巧 三、历年国赛真题及对应算法模型 完整国赛题思路模型获取见此 国赛数学建模思路模型代码&am…

企业架构LNMP学习笔记20

Nginx Location匹配规则&#xff1a; URI&#xff1a;统一资源标识符。 URN&#xff1a;统一资源名称。 URL&#xff1a;统一资源定位符。URL是更细化一点。 1&#xff09;精确匹配&#xff1a; location / {#规则 } 则匹配到 Example Domain 这种请求。 2&#xff09;~ 大…

RocketMQ consumer 和 queue 对应关系

参考 Consumer and Consumer Group Load Balancing https://rocketmq.apache.org/docs/4.x/consumer/01concept2 旧版本MQ结论 消费者应用和topic队列一对多的关系。 &#xff08;一个消费组consumer group里&#xff0c;一个消费者应用可以消费多个队列的消息。一个队列的消…

OpenMLDB 基于 Kubernetes 的部署全攻略

简介 Kubernetes 作为当前工业界流行的云原生容器编排和管理工具&#xff0c;在大量项目实践中被使用。目前&#xff0c;OpenMLDB 的离线引擎和在线引擎&#xff0c;均已经完整支持了基于 Kubernetes 的部署&#xff0c;可以实现更为方便的管理功能。本文将分别介绍离线和在线…

超级好用的10个思维导图模板

思维导图是一种非常有用的工具&#xff0c;可以被广泛应用于不同领域的人群。学生可以用思维导图来整理知识&#xff0c;老师可以用思维导图规划教学内容&#xff0c;设计课堂活动&#xff0c;还可以帮助学生梳理知识结构。各行各业的人都可以运用思维导图处理自己工作中的问题…

openWRT SFTP 远程文件传输

文章目录 前言 1. openssh-sftp-server 安装2. 安装cpolar工具3.配置SFTP远程访问4.固定远程连接地址 前言 本次教程我们将在OpenWRT上安装SFTP服务&#xff0c;并结合cpolar内网穿透&#xff0c;创建安全隧道映射22端口&#xff0c;实现在公网环境下远程OpenWRT SFTP&#xf…

如何阻止事件冒泡(event bubbling)?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ 原生 JavaScript⭐ jQuery⭐ React⭐Vue.js⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#xff1a;探索Web开发的奇妙世界 记得点击上方或者右侧链接订阅本专栏哦 几何带你启航前端之旅 欢迎来到前端入门之旅&#xff01;这个专栏是为那些对…

龙蜥白皮书精选:龙蜥安全漏洞管理体系介绍

文/安全委员会 近日&#xff0c;龙蜥社区联合启明星辰、绿盟、360、阿里云、统信软件、浪潮信息、中兴通讯&#xff5c;中兴新支点、Intel、中科院软件所等 23 家单位正式成立了龙蜥社区安全联盟&#xff08;OASA&#xff0c;OpenAnolisSecurityAlliance&#xff09;。龙蜥社区…

解决charles只能使用30分钟

问题描述 Charles 30分钟会自动关闭&#xff0c;弹出一个弹窗。 解决步骤 1.网上查找后发现是需要注册一下。 2.打开Charles&#xff0c;如图的操作顺序 3.框内输入 Registered Name: https://zhile.io License Key: 48891cf209c6d32bf4 4.重启即可

momentjs实现DatePicker时间禁用

momentjs是一个处理时间的js库&#xff0c;简洁易用。 浅析一下&#xff0c; momentjs 在vue中对DatePicker时间组件的禁用实践。 一&#xff0c;npm下载 npm install moment --save二&#xff0c;particles.json中 "dependencies": {"axios": "^…

【C++技能树】多态解析

Halo&#xff0c;这里是Ppeua。平时主要更新C&#xff0c;数据结构算法&#xff0c;Linux与ROS…感兴趣就关注我bua&#xff01; 文章目录 0.多态的概念0.1 多态的定义 1. 重写2.Final与Override3.抽象类4.多态中的内存分布.4.1虚表存在哪里? 5.多态调用原理5.1 动态绑定与静…

解锁前端Vue3宝藏级资料 第一章 Vue3项目创建 3 (Vite 创建 vue项目 )

目前&#xff0c;Vue.js 官网建议在创建新项目的时候要使用 Vite 而不是 Vue CLI&#xff0c;尽量在开发环境中以 Vite 它作为 Vue.js 的编译基础来使用。Vite 是 Vue.js 作者Evan You 制作的 webpack 的无捆绑替代品&#xff0c;Vite vue 方式很可能会成为未来的vue项目主流方…

Spring学习|Spring配置:别名、import、依赖注入:构造器注入、Set方式注入(重点)、拓展方式注入

Spring配置 别名 我们可以在bean.xml中用alias标签给bean对象起一个别名&#xff0c;当我们在客户端通过context对象使用getBean方法获取对象时&#xff0c;可以通过这个别名获取&#xff0c;另一种方式是&#xff0c;可以在<bean标签后面加一个name&#xff0c;这个name后…

js 根据键判断值

最原始的写法&#xff1a; 改进后的写法&#xff1a; const DeviceTypeObj {SO2: "SO<sub>2</sub>",CO: "CO",NO: "NO",NO2: "NO<sub>2</sub>",O3: "O<sub>3</sub>", let value Dev…

亿发软件:智慧门店商超系统,2023新零售POS数字运营一体化管理

2023年9月6日&#xff0c;山东济宁一家超市因为酸奶价格标签错误而引发了广泛关注。标签原本显示几十个人为9.9元&#xff0c;但特价销售价却标为10元。这一小小的错误却在社交媒体上引发了轩然大波&#xff0c;让超市一度处于舆论的风口浪尖。超市工作人员回应&#xff0c;表示…

Python基础语法:数据分析利器

⭐️⭐️⭐️⭐️⭐️欢迎来到我的博客⭐️⭐️⭐️⭐️⭐️ &#x1f434;作者&#xff1a;秋无之地 &#x1f434;简介&#xff1a;CSDN爬虫、后端、大数据领域创作者。目前从事python爬虫、后端和大数据等相关工作&#xff0c;主要擅长领域有&#xff1a;爬虫、后端、大数据…