Debezium2.7 数据同步 MySQL/Oracle -- AI生成

news2025/1/11 5:49:45

        Debezium是Red Hat开源的一个工具,用于实时捕获多种数据源(包括MySQL、PostgreSQL、SQL Server、Oracle等)的变更数据,并将这些数据作为事件流输出到Kafka等消息中间件中。通过Debezium,可以实现数据的实时同步和变更数据捕获(CDC)。

        Debezium官网:Reference Documentation

一、环境准备

1、安装 MySQL

        确保 MySQL 版本至少为 5.7.6,因为 Debezium 需要此版本或更高版本的 MySQL 来支持 Binlog。安装 MySQL 并配置好基本的访问权限和网络设置。

# 编辑 MySQL 的配置文件(通常是 /etc/my.cnf 或 /etc/mysql/my.cnf),添加或修改以下配置:
[mysqld]  
server-id = 1  # 确保每个 MySQL 实例的 server-id 是唯一的  
log_bin = mysql-bin  # 开启 Binlog 并指定日志文件名前缀  
binlog_format = ROW  # 使用行格式记录 Binlog,以捕获详细的行级变更  
binlog_row_image = FULL  # 记录更详细的数据变更信息  
expire_logs_days = 10  # 设置 Binlog 的过期时间

        在 MySQL 中创建用于 Debezium 的用户,并授权访问数据库: 

CREATE USER 'debezium'@'%' IDENTIFIED BY 'password';  
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'debezium'@'%';  
FLUSH PRIVILEGES;

2、安装Oracle

        确保 Oracle 数据库已安装并配置好,归档日志(Archive Log)和补充日志(Supplemental Log Data)已开启。创建一个用户并授予足够的权限以访问需要同步的表和日志。

-- 1. 查看数据库是否处于归档模式
SELECT log_mode FROM v$database;
 
-- 如果结果显示为ARCHIVELOG,则数据库已处于归档模式,可以跳过转换步骤
 
-- 2. 如果不是归档模式,转换为归档模式
SHUTDOWN IMMEDIATE; -- 立即关闭数据库
STARTUP MOUNT; -- 启动实例并挂载数据库
ALTER DATABASE ARCHIVELOG; -- 更改数据库为归档模式
ALTER DATABASE OPEN; -- 打开数据库
 
-- 3. 指定归档日志的位置,可以是一个本地目录或一个网络位置
-- 首先查看归档日志的配置
SELECT destination FROM v$archived_log WHERE destination IS NOT NULL;
 
-- 如果需要更改归档日志的位置,可以使用以下命令
-- 例如,指定归档日志的位置为+ARCH/orcl
ALTER SYSTEM SET log_archive_dest_1='LOCATION=+ARCH/orcl' SCOPE=BOTH;
 
-- 确保归档进程正在运行
ALTER SYSTEM ARCHIVE LOG START;
 
-- 可以通过以下命令查看归档进程状态
SELECT status FROM v$instance WHERE status = 'ACTIVE';

3、安装 Kafka 和 Zookeeper

        Kafka 和 Zookeeper 是 Debezium 实现数据同步所必需的组件。

        安装 Kafka 和 Zookeeper,并确保它们能够正常运行,且 Kafka 集群的配置符合生产环境的要求。

4、下载 Debezium 插件

        访问 Debezium 官网((https://debezium.io/documentation/)下载对应版本的 MySQL/Oracle Connector 插件(这里以2.7版本为例),如 debezium-connector-mysql-x.y.z.Final-plugin.tar.gz

二、配置 Kafka Connect

1、解压 Debezium 插件

        将下载的 Debezium MySQL/Oracle Connector 插件解压到 Kafka 的插件目录中,例如 /opt/kafka/plugins ,没有plugins 目录,创建一个plugins 目录

2、修改 Kafka Connect 配置

        编辑 Kafka Connect 的配置文件(如 connect-distributed.properties),确保包含以下关键配置:

bootstrap.servers=localhost:9092  # Kafka 集群地址  
group.id=connect-cluster  # Kafka Connect 集群的组 ID  
key.converter=org.apache.kafka.connect.json.JsonConverter  # 键转换器  
value.converter=org.apache.kafka.connect.json.JsonConverter  # 值转换器  
key.converter.schemas.enable=false  # 禁用键的 schema  
value.converter.schemas.enable=false  # 禁用值的 schema  
offset.storage.topic=connect-offsets  # 偏移量存储主题  
config.storage.topic=connect-configs  # 配置存储主题  
status.storage.topic=connect-status  # 状态存储主题  
plugin.path=/opt/kafka/plugins  # 插件路径

3、启动 Kafka Connect

        使用命令启动 Kafka Connect 服务,并确保它以分布式模式运行:

/opt/kafka/bin/connect-distributed.sh -daemon /opt/kafka/config/connect-distributed.properties

三、配置 Debezium Connector

        创建一个 JSON 文件来定义 Debezium MySql/Oracle Connector 的配置,指定 MySql/Oracle 数据库的连接信息、Kafka 主题、以及需要同步的表。

1、创建 Debezium Mysql Connector 配置

        编写一个配置文件mysql-source.json,用于启动Debezium MySQL Connector。可参照官网例子:Debezium connector for MySQL :: Debezium Documentation

{
    "name": "mysql-connector", //在Kafka Connect服务中注册时的连接器名称。
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector"", 
        "database.hostname": "mysql_host", 
        "database.port": "3306", 
        "database.user": "debezium-user", 
        "database.password": "debezium-user-pw", 
        "database.server.id": "184054", 
        "topic.prefix": "fullfillment", 
        "database.include.list": "inventory", 
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092", 
        "schema.history.internal.kafka.topic": "schemahistory.fullfillment", 
        "include.schema.changes": "true" 
    }
}

 

        更多配置可查看:Debezium connector for MySQL :: Debezium Documentation 

        使用Kafka Connect REST API或CLI工具部署Debezium MySQL Connector。

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @mysql-source.json http://connect_cluster:8083/connectors

2、创建 Debezium Oracle Connector 配置

        编写一个配置文件oracle-source.json,用于启动Debezium Oracle Connector。

{
    "name": "oracle-connector",  
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",  
        "database.hostname" : "<ORACLE_IP_ADDRESS>",  
        "database.port" : "1521",  
        "database.user" : "c##dbzuser",  
        "database.password" : "dbz",   
        "database.dbname" : "ORCLCDB",  
        "topic.prefix" : "server1",  
        "tasks.max" : "1",  
        "database.pdb.name" : "ORCLPDB1",  
        "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092", 
        "schema.history.internal.kafka.topic": "schema-changes.inventory"  
    }
}

        还可以使用JDBC URL连接到数据库。

{
    "name": "oracle-connector",
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",
        "tasks.max" : "1",
        "topic.prefix" : "server1",
        "database.user" : "c##dbzuser",
        "database.password" : "dbz",
        "database.url": "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(LOAD_BALANCE=OFF)(FAILOVER=ON)(ADDRESS=(PROTOCOL=TCP)(HOST=<oracle ip 1>)(PORT=1521))(ADDRESS=(PROTOCOL=TCP)(HOST=<oracle ip 2>)(PORT=1521)))(CONNECT_DATA=SERVICE_NAME=)(SERVER=DEDICATED)))",
        "database.dbname" : "ORCLCDB",
        "database.pdb.name" : "ORCLPDB1",
        "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory"
    }
}

         以上是容器数据库(CDB)配置方式,下面是非容器数据库(非CDB)配置方式。

{
    "config": {
        "connector.class" : "io.debezium.connector.oracle.OracleConnector",
        "tasks.max" : "1",
        "topic.prefix" : "server1",
        "database.hostname" : "<oracle ip>",
        "database.port" : "1521",
        "database.user" : "c##dbzuser",
        "database.password" : "dbz",
        "database.dbname" : "ORCLCDB",
        "schema.history.internal.kafka.bootstrap.servers" : "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes.inventory"
    }
}

当您配置Debezium Oracle连接器以与Oracle CDB一起使用时,必须为属性database.pdb.name指定一个值,该值命名了您希望连接器从中捕获更改的pdb。对于非CDB安装,不要指定database.pdb.name属性。

        更多配置可查看:Debezium Connector for Oracle :: Debezium Documentation

        使用Kafka Connect REST API或CLI工具部署Debezium Oracle Connector。 

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @oracle-source.json http://connect_cluster:8083/connectors

四、使用JDBC Sink Connector同步数据到数据库

        以JDBC Sink Connector为例,以下是一个简化的配置示例,用于将数据从Kafka订阅消费事件JSON:

{
    "name": "jdbc-connector",  //在Kafka Connect服务中注册连接器时分配给连接器的名称。
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",  //JDBC接收器连接器类的名称。
        "tasks.max": "1",  //为此连接器创建的最大任务数。
        "connection.url": "jdbc:postgresql://localhost/db",  //连接器用于连接到其写入的接收器数据库的JDBC URL。
        "connection.username": "pguser",  //用于身份验证的数据库用户的名称。
        "connection.password": "pgpassword",  //用于身份验证的数据库用户的密码。
        "insert.mode": "upsert",  // 插入模式,可以选择 `insert`, `update`, 或者 `upsert`
        "delete.enabled": "true",  //允许删除数据库中的记录
        "primary.key.mode": "record_key",  //指定用于解析主键列的方法。
        "schema.evolution": "basic",  //指定连接器如何演化目标表架构,可选择`none`, 或者 `basic`, basic:指定发生基本演变。连接器通过将传入事件的记录架构与数据库表结构进行比较,将缺失的列添加到表中。
        "database.time_zone": "UTC",  //指定写入时间字段类型时使用的时区。
        "topics": "orders" //要使用的主题列表,用逗号分隔。
    }
}

        更多配置可查看:Debezium connector for JDBC :: Debezium Documentation 

        使用Kafka Connect REST API或CLI工具部署Oracle Sink Connector。 

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @sink.json http://connect_cluster:8083/connectors

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2065140.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue组件库Element和Vue路由

目录 一、Vue组件库Element&#xff08;学会怎么CV&#xff09; 快速入门 ElementUI的常用组件 1.Table表格 &#xff08;1&#xff09;组件演示 &#xff08;2&#xff09;组件属性详解 2.Pagination分页 &#xff08;1&#xff09;组件演示 &#xff08;2&#xff0…

易企秀Html5场景秀系统源码 海量模版可以选择 带源代码包以及搭建部署教程

系统概述 易企秀 HTML5 场景秀系统源码是基于 PHPMySQL 组合开发的一套强大的 H5 页面制作系统。它旨在满足企业和个人对于个性化 H5 页面制作的需求&#xff0c;无论是企业宣传、活动推广、产品展示还是邀请函制作等&#xff0c;都能通过该系统轻松实现。 该系统的核心优势在…

智能优化算法-森林优化算法(FOA)(附源码)

目录 1.内容介绍 2.部分代码 3.实验结果 4.内容获取 1.内容介绍 森林优化算法 (Forest Optimization Algorithm, FOA) 是一种基于自然生态系统的元启发式优化算法&#xff0c;它模拟了森林生态系统中的植物生长、竞争和合作等行为&#xff0c;用于解决复杂的优化问题。 FOA的…

uniapp中 使用 VUE3 组合式API 怎么接收上一个页面传递的参数

项目是uniapp &#xff0c;使用了vue3 vite // 使用的组合式API 的 语法糖 <script setup> // 无法使用 onLoad <script> 使用不了下面方法获得上一个页面参数传递 onLoad(options){ } 解决方案1&#xff08;亲测Ok&#xff09;&#xff1a;消息通知与监听…

计算机毕业设计选题推荐-岗位招聘数据可视化分析-Python爬虫

✨作者主页&#xff1a;IT毕设梦工厂✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Py…

Android12 显示框架之Transaction----server端

目录&#xff1a;Android显示终极宝典 上篇讲完了在client端Transaction的内容&#xff0c;最后调用setTransactionState()把所有的参数都交给了surfaceflinger&#xff0c;那么任务就交给server来完成了。本节我们一起接着看看下面的内容。 setTransactionState() //framew…

SQL手工注入漏洞测试(MongoDB数据库)

此次靶场地址为&#xff1a;墨者学院 ⼀. 如下给出的源码...可以看到数据库查询的语句如下..构造回显测试... new_list.php?id1});return ({title:1,content:2 ⼆.成功显示“ 1” 和“ 2” 。可以在此来显示想要查询的数据。接下来开始尝试构造payload查询 当前数据库。通过…

Python基础知识学习总结(七)

文章目录 一. 函数1. 定义函数2. 语法及实例3. 函数调用4. 参数传递5. 可更改与不可更改对象6. 传可变对象实例 二. 文件I/O1. 定义2. File对象的属性3. open 函数4. write()方法5. read()方法6. 文件定位 一. 函数 函数是组织好的&#xff0c;可重复使用的&#xff0c;用来实…

【微服务】SpringCloud Alibaba 10-14章

10 SpringCloud Alibaba入门简介 10.1 是什么 诞生 2018.10.31&#xff0c;Spring Cloud Alibaba 正式入驻了 Spring Cloud 官方孵化器&#xff0c;并在 Maven 中央库发布了第一个版本。 Spring Cloud Alibaba 介绍 10.2 能干嘛 https://github.com/alibaba/spring-cloud-al…

芯片后端之 PT 使用 report_timing 产生报告 之 -nets 选项

今天&#xff0c;我们再学习一点点 后仿真相关技能。 那就是&#xff0c;了解 report_timing 中的 -nets 选项 。 如果我们仅仅使用如下命令&#xff0c;执行后会发现&#xff1a; pt_shell> report_timing -from start_point -to end_point -delay_type max report_ti…

JavaScript(31)——内置构造函数

构造函数 构造函数是一种特殊的函数&#xff0c;主要用于快速初始化对象 用大写字母开头只能由new操作符来执行 function Stu(name, age) {this.name namethis.age age}const xiaom new Stu(小明, 18)const xiaoh new Stu(小红, 19)console.log(xiaom);console.log(xiaoh…

catkin_make 编译报错CMake Error at /opt/ros/noetic/share/catkin/cmake/的最全解决办法,包治百病

检索&#xff08;解决安装了Anaconda后catkin_make不能用了&#xff0c;CMake Error at /opt/ros/noetic/share/catkin/cmake/catkinConfig.cmake:83 (find_package):Could not find a package configuration file provided by "serial" with any&#xff0c;Invokin…

Stable Diffusion AI绘画工具的安装与配置(MAC用户)

AI绘画的热潮席卷了整个创意行业&#xff0c;Stable Diffusion作为其中的翘楚&#xff0c;让艺术创作变得前所未有的简单。然而&#xff0c;对于使用Mac电脑用户来说&#xff0c;安装和配置Stable Diffusion可能显得有些棘手。别担心&#xff0c;这份详细的教程将手把手教你如何…

ARM——驱动——Linux启动流程和Linux启动

一、flash存储器 lash存储器&#xff0c;全称为Flash EEPROM Memory&#xff0c;又名闪存&#xff0c;是一种长寿命的非易失性存储器。它能够在断电情况下保持所存储的数据信息&#xff0c;因此非常适合用于存储需要持久保存的数据。Flash存储器的数据删除不是以单个的字节为单…

Cetos7安装详细流程

CentOS 7是一个流行的Linux发行版&#xff0c;广泛用于服务器和桌面环境。以下是在物理机或虚拟机上安装CentOS 7的详细步骤&#xff1a; 准备工作 下载CentOS 7 ISO&#xff1a;访问CentOS官方网站下载CentOS 7的ISO镜像文件。 创建启动介质&#xff1a;使用ISO镜像文件创建…

how to connect the VRTE to Internet

Are all gen_swp_fb python dependecies installed? In the build log it is present a fail of gen_swp_fb tool? Steps: Configure the VM according to proxy/no-proxy$ cd ../vrte/project/AraUCM_SwUpdate/gen_swp_fb/$ python3 -m pip install -r requirements.txtR…

普元EOS-利用热更新(热启动)提高开发效率

1 简介 在程序开发的时候&#xff0c;需要频繁的重启项目&#xff0c;随着项目复杂度增加&#xff0c;项目启动时间会越来越长。那么每次修改一点内容就重启项目&#xff0c;会极大影响开发效率。 EOS提供了热更新的手段&#xff0c;代码修改后无须重启&#xff0c;自动生效的…

扩展语言,扩展思维:LLM 词汇量缩放

“我的语言的极限意味着我的世界的极限。”——维特根斯坦 大型语言模型 (LLM)已实现可靠的性能。这一切都归功于 Transformer 及其以自监督方式从大量文本中学习的能力。显然&#xff0c;这种简单的方法允许模型学习越来越复杂的文本表示&#xff0c;而无需人工解释。这使得收…

考研资讯平台

TOC springboot0767考研资讯平台 绪论 1.1课题研究背景与意义 随着现代网络技术的快速发展&#xff0c;互联网的应用对学生的生活和工作有着很大的影响&#xff0c;特别是在当今计算机的应用下的人更加需要这样的环境&#xff0c;所以我们根据这个要求来开发了本课题。该课…

集团数字化转型方案(十)

集团数字化转型方案将通过全面部署云计算、大数据分析、人工智能和物联网技术&#xff0c;构建一个全方位的数据驱动平台&#xff0c;实现从战略规划到运营管理的数字化升级&#xff0c;以优化业务流程、提高决策效率、增强客户体验和提升运营灵活性。该方案包括智能化的供应链…