kafka跨地区跨集群同步工具MirrorMaker2 —— 筑梦之路

news2025/1/10 3:45:38

MM2简介 

KIP-382: MirrorMaker 2.0 - Apache Kafka - Apache Software Foundation

有四种运行MM2的方法:

As a dedicated MirrorMaker cluster.(作为专用的MirrorMaker群集)
As a Connector in a distributed Connect cluster.(作为分布式Connect群集中的连接器)
As a standalone Connect worker.(作为独立的Connect工作者)
In legacy mode using existing MirrorMaker scripts.(在旧模式下,使用现有的MirrorMaker脚本。)

MM2集群模式

配置文件示例1:

name = event-center-connector
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 2

# 定义集群别名
clusters = event-center, event-center-new

# 设置event-center集群的kafka地址列表
event-center.bootstrap.servers = source:9193
event-center.security.protocol=SASL_PLAINTEXT
event-center.sasl.mechanism=PLAIN
event-center.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 设置event-center-new集群的kafka地址列表
event-center-new.bootstrap.servers = target:29092
event-center-new.security.protocol=SASL_PLAINTEXT
event-center-new.sasl.mechanism=PLAIN
event-center-new.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# 开启event-center集群向event-center-new集群同步
event-center->event-center-new.enabled = true
# 允许同步topic的正则
event-center->event-center-new.topics = projects.*
event-center->event-center-new.groups = .*

# MM2内部同步机制使用的topic,replication数量设置
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

# 自定义参数
# 是否同步源topic配置
sync.topic.configs.enabled=true
# 是否同步源event-centerCL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 连接器是否发送心跳
emit.heartbeats.enabled=true
# 心跳间隔
emit.heartbeats.interval.seconds=5
# 是否发送检查点
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新间隔
refresh.topics.interval.seconds=60
# 是否刷新消费者组id
refresh.groups.enabled=true
# 刷新间隔
refresh.groups.interval.seconds=60
# DefaultReplicationPolicy / CustomReplicationPolicy
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
# 远端创建新topic的replication数量设置
replication.factor=3

注意:

replication.policy.class 默认为:DefaultReplicationPolicy,这个策略会把同步至目标集群的topic都加上一个源集群别名的前缀,比如源集群别名为A,topic为:bi-log,该topic同步到目标集群后会变成:A.bi-log,为啥这么做呢,就是为了避免双向同步的场景出现死循环。

官方也给出了解释:

这是 MirrorMaker 2.0 中的默认行为,以避免在复杂的镜像拓扑中重写数据。 需要在复制流设计和主题管理方面小心自定义此项,以避免数据丢失。 可以通过对“replication.policy.class”使用自定义复制策略类来完成此操作.

参考资料:

【如何保持topic名称一致】

【Kafka】MM2同步Kafka集群时如何自定义复制策略(ReplicationPolicy)_kafka mm2同步的目标topic名不一样-CSDN博客

配置文件示例2:

#定义集群别名
clusters = A, B
A.bootstrap.servers = A_host1:9092, A_host2:9092, A_host3:9092 # 设置A集群的kafka地址列表
B.bootstrap.servers = B_host1:9092, B_host2:9092, B_host3:9092 # 设置B集群的kafka地址列表
A->B.enabled = true # 开启A集群向B集群同步
A->B.topics = .* # 允许同步topic的正则

B->A.enabled = true # 开启B集群向A集群同步
B->A.topics = .* # 允许同步topic的正则

#MM2内部同步机制使用的topic,replication数量设置
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1

offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

#自定义参数
sync.topic.configs.enabled=true #是否同步源topic配置信息
sync.topic.acls.enabled=true #是否同步源ACL信息
emit.heartbeats.enabled=true #连接器是否发送心跳
emit.heartbeats.interval.seconds=5 #心跳间隔
emit.checkpoints.enabled=true #是否发送检查点
refresh.topics.enabled=true #是否刷新topic列表
refresh.topics.interval.seconds=5 #刷新间隔
refresh.groups.enabled=true #是否刷新消费者组id
refresh.groups.interval.seconds=5 #刷新间隔
readahead.queue.capacity=500 #连接器消费者预读队列大小
replication.policy.class=org.apache.kafka.connect.mirror.DefaultReplicationPolicy #使用LegacyReplicationPolicy模仿MM1
heartbeats.topic.retention.ms=1 day #首次创建心跳主题时,设置心跳数据保留时长
checkpoints.topic.retention.ms=1 day #首次创建检查点主题时,设置检查点数据保留时长
offset.syncs.topic.retention.ms=max long #首次创建偏移量主题时,设置偏移量数据保留时长
replication.factor=2 #远端创建新topic的replication数量设置

Standalone模式运行

需要两个配置文件,一个是作为worker的kafka集群信息(worker.properties),另一个是同步数据的配置(connector.properties)

worker.properties配置文件

bootstrap.servers=worker:29092
security.protocol=PLAINTEXT
sasl.mechanism=PLAIN

key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
value.converter = org.apache.kafka.connect.converters.ByteArrayConverter

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

connector.properties配置文件

name = MirrorSourceConnector
topics = projects.*
groups = *
connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasks.max = 1

# source
# 这个配置会使同步之后的Topic都加上一个前缀,慎重
source.cluster.alias = old
source.cluster.bootstrap.servers = source:9193
source.cluster.security.protocol=SASL_PLAINTEXT
source.cluster.sasl.mechanism=PLAIN
source.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin-pwd";
# target
target.cluster.alias = new
target.cluster.bootstrap.servers = target:29092
target.cluster.security.protocol=SASL_PLAINTEXT
target.cluster.sasl.mechanism=PLAIN
target.cluster.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="Admin" password="hMOPbmZE";

# 是否同步源topic配置信息
sync.topic.configs.enabled=true
# 是否同步源ACL信息
sync.topic.acls.enabled=true
sync.group.offsets.enabled=true
# 连接器是否发送心跳
emit.heartbeats.enabled=true
# 心跳间隔
emit.heartbeats.interval.seconds=5
# 是否发送检查点
emit.checkpoints.enabled=true
# 是否刷新topic列表
refresh.topics.enabled=true
# 刷新间隔
refresh.topics.interval.seconds=30
# 是否刷新消费者组id
refresh.groups.enabled=true
# 刷新间隔
refresh.groups.interval.seconds=30
# 连接器消费者预读队列大小
# readahead.queue.capacity=500
# 使用自定义策略
replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy
replication.factor = 3

 启动命令

./connect-standalone.sh worker.properties connector.properties

参考资料:

kafka 异步双活方案 mirror maker2 深度解析 - 知乎

Apache Kafka MirrorMaker 2.0 指南 - Azure HDInsight | Microsoft Learn

简述 Kafka Mirror Maker v2 – K's Blog

Getting up to Speed with MirrorMaker 2 - Confluent

9.4. 使用 MirrorMaker 2.0 在 Kafka 集群间同步数据 Red Hat AMQ 2021.q3 | Red Hat Customer Portal

https://blog.51cto.com/u_14049791/5703235

KafkaMirrorMaker2.0架构与实战全解《三》 - 墨天轮

2.4. Kafka MirrorMaker 2 集群配置 Red Hat AMQ Streams 2.4 | Red Hat Customer Portal

kafka:MirrorMaker-V1(MM1)到MirrorMaker-V2(MM2)-腾讯云开发者社区-腾讯云

【Kafka】记录一次基于connect-mirror-maker做的Kafka集群迁移完整过程_kafka数据同步到另外一个kafka-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1693051.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基本IO接口

引入 基本输入接口 示例1 示例2:有数据保持能力的外设 #RD端由in指令控制:将数据由端口传输到CPU内存中 #CS244信号由译码电路实现 示例3: a)图中由于输出端口6有连接到端口1,当开关与端点1闭合时期间,仍能维持3端口…

开放式耳机2024超值推荐!教你如何选择蓝牙耳机!

开放式耳机的便利性让它在我们的日常生活中变得越来越重要。它让我们摆脱了传统耳机的限制,享受到了更多的自由。不过,市面上的开放式耳机种类繁多,挑选一款既实用又实惠的产品确实需要一些小窍门。作为一位对开放式耳机颇有研究的用户&#…

柯桥职场人出差必备的商务口语-职场差旅口语提问篇

May I reconfirm my flight? 我可以确认我的班机15857575376吗? Where can I make a reservation? 我到哪里可以预订? Do I have to make a reconfirmation? 我还要再确认吗? Is there any discount for the USA Railpass? 火车通行…

【设计模式】JAVA Design Patterns——Bytecode(字节码模式)

🔍目的 允许编码行为作为虚拟机的指令 🔍解释 真实世界例子 一个团队正在开发一款新的巫师对战游戏。巫师的行为需要经过精心的调整和上百次的游玩测试。每次当游戏设计师想改变巫师行为时都让程序员去修改代码这是不妥的,所以巫师行为以数据…

安全态势管理的六大挑战:态势感知

德迅云安全鉴于如今的安全威胁不断变幻,企业对实施态势管理策略至关重要,可以让安全团队根据需要进行安全策略的动态调整。如果企业在研究构建态势感知管理,需要特别关注以下六个方面的挑战。 如果企业正在使用一个或多个平台,那么…

《计算机网络微课堂》课程概述

​ 课程介绍 本专栏主要是 B 站课程《计算机网络微课堂》的文字版,作者是湖南科技大学的老师。 B 站地址:https://www.bilibili.com/video/BV1c4411d7jb 该课程好评如潮,包含理论课,实验课,考研真题分析课&#xf…

GDB 调试器

GDB 功能 在程序启动之前指定一些可以影响程序行为的变量或条件。在某个指定的地方或条件下暂停程序,在程序停止时检查已经发生了什么。 调试信息与调试原理 一般要调试某个程序,为了能清晰地看到调试的每一行代码、调用的堆栈信息、变 量名和函数名等…

qt5core.dll怎么下载,qt5core.dll下载安装详细教程

不知道大家有没有遇到过qt5core.dll丢失这个问题?目前这个问题还是比较常见的,一般使用电脑比较多的的人,有很大几率遇到这种qt5core.dll丢失的问题。今天主要针对这个问题,来给大家讲解一下一键修复qt5core.dll的方法。 Qt5Core.…

图论(从数据结构的三要素出发)

文章目录 逻辑结构物理结构邻接矩阵定义性能分析性质存在的问题 邻接表定义性能分析存在的问题 十字链表(有向图)定义性能分析 邻接多重表(无向图)定义性能分析 数据的操作图的基本操作图的遍历广度优先遍历(BFS)算法思想和实现性能分析深度优先最小生成…

打开服务器远程桌面连接不上,可能的原因及相应的解决策略

在解决远程桌面连接不上服务器的问题时,我们首先需要从专业的角度对可能的原因进行深入分析,并据此提出针对性的解决方案。以下是一些可能的原因及相应的解决策略: 一、网络连接问题 远程桌面连接需要稳定的网络支持,如果网络连接…

【一步一步了解Java系列】:何为数组,何为引用类型

看到这句话的时候证明:此刻你我都在努力加油陌生人个人主页:Gu Gu Study专栏:一步一步了解Java 喜欢的一句话: 常常会回顾努力的自己,所以要为自己的努力留下足迹 喜欢的话可以点个赞谢谢了。 数组 数组是一推相同数据…

JavaSE--基础语法(第一期)

Java是一种优秀的程序设计语言,它具有令人赏心悦目的语法和易于理解的语义。不仅如此,Java还是一个有一系列计算机软件和规范形成的技术体系,这个技术体系提供了完整的用于软件开发和 跨平台部署的支持环境,并广泛应用于嵌入式系统…

特殊变量笔记

执行demo4.sh文件,输入输出参数itcast itheima的2个输入参数, 观察效果 特殊变量:$# 语法 $#含义 获取所有输入参数的个数 案例需求 在demo4.sh中输出输入参数个数 演示 编辑demo4.sh, 输出输入参数个数 执行demo4.sh传入参数itcast, itheima, 播仔 看效果…

上位机图像处理和嵌入式模块部署(mcu之串口控制gpio)

【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing 163.com】 前面我们陆续学习了gpio输入、输出,串口输入、输出。其实有了这两个接口,就可以做产品了。因为我们可以通过发送串口命令&a…

门禁-jenkins的构建状态同步到gitlab提交流水线

API接口文档 https://docs.gitlab.cn/jh/api/commits.html 配置pipline流水线 生成http请求代码: 使用HttpRequest插件生成 - sharelibs内容 //这是share libs里的 package devopsdef httpReq(reqType, reqUrl, reqBody, accessToken){def gitServer "…

人才测评的应用:人才选拔,岗位晋升,面试招聘测评

人才测评自诞生以来,就被广泛应用在各大方面,不仅是我们熟悉的招聘上,还有其他考核和晋升,都会需要用到人才测评。不知道怎么招聘?或者不懂得如何实现人才晋升?都可以参考人才测评,利用它帮我们…

Amesim基础篇-元件详解-H型膨胀阀四象限解析

一 膨胀阀简介 膨胀阀的主要功能是节流和调节过热度,库内膨胀阀包含节流管、H型膨胀阀、T型膨胀阀三种: 节流管:一根内径较小的管路,当制冷剂通过他时发生等等焓降压降温,具有成本低,内径不可变的特点,因此普遍在家用空调中使用,在汽车空调上使用较少。当我们建模过程…

深入理解CPU缓存一致性

存储体系结构 速度快的存储硬件成本高、容量小,速度慢的成本低、容量大。为了权衡成本和速度,计算机存储分了很多层次,有寄存器、L1 cache、L2 cache、L3 cache、主存(内存)和硬盘等。 根据程序的空间局部性和时间局…

Java开发大厂面试第22讲:Redis 是如何保证系统高可用的?它的实现方式有哪些?

高可用是通过设计,减少系统不能提供服务的时间,是分布式系统的基础也是保障系统可靠性的重要手段。而 Redis 作为一款普及率最高的内存型中间件,它的高可用技术也非常的成熟。 我们今天分享的面试题是,Redis 是如何保证系统高可用…

Mac上安装OpenLDAP服务器详细教程(Homebrew安装和自带的ldap)

目录 前言 一、安装 Homebrew(如果尚未安装) 二、使用 Homebrew 安装 OpenLDAP 三、配置 OpenLDAP 步骤一:更新PATH和环境变量 步骤二:配置slapd.conf 步骤三:初始化和启动 OpenLDAP 服务 1.创建数据库目录 2…