【ZooKeeper to KRaft Migration】kafka 3.4版本zookeeper数据迁移到kraft

news2025/1/13 3:10:10

引言

kafka在3.X版本后内置了kraft用来替代zookeeper管理集群,但是在升级的过程中发现,许多升级的文档都是只有新部署安装kraft版本,而没有涉及到数据迁移相关的资料,这样如果直接变更的话,会导致kakfa中的数据全部丢失,这在客户的生产环境中是不可接受的,所以一直在寻求数据迁移的方案,经过查阅大量资料后,基于官方的方案(略了太多步骤),终于折腾出了迁移的方案。

准备工作

  1. 数据迁移前必须先将kafka升级到3.4以上版本,即 kakfa 3.4 + zookeeper集群的方式。(升级较为简单,参照kafka升级)
  2. 目前的迁移方式仅支持controller和broker分开部署,也即如果原来是3台kafka机器,迁移后可能会变成3台controller和3台broker。(建议controller使用3台新机器,broker可以使用原有的这个已升级到3.4版本的kafka)
  3. 数据迁移前建议记录一下目前的数据和消费组的offset位置,用于迁移后的验证,看是否迁移成功
  4. 机器说明
192.168.0.5
192.168.0.6
192.168.0.7
1. 上面三台为kafka 3.4 + zookeeper集群机器,后续说的“旧的kafka”均指这3台。
2. 迁移后的broker也同样在这三台上面,只是旧的kafka启动时指定的配置文件是config/server.properties,新的broker启动时指定的配置文件是config/kraft/broker.properties

192.168.0.110
192.168.0.111
192.168.0.112
这三台为新的controller集群机器,后续说的"新的kafka"均指这3台。

迁移步骤

1. 获取旧有的kafka的cluster id

可以在旧的kafka的bin目录下执行下述语句查看

./bin/zookeeper-shell.sh localhost:2181 get /cluster/id

或者也可以在config/server.properties中配置的log.dirs目录中查看meta.properties文件。
在这里插入图片描述

2. 打开新的kafka机器controller的trace日志,用于后续观察迁移是否完成
cd /opt/kafka/config   (改为自己的kafka目录)
vim log4j.properties

添加如下配置保存后退出,注意将三台controller机器都添加上。

log4j.logger.org.apache.kafka.metadata.migration=TRACE

在这里插入图片描述

3. 编辑新的kafka的controller.properties文件

目录为 kafka/config/kraft/controller.properties
主要配置说明:

node.id   必须设置为与之前旧的kafka集群中的broker.id不一样(旧的broker.id可以在旧机器的config/server.properties查看)
controller.quorum.voters  需要配置所有的controller节点,格式为nodeid@ip:端口。如目前我所配置的node.id分别为3000 4000 5000
listeners  修改为自己想要监听的端口,默认为9093
log.dirs    controller数据存放的目录
zookeeper.metadata.migration.enable  数据迁移的开关
zookeeper.connect   设置zookeeper的集群地址

例子:

process.roles=controller
node.id=3000
controller.quorum.voters=3000@192.168.0.110:9093,4000@192.168.0.111:9093,5000@192.168.0.112:9093
listeners=CONTROLLER://:9093
controller.listener.names=CONTROLLER
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/krfdata
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.metadata.migration.enable =true
zookeeper.connect =192.168.0.5:2181,192.168.0.6:2181,192.168.0.7:2181

注意:
三台controller机器的node.id必须不一样,比如(3000、4000、5000)等,将三台controller机器全部修改完成后保存退出

4. 格式化controller集群的id

进入到controller机器的kafka bin目录下面,执行如下语句格式化

./bin/kafka-storage.sh format -t 第一步获取的cluster.id -c ./config/kraft/controller.properties

格式化完成后,会在controller.propertie中配置的log.dirs目录下,生成meta.properties文件
在这里插入图片描述

5. 三台全部执行完成后,依次启动三台机器的controller,将controller集群启动

附一个启动命令,注意必须指定使用controller.properties文件启动

nohup ./bin/kafka-server-start.sh ./config/kraft/controller.properties &

启动完成后记得看一下日志和kafka进程,确保启动成功

6. 修改旧的kafka集群的server.properties配置,打开迁移模式

在配置文件中添加如下配置:
注意controller.quorum.voters修改为controller集群的ID和IP地址,ID和IP地址必须与controller集群的匹配上

inter.broker.protocol.version=3.4
log.message.format.version=3.4
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
zookeeper.metadata.migration.enable=true
controller.quorum.voters=3000@192.168.0.110:9093,4000@192.168.0.111:9093,5000@192.168.0.112:9093
controller.listener.names=CONTROLLER
7. 将所有的三台旧的kafka的server.properties文件修改完成后,保存退出,重启三台旧的kakfa
8. 重启三台旧的kafka后,就会自动开始数据迁移,在controller机器的server.log日志中,会出现TRACE级别的日志,数据迁移完成后会打印如下语句。
Completed migration of metadata from Zookeeper to KRaft

可以使用grep "Completed migration" server.log过滤日志查看
在这里插入图片描述

注意: 有多台controller机器的集群时,该日志可能只在某一台的controller机器的server.log中出现,记得都检查一下

9. 数据迁移完成后,修改旧的kafka的config/kraft目录下的broker.properties文件,可以直接复制下面的内容,修改对应的参数即可

配置说明:

node.id      必须设置为与旧的kafka的broker.id一致,如果旧的broker.id分别为1、2、3,新的也必须分别为1、2、3
controller.quorum.voters    设置为controller集群的ID和IP地址
log.dirs 必须与旧的kafka的log.dirs保持一致
advertised.listeners 修改为本机的IP

补充比较容易混淆的点,新的controller的node.id必须与旧的kafka的broker.id不一样,新的broker的node.id必须与旧的kafka的broker.id一致

process.roles=broker
node.id=1
controller.quorum.voters=3000@192.168.0.110:9093,4000@192.168.0.111:9093,5000@192.168.0.112:9093
listeners=PLAINTEXT://:9092
inter.broker.listener.name=PLAINTEXT
advertised.listeners=PLAINTEXT://192.168.0.5:9092
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/kfdata
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
10. 修改完成后停掉旧的kafka和zookeeper,然后使用新的配置启动kafka,将所有的kafka启动后,就是开始使用kraft来管理集群。

附上一个启动命令

nohup ./bin/kafka-server-start.sh ./config/kraft/broker.properties &
11. 修改controller集群的配置,把迁移的开关关掉,退出迁移模式

注释掉最后两行

process.roles=controller
node.id=3000
controller.quorum.voters=3000@192.168.0.110:9093,4000@192.168.0.111:9093,5000@192.168.0.112:9093
listeners=CONTROLLER://:9093
controller.listener.names=CONTROLLER
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/kafka/krfdata
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
#zookeeper.metadata.migration.enable =true
#zookeeper.connect =192.168.0.5:2181,192.168.0.6:2181,192.168.0.7:2181
12. 注释后依次重启controller机器,整体的迁移流程全部完成
13. 迁移完成后可以在broker机器(即旧的kafka机器)的bin目录下执行语句,验证数据以及consumer的offset位置是否还存在。

附上一条语句

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group 消费者组名称 -describe

最后附上官方的方案:

kraft_zk_migration

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/460435.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

升级gpt4-GPT最新版本怎么下载使用

怎么 让gpt-3的模型升级gpt4 GPT-4是OpenAI的未来版本,目前还未发布。因此,我们无法准确指导如何将GPT-3升级到GPT-4。要升级GPT-3,需要进行大量的研究和开发工作。如果OpenAI发行了GPT-4的预览版,那么可能需要花费大量的时间和资…

Redission实现分布式锁lock()和tryLock()方法的区别

lock.lock(30, TimeUnit.SECONDS); // 尝试获取锁30秒,如果获取不到则放弃 //尝试获取锁,等待5秒,持有锁10秒钟 boolean success lock.tryLock(0, 10, TimeUnit.SECONDS); Redisson 是一种基于 Redis 的分布式锁框架,提供了 lo…

国产蓝牙芯片OM6621P/HS6621系列门锁方案

在5G、物联网以及互联网家装市场的快速发展等多重因素的作用下,中国智能家居市场展现蓬勃发展态势。作为智能家居“入口”产品以及家庭智能安防产品的核心单品,智能门锁以其区别于传统机械锁更具安全性、便利性、可扩展性的优势,逐渐成为智能…

Java+GeoTools(开源的Java GIS工具包)快速入门-实现读取shp文件并显示

场景 GeoTools GeoTools 是一个开源的 Java GIS 工具包,可利用它来开发符合标准的地理信息系统。 GeoTools 提供了 OGC (Open Geospatial Consortium) 规范的一个实现来作为他们的开发。 官网地址: GeoTools The Open Source Java GIS Toolkit — GeoTools 参考…

Linux账号密码安全策略设置

前言 随着云计算厂商的兴起,云资源如ECS不再只有企业或者公司才会使用,普通人也可以自己买一台ECS来搭建自己的应用或者网站。虽然云计算厂商帮我们做了很多安全相关的工作,但并不代表我们的机器资源就绝对是安全的。 要知道有很多事情是云…

群策群力:组织效率,管理?沟通?协作?

你好,我是苏杰。今天让我们一起聊聊组织效率的话题。 团队大了,也能够自我造血以后,如何可持续发展就会成为我们关注的焦点。产品会衰退、行业有生命周期,但人的成长,以及人构成的组织,可以帮我们不断成功…

【大厂直通车】飞猪旅行日常实习_测开面经

📑哈喽,大家好,我是小浪;📱本专栏致力于持续更新最新各大厂面经,实习消息,招聘要求; 那么目前价格也仅仅是定到了29.9💰;非常的实惠,一杯奶茶钱🍵; 🧃对于订阅本专栏的同学们,博主在努力更新,那么最近忙于学校的考试,没来得及正常更新,非常抱歉,这几…

论文综述——DORE: Document Ordered Relation Extraction based on Generative Framework

DORE: Document Ordered Relation Extraction based on Generative Framework 文章的主要目标是对文档级的关系抽取。以往的研究主要是基于分类的研究,生成式关系抽取研究较少而且性能不佳。 文档级相比于句子级的关系抽取存在序列长度过长,以及实体定位…

消息队列选型

消息队列选型 大家好,我是易安!今天我们聊下消息队列常见选型。 消息队列作用 谈选型之前我们先讲下我们为什么需要消息队列。 消息队列是一种很流行的技术,自从系统间开始通信时,消息队列就出现了。然而,对消息队列给…

java--时间类实例2--毫秒/秒运算(instant、ZoneDateTime)

有些时候给出毫秒值来让我们计算时间该怎么办。 文章目录 介绍[蓝桥杯 2021 省 B] 时间显示题目描述代码 蓝桥杯–航班时间 介绍 将毫秒值直接转成日期的有 new Date(毫秒)Instant.ofEpochMilli(毫秒)Instant.ofEpochSecond(秒)Instant.ofEpochSecond&…

k8s知识整理(继续整理中)

文章目录 k8s组件master节点kube-apiserverkube-schedulerkube-controller-manageretcd work节点kube-proxykubeletcontainer runtime add onsCoreDNSingress controller Pod常用控制器k8s pod创建调度过程k8s pod删除过程k8s 灰度发布(金丝雀部署)k8s 蓝…

日撸 Java 三百行day37

文章目录 说明day37 十字链表1.思路整理1.1十字链表的数据结构:1.2 手动模拟十字链表 2.代码分析2.1 十字链表的构造2.2 单元测试 说明 闵老师的文章链接: 日撸 Java 三百行(总述)_minfanphd的博客-CSDN博客 自己也把手敲的代码放…

ambari的kafka服务开启sasl

添加 sasl 配置⽂件 集群部署 Kafka2.2下载地址 http://archive.apache.org/dist/kafka/2.2.1/kafka_2.11-2.2.1.tgz 解压安装包 tar -zxvf kafka_2.11-2.2.1.tgz 部署略 ambari 数据kafka服务 在kafka的conf目录下创建sasl_conf目录,将kafka_client_jaas.conf/kafka_se…

深入浅出MySQL——CRUD

文章目录 表的增删改查Create单行数据全列插入多行数据指定列插入插入否则更新替换——REPLACE RetrieveSELECT 列WHERE 条件结果排序筛选分页结果 UpdateDelete删除数据截断表 插入查询结果聚合函数group bywhere和having SQL查询中各个关键字的执行先后顺序函数日期函数字符串…

SQL优化(3):order by优化

MySQL的排序,有两种方式: Using filesort : 通过表的索引或全表扫描,读取满足条件的数据行,然后在排序缓冲区sort buffer中完成排序操作,所有不是通过索引直接返回排序结果的排序都叫 FileSort 排序。 Using index :…

IDEA 使用系列之 Alibaba Cloud Toolkit 一件部署

一、前文 做开发,免不了要往服务器部署前端后端,首先要用xftp把前后端所在文件夹打开,把jar、dist备份再上传,然后再打开xshell把前后端kill掉,然后再敲命令重新启动前后端,少则2、3分钟,多则10…

创新案例|探索 Snyk 的 PLG 团队1.6倍年度 ARR 增长背后的策略

组织架构不匹配、权责分配不清晰以及团队协作无机制是推进PLG业务面临的三大核心挑战,而安全软件公司Snyk以其指数级营收和估值增长的成功实践证明,构建合适且高效团队是助力PLG创新实现高速增长的关键,其经验值得借鉴。本文将通过分析Synk如…

Java+GeoTools实现WKT数据根据EPSG编码进行坐标系转换

场景 JavaGeoTools(开源的Java GIS工具包)快速入门-实现读取shp文件并显示: JavaGeoTools(开源的Java GIS工具包)快速入门-实现读取shp文件并显示_霸道流氓气质的博客-CSDN博客 在上面实现Java中集成Geotools之后,需求是将WKT数据转换成其他坐标系的W…

计算机网络-如何寻找目标主机

视频参考链接:计算机网络-如何寻找目标计算机?_哔哩哔哩_bilibili 在互联网中如果使计算机A与计算机B如何进行通信,又是如何找到目标的计算机主机呢? 首先最简单的通信就是两台计算机中间加一根网线,那么这两台计算机…

算法基础—哈希表散列表的构建和处理冲突

1 哈希表的构建 1. 直接寻址法 取关键字或者关键字的某个线性函数值作为哈希地址,即H(Key)Key或者H(Key)a*Keyb(a,b为整数),这种散列函数也叫做自身函数.如果H(Key)的哈希地址上已经有值了,那么就往下一个位置找,知道找到H(Key)的位置没有值了就把元素放进去. 2. 数字分析法…