【Kafka】MM2同步Kafka集群时如何自定义复制策略(ReplicationPolicy)

news2025/1/10 17:02:58

文章目录

  • 需求
  • 准备工作
  • 自定义复制策略
  • 编译代码

需求

使用MM2同步集群数据,topic名称不能变,默认的复制策略为:DefaultReplicationPolicy,这个策略会把同步至目标集群的topic都加上一个源集群别名的前缀,比如源集群别名为A,topic为:bi-log,该topic同步到目标集群后会变成:A.bi-log,为啥这么做呢,就是为了避免双向同步的场景出现死循环。

官方也给出了解释:

这是 MirrorMaker 2.0 中的默认行为,以避免在复杂的镜像拓扑中重写数据。 需要在复制流设计和主题管理方面小心自定义此项,以避免数据丢失。 可以通过对“replication.policy.class”使用自定义复制策略类来完成此操作,所以本文主要记录一下自定义复制策略的流程。

准备工作

下载源码

https://kafka.apache.org/downloads

kafka源码是使用Gradle编译的,需要安装Gradle,具体安装操作不赘述了,可以百度。

源码使用IDEA打开后,在connect模块下找到接口:org.apache.kafka.connect.mirror.ReplicationPolicy

自定义复制策略

ReplicationPolicy这个接口主要有几个方法:

  • formatRemoteTopic:重命名topic名称
  • topicSource:根据topic获取source集群别名
  • upstreamTopic:获取topic在source集群中的名称
  • originalTopic:获取topic原始的名称(针对多次同步过程中,被重命名过多次的topic)
  • isInternalTopic:判断是否为内部topic

根据我们的需求,自定义策略需要满足:

  • 不重命名source集群中topic的名称
  • 能返回source集群别名

实现很简单,就是保证topic原封不动即可,完整代码如下:

package org.apache.kafka.connect.mirror;

import org.apache.kafka.common.Configurable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.regex.Pattern;

/**
 * Defines remote topics like "us-west.topic1". The separator is customizable and defaults to a period.
 */
public class CustomReplicationPolicy implements ReplicationPolicy, Configurable {

    // In order to work with various metrics stores, we allow custom separators.
    public static final String SEPARATOR_CONFIG = MirrorClientConfig.REPLICATION_POLICY_SEPARATOR;
    public static final String SEPARATOR_DEFAULT = ".";
    private static final Logger log = LoggerFactory.getLogger(CustomReplicationPolicy.class);
    private String separator = SEPARATOR_DEFAULT;
    private Pattern separatorPattern = Pattern.compile(Pattern.quote(SEPARATOR_DEFAULT));

    @Override
    public void configure(Map<String, ?> props) {
        if (props.containsKey(SEPARATOR_CONFIG)) {
            separator = (String) props.get(SEPARATOR_CONFIG);
            log.info("Using custom remote topic separator: '{}'", separator);
            separatorPattern = Pattern.compile(Pattern.quote(separator));
        }
    }

    /**
     * 拼接Topic名(if you need)
     *
     * @param sourceClusterAlias 源集群标识
     * @param topic              源Topic名称
     * @return java.lang.String
     * @date 2023/03/03 4:28 下午
     */
    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return topic;
    }

    /**
     * 获取源集群标(source.cluster.alias)
     *
     * @param topic Topic nameMirrorSourceConnector
     * @return source alias
     */
    @Override
    public String topicSource(String topic) {
        // 和source.cluster.alias配置的一致,可通过读取配置,为了方便直接返回
        return "source";
    }

    /**
     * 截取上游真实Topic名称
     *
     * @param topic Topic name
     * @return java.lang.String
     * @date 2023/03/03 4:22 下午
     */
    @Override
    public String upstreamTopic(String topic) {
        return topic;
    }

    /**
     * 获取原始Topic名,没做过加工,直接返回即可
     *
     * @param topic 源Topic名
     * @return java.lang.String
     * @date 2023/03/03 6:42 下午
     */
    @Override
    public String originalTopic(String topic) {
        return topic;
    }
}

还需要修改一个地方:org.apache.kafka.connect.mirror.MirrorSourceConnector#isCycle

这个方法是判断是否出现循环复制,会递归调用,如果不修改会死循环:

原始代码:

修改为:

    // Recurse upstream to detect cycles, i.e. whether this topic is already on the target cluster
    boolean isCycle(String topic) {
        String source = replicationPolicy.topicSource(topic);
        if (source == null) {
            return false;
        } else {
            return source.equals(sourceAndTarget.target());
        }
    }

不改的话,后果如下:

编译代码

只需要编译connect模块即可,从Gradle视图中找到对应模块的build方法,修改参数,跳过单元测试(不跳过的话电脑得卡死):

build完成之后,在项目目录下找到对应jar文件,用这两个jar文件替换掉你执行脚本所使用kafka的libs目录下的jar即可:(原jar文件记得备份,以防万一)

libs目录示例:

完成上述操作之后,修改MM2配置文件中的:

replication.policy.class=org.apache.kafka.connect.mirror.CustomReplicationPolicy

还有一种方法是直接将class文件上传到classpath下,这种方式我没试。

再次执行脚本,可以看到同步后的topic已经保持原来的名称了,大功告成!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/403411.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

设计模式-第13章(状态模式)

状态模式状态模式状态模式的好处和用处工作状态状态模式 状态模式&#xff08;State&#xff09;&#xff0c;当一个对象的内在状态改变时允许改变其行为&#xff0c;这个对象看起来像是改变了其类。 状态模式主要解决的是当控制一个对象状态转换的条件表达式过于复杂时的情况…

【大数据】HDFS客户端命令行(hdfs dfs)详细使用说明

DFS命令使用概览使用说明lsdfducountappendToFilecatchecksumchgrpchmodchownconcatcopyFromLocalcopyToLocalcpcreateSnapshotdeleteSnapshotexpungefindgetgetfaclgetfattrgetmergeheadmkdirmoveFromLocalmoveToLocalmvputrenameSnapshotrmrmdirsetfaclsetfattrsetrepstattai…

实现VOC数据集与COCO数据集格式转换

实现VOC数据集与COCO数据集格式转换2、将voc数据集的xml转化为coco数据集的json格式2、COCO格式的json文件转化为VOC格式的xml文件3、将 txt 文件转换为 Pascal VOC 的 XML 格式<annotation><folder>文件夹目录</folder><filename>图片名.jpg</file…

2020蓝桥杯真题凯撒加密 C语言/C++

题目描述 给定一个单词&#xff0c;请使用凯撒密码将这个单词加密。 凯撒密码是一种替换加密的技术&#xff0c;单词中的所有字母都在字母表上向后偏移 3 位后被替换成密文。即 a 变为 d&#xff0c;b 变为 e&#xff0c;⋯&#xff0c;w 变为z&#xff0c;x 变为 a&#xff0…

【QT网络编程】实现UDP协议通信

文章目录概要&#xff1a;本期主要讲解QT中对UDP协议通信的实现。一、UDP协议通信二、Qt中UDP协议的处理1.QUdpSocket三、Qt实现UDP通信1.客户端2.服务器端结尾概要&#xff1a;本期主要讲解QT中对UDP协议通信的实现。 一、UDP协议通信 Internet 协议集支持一个无连接的传输协…

SprintBoot打包及profile文件配置

打成Jar包 需要添加打包组件将项目中的资源、配置、依赖包打到一个jar包中&#xff0c;可以使用maven的package&#xff1b;运行: java -jar xxx(jar包名) 操作步骤 第一步: 引入Spring Boot打包插件 <!--打包的插件--> <build><!--修改jar的名字--><fi…

认识vite_vue3 初始化项目到打包

从0到1创建vite_vue3的项目背景效果vite介绍&#xff08;对比和vuecli的区别&#xff09;使用npm创建vitevitevuie3创建安装antdesignvite自动按需引入&#xff08;vite亮点&#xff09;请求代理proxy打包背景 vue2在使用过程中对象的响应式不好用新增属性的使用$set才能实现效…

FPGA 20个例程篇:20.USB2.0/RS232/LAN控制并行DAC输出任意频率正弦波、梯形波、三角波、方波(二)

通过上面的介绍相信大家对数字变频已经有了一个较为整体性的认识&#xff0c;下面笔者来对照XILINX的DDS IP核对数字变频技术展开更进一步的说明&#xff0c;做到了理论和实践很好地结合&#xff0c;这样大家再带入Modelsim进行仿真测试就不仅掌握了数字变频的理论知识&#xf…

【Linux】网络原理

本篇博客让我们一起来了解一下网络的基本原理 1.网络发展背景 关于网络发展的历史背景这种东西就不多bb了&#xff0c;网上很容易就能找到参考资料&#xff0c;我的专业性欠缺&#xff0c;文章参考意义也不大。这里只做简单说明。 网络发展经过了如下几个模式 独立模式&…

创建线程的三种方法

文章目录1、创建一个类实现Runnable接口&#xff0c;并重写run方法。2、创建一个类继承Thread类&#xff0c;并重写run方法。3、实现Callable接口&#xff0c;重写call()方法&#xff0c;这种方式可以通过FutureTask获取任务执行的返回值。4、run()方法和start()方法有什么区别…

14 Day:同步锁与操作系统输入输出

前言&#xff1a;在上一期的线程章节中&#xff0c;我们的线程输出貌似有大问题&#xff0c;今天我们便要来学习同步锁来解决这个问题&#xff0c;同时再次基础上拿下键盘输入&#xff0c;实现操作系统的输入和输出。从今天开始我们的操作系统不在是一块“看板”了&#xff01;…

Python|数学|贪心|数组|动态规划|单选记录:实现保留3位有效数字(四舍六入五成双规则)|用Python来创造一个提示用户输入数字的乘法表|最小路径和

1、实现保留3位有效数字&#xff08;四舍六入五成双规则&#xff09;&#xff08;数学&#xff0c;算法&#xff09; 贡献者&#xff1a;weixin_45782673 输入&#xff1a;1234 输出&#xff1a;1234 12 12.0 4 4.00 0.2 0.200 0.32 0.320 1.3 1.30 1.235 1.24 1.245 1.24 1.…

Docker 入门建议收藏 第一部分

一、Docker 是什么&#xff1f; Docker&#xff0c;翻译过来就是码头工人 Docker是一个开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可抑制的容器中&#xff0c;然后发布到任何流行的Linux机器上&#xff0c;也可以实现虚拟化。容器完全使用沙盒…

JVM概览:内存空间与数据存储

核心的五个部分虚拟机栈&#xff1a;局部变量中基础类型数据、对象的引用存储的位置&#xff0c;线程独立的。堆&#xff1a;大量运行时对象都在这个区域存储&#xff0c;线程共享的。方法区&#xff1a;存储运行时代码、类变量、常量池、构造器等信息&#xff0c;线程共享。程…

ClassMix: Segmentation-Based Data Augmentation for Semi-Supervised Learning学习笔记

ClassMix相关介绍主要思想方法Mean-Teacher损失函数交叉熵损失标签污染实验实验反思参考资料相关介绍 从DAFormer溯源到这篇文章&#xff0c;ClassMix主要是集合了伪标签和一致性正则化&#xff0c;思想来源于CutMix那条研究路线&#xff0c;但是优化了CutMix中的标签污染的情…

使用 HTML5 轻松验证表单插件

下载:https://download.csdn.net/download/mo3408/87559594 效果图: 当您通过表单从人们那里收集信息时,必须应用某种验证。如果不这样做,可能会导致客户流失、数据库中的垃圾数据甚至网站的安全漏洞。从历史上看,构建表单验证一直很痛苦。在服务器端,全栈框架会为您处理…

【AI绘图学习笔记】深度前馈网络(一)

有关深度前馈网络的部分知识&#xff0c;我们已经在吴恩达的机器学习课程中有过了解了&#xff0c;本章主要是对《深度学习》花书中第六章&#xff1a;深度前馈网络的总结笔记。我希望你在看到这一章的时候&#xff0c;能回忆起机器学习课程中的一些环节或者细节&#xff0c;这…

【现代机器人学】学习笔记十一:抓握与操作

本章是比较独特的一章&#xff0c;相对于前面的内容&#xff0c;内容较为独立&#xff0c;主要描述的是力学相关的一些理论。因此&#xff0c;读者也完全不必根据题目产生一些不必要的幻想&#xff0c;认为似乎看完这章我就可以学会机器人抓取。不过&#xff0c;我仍然认为这章…

新入职的项目经理,如何击破权力微薄的困境?

“从此找到了上班的意义”这个话题最近登上了热搜&#xff0c;在“铜三铁四”的招聘季&#xff0c;大家停止了内卷&#xff0c;给自己安排得明明白白&#xff0c;每天上班的动力就是&#xff1a;充电、蹭网、干饭、灌水、睡午觉、上厕所。但咱项目经理们却没办法Get这些动力&am…

【UEFI基础】HOB介绍

综述 HOB的全称是Hand-Off Block&#xff0c;从名字上也可以看出来&#xff0c;它表示的是一种用于交接的数据。按照HOB的使用情况&#xff0c;可以将BIOS的启动阶段分为两个部分&#xff1a; HOB生成阶段&#xff08;HOB producer phase&#xff09;&#xff0c;用来创建和修…