基于MirrorMaker与火山引擎的Kafka数据同步

news2025/1/23 14:56:47

Kafka MirrorMaker是Kafka官网提供的跨数据中心流数据同步方案,其实现原理是通过从Source集群消费消息,然后将消息生产到Target集群从而完成数据迁移操作。用户只需要通过简单的consumer配置和producer配置,启动MirrorMaker,即可实现实时数据同步。

本文章主要聚焦跑通Kafka MirrorMaker数据迁移流程。实验中的Source Kafka版本为2.12,基于本地机器搭建。Sink集群为火山引擎Kafka中间件

步骤1:本地Kafka创建测试Topic

以下我们将以名称为“testTopic”的Topic为例演示。

创建Topic命令:

kafka-topics.sh \
--create \
--zookeeper localhost:2181 \ #根据实际情况填写
--replication-factor 1 \
--partitions 1 \
--topic testTopic

创建成功后可以通过以下命令对topic进行检查

bin/kafka-topics.sh \
--list \
--zookeeper localhost:2181 #根据实际情况填写

执行如下:

步骤2:同步创建火山kafka Topic

创建Kafka实例后,在“Topic管理”页签下,创建同名Topic。注意分区数最好保持与原集群分区保持一致。

步骤3:下载SASL_SSL证书

在下载SASL_SSL证书前,先确认用于访问的用户是否已经存在:

如果未建立,请先创建用户。

确认完成后,在“实例管理”页签下下载SASL_SSL证书

关于使用SASL_SSL可参考:文档中心-火山引擎

步骤4:修改Mirror Maker 生产者/消费者配置

consumer生产者的配置(consumer.properties)一般在kafka目录下的config目录下。修改如下:

bootstrap.servers=localhost:9092 # 需要根据实际情况修改
group.id=test-consumer-group # 需要根据实际情况修改

同样,producer消费者的配置(producer.properties)也在此config目录下,该文件有较大修改:

bootstrap.servers= SASL接入点(公网) # 需要根据实际情况修改

接入点获取途径如下:

外网访问,需要添加SASL认证信息:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="替换用户名" password="替换密码";
sasl.mechanism=PLAIN security.protocol=SASL_SSL
ssl.truststore.location=/xxx/Kafka.client.truststore.jks #根据实际情况替换证书路径
ssl.truststore.password=KafkaTrustStorePass ssl.endpoint.identification.algorithm=

步骤5:启动MirrorMaker

文件配置修改完成后,通过下方命令启动MirrorMaker实现本地Kafka与火山Kafka联通

kafka-mirror-maker.sh \ 
--consumer.config ../config/consumer.properties \ #根据实际情况指定consumer.properteis 
--producer.config ../config/producer.properties \ #根据实际情况指定 
producer.properties --whitelist "testTopic"

步骤6:启动Producer生产数据

我们启动生产者对测试Topic进行消息生产

kafka-console-producer.sh \
--broker-list localhost:9092 \ #根据实际情况填写
--topic testTopic

步骤7:数据同步结果检查

在火山引擎Kafka实例“消息查询”页签,我们可以查询testTopic最近的数据,发现是有数据写入的。此时数量上和我们写入的数量一致。

由于火山对下载的消息进行了 Base64 编码传输,因此很难确认消息是否正确性、完整性。

可以通过客户端消费如下(客户端下载与使用可参考:文档中心-火山引擎):

经过检查,消息与发送端数据保持一致。

可能出现的问题:

(1)MirrorMakker启动报错:java.lang.OutOfMemoryError: Java heap space

解决方法:修改 /bin/kafka-run-class.sh,找到 Memory options处,默认设置是256M,将其修改为如下值:

if [ -z "$KAFKA_HEAP_OPTS" ]; then KAFKA_HEAP_OPTS="-Xmx1024M -Xms512M" fi

保存退出。

(2)Error while fetching metadata with correlation

修改 config\server.properties,修改内容如下:

listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092

重启kafka,启动程序进行入库操作

MirrorMaker的配置说明:

--consumer.config # 消费者配置,详情参考kafka consumer配置

--producer.config # 生产者配置,详情参考kafka producer配置

--whitelist #需要mirror的topic,支持Java正则表达式,例如'ABTestMsg,AppColdStartMsg’

--blacklist #不需要拷贝的topic,支持Java正则表达式

--num.producers #producer数量,默认为1

--num.streams #consumer数量,默认为1

--queue.size #consumer和producer之间缓存的queue size,默认10000

详情见:Kafka mirroring (MirrorMaker) - Apache Kafka - Apache Software Foundation

其他注意事项:

1)whitelist和blacklist支持正则表达式。比如需要包含两个topic可以这样写,--whitelist 'A|B' or --whitelist 'A,B' ,或者想迁移所有topic可以这样写 --whitelist '*'

2)注意在迁移之前创建好相关topic以及规划好partition数量。

3)老版本和新版本迁移主要考虑consumer和producer的兼容性

4)如果允许的话,建议将MirrorMaker部署在目标集群内,这是因为如果一旦发生网络分区,消费者与源集群断开连接比生产者与目标集群断开连接要安全。如果消费者断开连接,那么只是当前读取不到数据,但是数据仍然在源集群内,并不会丢失;而生产者断开连接,MirrorMaker便生产不了数据,如果MirrorMaker本身处理不当,可能会丢失数据。

5)开始之前配置好限流,防止影响原来集群的正常工作。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/185785.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

NCP1654BD65R2G高性能软开关功率因数校正电路芯片

NCP1654BD65R2G是一款用于连续导通模式 (CCM) 功率因数校正 (PFC) 步升预转换器的控制器。它在固定频率模式下控制电源开关导通时间 (PWM),取决于瞬时线圈电流。采用SOIC-8封装,最大程度减少了外部部件数量,并极大简化了 PFC 实施。它还集成了…

灰色预测模型一文详解+Python实例代码

目录 前言 一、模型理论 特点 二、模型场景 1.预测种类 2.适用条件 三、建模流程 1.级比校验 2.数据累加和微分方程构造 3.系数求解 4.残差检验与级比偏差检验 四、Python实例实现 总结 前言 博主参与过大大小小十次数学建模比赛,也获得了不少建模奖项…

19 | 三方协议怎么签?

前言 前言:简介三方协议签约的相关内容。 文章目录前言一. 什么是就业协议书二. 签约流程1. 网签流程(线上签约)三. 参考链接一. 什么是就业协议书 就业协议书俗称三方协议,是《全国普通高等学校毕业生就业协议书》的简称。 它是…

b站黑马Vue2后台管理项目笔记——(2)主页布局(整体,Header,左侧菜单布局)

说明: 此项目中使用的是本地SQL数据库,Vue2。 其他功能请见本人后续的其他相关文章。 本文内容实现的最终效果如下图: e.g.点击二级菜单用户列表,就会跳转到用户列表对应的index的地址(用户列表的indexpath是users&…

2023年山东最新道路运输安全员考试真题题库及答案

百分百题库提供道路运输安全员考试试题、道路运输安全员考试预测题、道路运输安全员考试真题、道路运输安全员证考试题库等,提供在线做题刷题,在线模拟考试,助你考试轻松过关。 题干:客运驾驶员从业行为定期考核结果应与企业安全生产奖惩制度…

Gradle 编译Server returned HTTP response code: 401 for URL

Gradle编译项目,Error:Server returned HTTP response code: 401 for URL: http://xxxxxxxxxx 解决方案 打开gradle-wrapper.properties文件 方法一:使用http协议:distributionUrlhttp://repo.xiaoman.cc/repository/gradle/gradle-6.8.2-b…

MCM箱模型实践技术应用与O3形成途径、生成潜势、敏感性分析

查看原文>>>https://mp.weixin.qq.com/s?__bizMzAxNzcxMzc5MQ&mid2247578057&idx4&sn9253a074df9937db3d258df14dd563ed&chksm9be2aed9ac9527cfdf270275d499452afded7a165944fdbbe345a4cb53fcd53548969d39c0c2&token850102049&langzh_CN#rd目…

剑指 Offer II 003 前 n 个数字二进制中 1 的个数

给定一个非负整数 n ,请计算 0 到 n 之间的每个数字的二进制表示中 1 的个数,并输出一个数组。 示例 1: 输入: n 2 输出: [0,1,1] 解释: 0 --> 0 1 --> 1 2 --> 10 示例 2: 输入: n 5 输出: [0,1,1,2,1,2] 解释: 0 --> 0 1 --> 1 2 …

Appium基础 — 获取toast信息

1、toast介绍Android中的toast是一种简易的消息提示框,toast提示框不能被用户点击,会根据所设置的显示时间自动消失。toas要appium1.6.3以上版本才支持,appium1.4的版本就别浪费时间了。再来看下toast长什么样,如下图:…

快速幂的几种实现方式

目录快速幂算法快速幂原理代码实现常规计算次幂的方法快速幂(一般)递归求快速幂位运算求快速幂快速幂算法 快速幂 快速幂还是很常用的,例如codeforce上的这道题目: 快速幂就是快速计算底数的n次幂。其时间复杂度为O(log₂N)O(log_₂N)O(log₂​N),与朴素的O(N)相比效率有了极…

SSM配置(备忘)

SSMSSM需要配置的文件配置applicationContext.xml配置database.properties配置mappers/ExamDao.xml在java目录下创建controller、dao、pojo、service目录控制类接口类(dao)实体类(pojo)服务层serviceservice接口类服务层实现类SSM SSM包含框架 spring s…

Linux(centos7)基本操作---用户权限

用户权限基本权限(UGO)设置权限设置属主,属组基本权限(ACL)特殊权限基本权限(UGO) 设置权限 权限的对象分为个人(u),组(g)&#xff…

无货源模式,跨境电商时代的风向标

众所周知,说到电商,我们首先就会想到淘宝、天猫、京东等平台,这些平台近年来发展迅猛,红海一片,可以说已经趋向于饱和状态了。由于国内电商平台严重的同质化竞争,越来越多的卖家开始转战跨境电商。为什么加…

Canary保护机制及绕过

Canary基本介绍 在基本的栈溢出中,我们可以通过没有限制输入长度或限制不严格的函数等向栈中写入我们构造的数据,可写入的数据包括但不限于: 一段可执行的代码(关闭NX防护的前提下) 一段特意构造的返回地址等 … …

基于java SSM校园兼职平台系统设计和实现

基于java SSM校园兼职平台系统设计和实现 博主介绍:5年java开发经验,专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 超级帅帅吴 Java毕设项目精品实战案例《500套》 欢迎点赞 收藏 ⭐留言 文末获取源码联系方式 …

CI/CD | 大型企业与开发团队如何进行持续集成与持续发布

Jenkins是当今最流行的持续集成工具之一, 企业选择Jenkins,可以从它的灵活性和自动化能力中获益。但除此之外的其他需求呢?企业规模在不断增大,他们如何在不增加管理负担的情况下,让CI扩展到整个组织,并满足…

rabbit是否支持批量发送?

最近和rabbit一直在打交道, 也是有个问题 Rabbit是否支持批量发送消息 该问题笔者翻阅官方文档与三方博客也没有找到答案,后也是自己去翻阅源码后才大概找到一个不敢确定的答案: BatchingRabbitTemplate 批量rabbit模板 该模板在RabbitTemplate模板的基础上进行了…

springboot配置(备忘)

springboot配置新建项目配置application.properties成功Tips需要配置的东西设置SpringbootstuApplication配置欢迎界面在java目录下创建controller、dao、pojo、service目录(与ssm配置大致相同,注释不同)控制类接口类(dao)实体类(…

使用SysBench压测mysql8.x版本

yum install gcc gcc-c autoconf automake make libtool mysql-devel git mysql git clone https://github.com/akopytov/sysbench.git ##从Git中下载Sysbench cd sysbench ##打开sysbench目录 git checkout 1.0.18 ##切换到sysbench 1.0.18版本 ./autogen.sh ##运行autogen.sh…

读书笔记——上瘾:让用户养成使用习惯的四大产品逻辑

总结 书中核心逻辑就是下面这张图,上瘾的过程由四步组成: 下面以我自己为案例,从四个维度分析:魔兽世界、写博客,这两件事情。 1 触发、行动 行动的目标是获取酬劳。书中提到《福格行为模型》 福格行为模型&…