Kafka集群数据迁移方案

news2025/1/13 15:54:56

概述

MirrorMaker2(后文简称 MM2)在 2019 年 12 月随 Kafka 2.4.0 一起推出。顾名思义,是为了解决 Kafka 集群之间数据复制和数据同步的问题而诞生的 Kafka 官方的数据复制工具。在实际生产中,经常被用来实现 Kafka 数据的备份,迁移和灾备等目的。

使用场景

Kafka MM2适用于下列场景:

  • 远程数据同步:通过MM2,Kafka数据可以在不同地域的集群进行传输复制。
  • 灾备场景:通过MM2,可以构建不同数据中心的主备两个集群容灾架构,MM2实时同步两个集群的数据。当其中一个集群不可用时,可以将上面的应用程序切换到另一个集群,从而实现异地容灾功能。
  • 数据迁移场景:在业务上云、混合云、集群升级等场景,存在数据从旧集群迁移到新集群的需求。此时,您可以使用MM2实现新旧数据的迁移,保证业务的连续性。
  • 聚合数据中心场景:通过MM2,可以将多个Kafka子集群的数据同步到一个中心Kafka集群,实现数据的汇聚。

功能

Kafka MM2作为数据复制工具,具有以下功能:

  • 复制topics数据以及配置信息。
  • 复制consumer groups及其消费topic的offset信息。
  • 复制ACLs。
  • 自动检测新的topic以及partition。
  • 提供MM2的metrics。
  • 高可用以及可水平扩展的框架。

任务执行方式

MM2任务有以下执行方式:

  • Distributed Connect集群的connector方式(推荐):在已有Connect集群执行MM2
    connector任务的方式。您可以参照本文使用Connect集群服务的功能来管理MM2任务。
  • Dedicated MirrorMaker集群方式:不需要使用Connect集群执行MM2
    connector任务,而是直接通过Driver程序管理MM2的所有任务。
  • Standalone Connect的worker方式:执行单个MirrorSourceConnector任务,适合在测试场景下使用。

说明

推荐在Distributed Connect集群上启动MM2 connector任务,可以借助Connect集群的Rest服务管理MM2任务。

使用限制

  1. 为保证生产集群的数据完整和安全,必须先在测试集群进行测试
  2. 源集群与目标集群的Kafka软件版本为2.12_2.4.1及以上。
  3. MM2 迁移任务会增加CPU和内存的占用,尽量停止客户端生产与消费,或根据数据量大小,选择在窗口时间进行迁移。

迁移方案

集群情况

集群配置

主题配置:确保目标集群中的主题配置(如分区数、副本数、保留策略等)与源集群一致,或根据业务需求进行调整。
Broker 配置:检查每个 broker 的配置参数,确保两者之间的兼容性。

数据分布与负载

分区分布:了解源集群中各主题的分区分布情况,以便在目标集群中合理安排分区。
负载评估:分析源集群的负载,确保目标集群有足够的能力来处理迁移后的数据流。

安全性和权限

认证与授权:检查源集群和目标集群的安全设置,确保数据迁移过程中涉及的用户和角色具有适当的权限。

兼容性和版本

Kafka 版本:确保两个集群的 Kafka 版本兼容,特别是在使用特性时,避免因版本差异引发的问题。
消息格式:验证消息格式和序列化机制在两个集群中的一致性。

创建测试topic

# 根据目标集群与业务需求进行调整目标topic副本与分区数,测试不做要求

kafka-topics.sh --bootstrap-server localhost:9092 --create --topic topictest --replication-factor 3

在这里插入图片描述

2.3 生产消息

bash-5.0$ kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topictest
>1
>2
>3
>4
>5
>6
>7
>8
>

在这里插入图片描述

2.4 记录偏移量

记录 Kafka 主题的偏移量信息

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic s.topictest --time  -1

在这里插入图片描述

MM2配置文件

# 指定两个集群,以及对应的host
clusters = s,d
s.bootstrap.servers = xxxx:9092
d.bootstrap.servers = yyyy:9092
# 指定同步备份的topic & consumer group,支持正则
s->d.topics = topictest
s->d.groups = .*
# 指定复制链条,可以是双向的
s->d.enabled = true
# us-east->us-west.enabled = true  # 双向,符合条件的两个集群的topic会相互备份

全量配置

如有其他需要,可添加其他使用参数

# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details
# Sample MirrorMaker 2.0 top-level configuration file
# Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
# specify any number of cluster aliases
clusters = A,B
#replication.policy.separator=""
#source.cluster.alias=""
#target.cluster.alias=""
# connection information for each cluster
# This is a comma separated host:port pairs for each cluster
# for e.g. "A_host1:9092, A_host2:9092, A_host3:9092"
A.bootstrap.servers = xxxx:9092
B.bootstrap.servers = yyyy:9092
A.security.protocol=SASL_PLAINTEXT
A.sasl.mechanism=SCRAM-SHA-512
A.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
 username="xxx" password="xxx";
B.security.protocol=SASL_PLAINTEXT
B.sasl.mechanism=SCRAM-SHA-512
B.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
 username="xxx" password="xxx";
# enable and configure individual replication flows
# 设置同步的流向
A->B.enabled = true
#A.producer.enable.idempotence = true
#B.producer.enable.idempotence = true
# regex which defines which topics gets replicated. For eg "foo-.*"
#A->B.topics = hadoopLogCollection,t_biz_act_mmetric
#设置同步的topic;支持正则
A->B.topics = xxxx,xxxx
#设置排除的topic:支持正则
A->B.topics.exclude= xxxx

#B->A.enabled = true
#B->A.topics = .*

# Setting replication factor of newly created remote topics
replication.factor=3

############################# Internal Topic Settings  #############################
# The replication factor for mm2 internal topics "heartbeats", "B.checkpoints.internal" and
# "mm2-offset-syncs.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
sync.topic.configs.enabled=true
#同步配置的时间频率
sync.topic.configs.enabled.interval.seconds=60
checkpoints.topic.replication.factor=2
heartbeats.topic.replication.factor=2
offset-syncs.topic.replication.factor=2
#offset-syncs.topic.location = target

#启动同步的Task数量----启用几个线程进行同步
tasks.max = 5

# The replication factor for connect internal topics "mm2-configs.B.internal", "mm2-offsets.B.internal" and
# "mm2-status.B.internal"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offset.storage.replication.factor=2
status.storage.replication.factor=2
config.storage.replication.factor=2

# customize as needed
# replication.policy.separator = _
sync.topic.acls.enabled = true
emit.heartbeats.interval.seconds = 5

#开启topic动态和消费者组 动态同步与同步的周期
refresh.topics.enabled = true
refresh.topics.interval.seconds = 60
refresh.groups.enabled = true
refresh.groups.interval.seconds = 60

# 开始消费者组offset同步;设置同步的周期---注意:仅仅同步idle中的消费者的offset
sync.group.offsets.enabled = true
sync.group.offsets.interval.seconds = 5

#设置同步的topic Name命名规则;3.0版本提供了两种topic同步命名规则,默认会带上前缀,也可以手动不带前缀的----此时不能做双向同步
replication.policy.class = org.apache.kafka.connect.mirror.IdentityReplicationPolicy

执行迁移进程

bash-5.0$ ./bin/connect-mirror-maker.sh mm2.properties

在这里插入图片描述

验证数据同步

topic迁移后会变为s.topic
在这里插入图片描述

./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic s.topictest --time  -1

在这里插入图片描述
与源集群偏移量相同,消费消息正常
在这里插入图片描述
确认消息偏移量,保证数据一致性。
源集群停止mm2迁移进程,并将业务连接到新集群中即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2226969.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

鼠标增强工具 MousePlus v5.3.9.0 中文绿色版

MousePlus 是一款功能强大的鼠标增强工具,它可以帮助用户提高鼠标操作效率和精准度。该软件可以自定义鼠标的各种功能和行为,让用户根据自己的习惯和需求来调整鼠标的表现。 详细功能 自定义鼠标按钮功能:可以为鼠标的各个按钮设置不同的功能…

【大模型系列】Mini-InternVL(2024.10)

Paper:https://arxiv.org/pdf/2410.16261Github:https://github.com/OpenGVLab/InternVL/tree/main/internvl_chat/shell/mini_internvlAuthor:Zhangwei Gao et al. 上海人工智能实验室 文章目录 0 总结(省流版)1 模型结构1.1 InternViT-300M…

探讨Facebook的AI研究:未来社交平台的技术前瞻

在数字时代,社交媒体已成为人们日常生活的重要组成部分。作为全球最大的社交网络之一,Facebook不断致力于人工智能(AI)的研究与应用,以提升用户体验、增强平台功能并推动技术创新。本文将探讨Facebook在AI领域的研究方…

一键导入Excel到阿里云PolarDB-MySQL版

今天,我将分享如何一键导入Excel到阿里云PolarDB-MySQL版数据库。 准备数据 这里,我们准备了一张excel表格如下: 连接到阿里云PolarDB 打开的卢导表,点击新建连接-选择阿里云PolarDB-MySQL版。如果你还没有这个工具,…

[NSSCTF 2nd]php签到 详细题解

知识点: linux文件后缀名绕过 表单文件上传 pathinfo 函数 file_put_contents()函数 命令执行 代码审计: <?phpfunction waf($filename){$black_list array("ph", "htaccess", "ini");$ext pathinfo($filename, PATHINFO_EXTENSION…

[0260].第25节:锁的不同角度分类

MySQL学习大纲 我的数据库学习大纲 从不同维度对锁的分类&#xff1a; 1.对数据操作的类型划分:读锁和写锁 1.1.读锁 与 写锁概述&#xff1a; 1.对于数据库中并发事务的读-读情况并不会引起什么问题。对于写-写、读-写或写-读这些情况可能会引起一些问题&#xff0c;需要使用…

云原生后端开发教程

云原生后端开发教程 引言 随着云计算的普及&#xff0c;云原生架构逐渐成为现代软件开发的主流。云原生不仅仅是将应用部署到云上&#xff0c;而是一种构建和运行应用的方式&#xff0c;充分利用云计算的弹性和灵活性。本文将深入探讨云原生后端开发的核心概念、工具和实践&a…

Docker 常用命令全解析:提升对雷池社区版的使用经验

Docker 常用命令解析 Docker 是一个开源的容器化平台&#xff0c;允许开发者将应用及其依赖打包到一个可移植的容器中。以下是一些常用的 Docker 命令及其解析&#xff0c;帮助您更好地使用 Docker。 1. Docker 基础命令 查看 Docker 版本 docker --version查看 Docker 运行…

常见的java开发面试题

目录 1.SpringBoot 打成的jar包和普通的jar包有什么区别&#xff1f; 如何让SpringBoot打的jar包可依赖&#xff1f; 2. http 和 https 的区别&#xff1f; 一、安全性 二、连接方式 三、性能影响 四、应用场景 五、总结&#xff1a; 3. GC是什么&#xff0c;为什么要使用…

信息安全入门——网络安全控制

目录 前言信息安全入门&#xff1a;网络安全控制基础1. 用户识别技术&#xff1a;确认你是谁2. 访问控制技术&#xff1a;定义你能做什么3. 访问控制列表&#xff08;ACL&#xff09;&#xff1a;精细的权限管理4. 漏洞控制&#xff1a;防范未然5. 入侵检测系统&#xff08;IDS…

北理工计算机考研难度分析

C哥专业提供——计软考研院校选择分析专业课备考指南规划 总体情况概述 北京理工大学计算机学院2024届考研呈现出学硕扩招、专硕稳定的特点。学硕实际录取27人(含非全统考)&#xff0c;复试线360分&#xff0c;复试录取率76%&#xff1b;计算机技术专硕(不含珠海)实际录取29人…

模拟算法 (算法详解+例题)

目录 一、什么是模拟二、模拟算法的特点和技巧三、模拟OJ题3.1、替换所有的问号3.2、提莫攻击3.3、N字形变换3.4、外观数列3.5、数青蛙 一、什么是模拟 模拟是对真实事物或者过程的虚拟。在编程时为了实现某个功能&#xff0c;可以用语言来模拟那个功能&#xff0c;模拟成功也…

php后端学习,Java转php

遇到前后端跨域 php解决跨域问题可以加上下面的代码&#xff1a; header(“Access-Control-Allow-Origin:*”); 并且查看自己的数据库信息是否连接成功。 从Java转php 个人感受php跟偏向前端&#xff0c; 写后端逻辑时没有像java又springboot工具方便。 但是和前端联调很方便…

一个简单的例子,说明Matrix类的妙用

在Android、前端或者别的平台的软件开发中&#xff0c;有时会遇到类似如下需求&#xff1a; 将某个图片显示到指定的区域&#xff1b;要求不改变图片本身的宽高比&#xff0c;进行缩放&#xff1b;要求最大限度的居中填充到显示区域。 以下示意图可以简单描绘该需求 以Androi…

Zookeeper分布式锁实现

文章目录 1、zk分布式锁的实现原理1_获取锁过程2_释放锁 2、代码实现1_创建客户端对象2_使用和测试案例 1、zk分布式锁的实现原理 Z o o k e e p e r Zookeeper Zookeeper 就是使用临时顺序节点特性实现分布式锁的&#xff0c;官网。 获取锁过程 &#xff08;创建临时节点&…

基于websocket简易封装一个全局消息通知组件

期望是&#xff0c;在用户登录后台后。新的任务到来时能够及时通知到并且去处理。 效果图 前端 <template><div></div> </template><script> import { updateRead } from "/api/system/approval-notification"; export default {dat…

Android Gradle

#1024程序员节&#xff5c;征文# Gradle 是一款强大的自动化构建工具&#xff0c;广泛应用于 Android 应用开发。它通过灵活的配置和丰富的插件系统&#xff0c;为项目构建提供了极大的便利。本文只是简单的介绍 Gradle 在 Android 开发中的使用&#xff0c;包括其核心概念、构…

【mysql进阶】5-事务和锁

mysql 事务基础 1 什么是事务 事务是把⼀组SQL语句打包成为⼀个整体&#xff0c;在这组SQL的执⾏过程中&#xff0c;要么全部成功&#xff0c;要么全部失败&#xff0c;这组SQL语句可以是⼀条也可以是多条。再来看⼀下转账的例⼦&#xff0c;如图&#xff1a; 在这个例⼦中&a…

中酱集团:黑松露酱油,天然配方定义健康生活

在如今的大健康时代&#xff0c;人们对于美食的要求越来越高。不仅美味&#xff0c;更要健康。在健康美食的生态链中&#xff0c;有一个名字正逐渐成为品质与美味的代名词——中酱集团。而当中酱集团与黑松露酱油相遇&#xff0c;一场味觉的革命就此拉开帷幕。 中酱集团&#x…

生成式UI 动态化SDK的研发--开篇

这里写目录标题 1. 背景2.名词解释2.1 DSL2.2 AI大模型2.3 生成式UI 3. 前言4.未来展望 1. 背景 随着AI大模型技术的兴起&#xff0c;UI界面的开发和交互必定会发生巨大的改变。在大模型技术出现之前&#xff0c;软件的界面是通过UI设计师和交互设计师先定义好软件的UI界面&am…