Opengauss到Oracle增量同步, 使用debezium

news2024/11/24 9:55:55

一、概述

PGOracle的同步方案使用debezium + kafka + kafka-connect-jdbcdebezium是一款开源的变更捕获软件,它以kafkaconnector形式运行,可以捕获PostgreSQLMySQLOracle中的变更数据,保存到kafkakafka-connect-jdbcconfluent公司的一款开源软件,以connector形式运行,可以从kafka读取变更数据,转换为PostgreSQLMySQLOracleSQLServerDB2等数据库的SQL语句,通过JDBC连接到数据库。

本方案用到的软件都是从开源代码编译而来,编译过程在第6章节。

二、kafkakafka connect

概述

kafka是一个分布式消息队列服务器。从逻辑角度讲,消息存储在称为topic的对象里(可以理解为一个topic就是一个消息队)kafka的客户端读/写消息时,需要指定topic名称。
可以用命令或代码创建topic、配置/查看属性、消息数。topic中的消息被读取后并没有删除,其它客户端仍可读取。客户端读取topic时,除了名称还需指定group id对象,它用于保存此客户端已经读到的消息的位置(offset)kafka是通过group id中的offset,来管理不同客户端对同一topic消费的不同进度。kafka把所有group idoffset保存在名为__consumer_offsettopic下。可以通过命令修改某个group idoffset。消息可以设置保存时间,超时自动删除,但无法用命令删除。topic中的消息只能以先进先出(FIFO)方式读取。

kafka的启动和配置

kafka的运行依赖zookeeper,集群中需要有zookeeper服务器,zookeeper是一个分布式数据库,一般用来存储集群中需要各节点共享的信息,保证信息的一致性,也可以用来实现分布式锁。

我下载的二进制kafka中包含了zookeeper服务器,在配置文件config/zookeeper.properties中,设置一下dataDir,作为实验其它参数默认即可:

注意,zookeeper服务器默认监听端口是2181

zookeeper和kafka都是java程序,注意配置一下java运行环境,我使用的是Java HotSpot 1.8,只要配置好JAVA_HOME和PATH两个环境变量即可,JAVA_HOME指向JDK解压后的目录,启动脚本会优先到JAVA_HOME里去找Java运行环境。

启动zookeeper,在kafka根目录下执行:

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

kafka的配置文件是config/server.properties,设置log.dirs,这是kafka存消息数据的目录,会占用较大磁盘,作为实验其它参数默认:

启动kafka,在kafka根目录下执行:

bin/kafka-server-start.sh -daemon config/server.properties

注意,kafka服务器默认监听端口是9092,kafka和zookeeper,客户端和kafka直接都是通过网络读写数据的。

我们用到的关于zookeeper、kafka、connect的脚本都在kafka的bin目录下:

消息的读写:topicgroup id 相关命令

创建topic

bin/kafka-topics.sh --create --topic my-test-topic --bootstrap-server 127.0.0.1:9092 --partitions 1 --replication-factor 1

查看指定topic属性:

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 -describe --topic my-test-topic

修改topic属性:

bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --entity-type topics --alter --entity-name my-test-topic --add-config retention.ms=1000

bin/kafka-configs.sh --bootstrap-server 127.0.0.1:9092 --entity-type topics --alter --entity-name my-test-topic --delete-config retention.ms

列出所有topic

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list

删除topic

bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic my-test-topic

查看topic中有几条消息:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic my-test-topic

查看有哪些消费者组:

./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

查看消费者组的偏移信息:

./bin/kafka-consumer-groups.sh --describe --bootstrap-server 127.0.0.1:9092 --group connect-sink-oracle

可以看到connect-sink-oracle中记录了多个topic的偏移量信息,LOG-END-OFFSET表示这个topic中总共有多少消息,CURRENT_OFFSET表示消费者当前已读取的消息偏移量,LAG就是还剩多少条消息没读,这个命令可以查看同步的进度。 

设置某个topic的CURRENT_OFFSET:

./bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group connect-sink-oracle  --topic my-test-topic --execute --reset-offsets --to-earliest

可以使用bin目录下的kafka-console-producer.sh和kafka-console-consumer.sh,向kafka收发消息:

./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic <topic-name>

./bin/kafka-console-consumer.sh --from-beginning --bootstrap-server localhost:9092 --topic <topic-name>

--from-beginning表示从topic的第一条消息开始读,否则只读最后一条消息之后的。

常用于调试,例如查看debezium在PG上做snapshot或捕获数据的状态:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic connect-offsets

kafka connect

kafka connectkafka的一个组件,是一个独立于kafka服务器的Java程序,它可以统一管理多个connector注意connectconnector不同),connector用于连接不同数据库,读取数据库写到kafka,或将kafka中的消息写到数据库。一个connectorkafka的生产者或消费者客户端,是jar形式的java程序,放在kafkalibs目录下。从编程角度讲connector实现了kafka connectsinksource接口,kafka connect启动时会加载它们。用户可以通过kafka connectREST API接口配置connector的参数。

实现source接口的connector,是kafka的生产者,连接源数据库,从源数据库获取数据写到kafka

实现sink接口的connector,是kafka的消费者,连接目标数据库,从kafka读取数据,写入目的数据库。

connector写到kafka前,数据可以序列化(压缩),从kafka读出后再反序列化(解压),以降低网络和存储开销,这个工作由称为converter的Java程序来做,它以jar形式被connect加载

一个kafka connect进程称为一个workersource connectorsink connector都运行在这个进程中。

配置和启动kafka connect

kafka connect的配置文件在config/connect-distributed.properties,作为实验参数使用默认值就行。

注意:

  1. bootstrap.serverskafka服务器的IP:PORT
  2. group.idconnect worker作为消费者所使用的group id不能与sink connectorgroup id相同。
  3. plugin.path指定了connect到哪些目录下寻找jar。jdbcconnectorconverter都是jar形式的程序。为了实验简单,只指定kafka根目录下的libs,把所有用到的jar都复制到这里,建议使用绝对路径。

启动connect

./bin/connect-distributed.sh -daemon config/connect-distributed.properties

​​​​​​​​​​​​​​配置和启动connector

kafka connect启动时connctorconverterjar已被加载,还需要通过REST API配置和启动connector

下面的命令创建/启动了一个connector

curl -H "Content-Type: application/json" -X POST http://127.0.0.1:8083/connectors -d '@sink-oracle-156.json'

connect启动以后在8083端口监听REST API命令,向这个端口以http post发送json格式的配置数据,'@sink-oracle-156.json'指定了文件路径为当前目录下sink-oracle-156.json文件,是json格式文本,内容如下:

是source类型还是sink类型,取决于connector.class指向的类是实现source接口还是sink接口。这个connector的名称是sink-oracle,实现了sink接口(类io.confluent.connect.jdbc.JdbcSinkConnector实现了sink类),key.converter和value.converter设定所使用的的converter。

删除/停止名为sink-oracle的connector:

curl -X DELETE http://127.0.0.1:8083/connectors/sink-oracle

查看所有connector:

curl -X GET http://127.0.0.1:8083/connectors

查看某个connector状态:

curl -X GET http://127.0.0.1:8083/connectors/<connector name>/status|jq

配置和部署avro-converterschema-registry

源数据库的每一条变更记录,默认被转换成了json字符串,包含了元数据和数据,然后发送到kafka,下面就是update emp set ename = 'TOMAS', job = 'ENGINEER' where empno=2 and ename='SMITH' 对应的json字符串:

这样有很大冗余,有两个办法可以降低冗余:
一、使用压缩的序列化格式;
二、使用元数据服务器共享元数据,用key值获得某个表的元数据,消息只携带数据和key值,消费者使用key值查询完整的元数据来解析消息。
avro-converterschema-registry共同完成这个工作。本方案的使用的avro-converterschema-registry来自confluentinc公司的开源代码,编译过程见后面章节。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1214311.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

stable diffusion到底是如何工作的

stable diffusion简单入门 stable diffusion是一个文生图模型&#xff0c;主要由CompVis、Stability AI和LAION的研究者们创建。这个模型主要是在512X512分辨率的图像上训练的&#xff0c;训练数据集是LAION-5B&#xff0c;该数据集是目前可访问的最大的多模态数据集。 在这篇…

springboot服务和python服务如何自定义启动banner

shigen日更文章的博客写手&#xff0c;擅长Java、python、vue、shell等编程语言和各种应用程序、脚本的开发。记录成长&#xff0c;分享认知&#xff0c;留住感动。 shigen最近在修改ruoyi的框架&#xff0c;看到了框架自带的banner图&#xff0c;就是一个不爽&#xff0c;于是…

基于JavaWeb+SSM+微信小程序基金优选系统的设计和实现

基于JavaWebSSM微信小程序基金优选系统的设计和实现 源码获取入口前言主要技术系统设计功能截图Lun文目录订阅经典源码专栏Java项目精品实战案例《500套》 源码获取 源码获取入口 前言 基金优选是金融机构的核心&#xff0c;是必不可少的一个部分。在金融机构的整个服务行业中…

怎么恢复删除的数据? 8个有效的数据恢复方法

无论您在保存备份方面多么小心&#xff0c;灾难有时还是会发生。有时您的备份软件无法按预期运行。 如果您的外部驱动器靠近您的设备&#xff0c;发生火灾/洪水/故障时&#xff0c;有时备份会与原始文件一起丢失。即使是云存储也不能避免故障。 还有一个事实是&#xff0c;我…

微信@all bug复现及原理分析

1、复现 条件&#xff1a; 1、Windows PC 端微信 2、自建一个群聊拥有群管权限可以所有人 废话不多说&#xff0c;直接上图 所有人 剪切后&#xff0c;到另一个群中&#xff0c;引用任意一个群里成员的消息&#xff0c;并将刚才剪切的粘贴至此&#xff0c;发送 便可完成非群…

Ubuntu搭建openvpn服务器

文章目录 一、基于ubuntu搭建openvpn服务器二、制作相关证书2.1 制作ca证书 ./build-ca2.2 制作Server端证书2.3 制作Client端证书 三、配置服务器3.1 配置Server端3.2. 配置Client端 四、安装openvpn客户端&#xff1a;http://build.openvpn.net/downloads/releases/ 一、基于…

电脑技巧:U盘装系统跟光盘装系统有什么区别,看完你就懂了!

目录 一、制作方法 二、优点比较 2.1 U盘 2.2 光盘 三、缺点比较 一、制作方法 U盘&#xff1a;是通过制作U盘系统盘&#xff0c;插在电脑上启动U盘&#xff0c;然后从U盘上启动PE系统&#xff0c;在PE系统里加载预先下载好的镜像&#xff0c;然后开始安装系统。 光盘&am…

算法实战:亲自写红黑树之二 完整代码

此文承接&#xff1a;算法实战&#xff1a;亲自写红黑树之一-CSDN博客 目录 一、项目结构 二、辅助代码a.h 三、红黑树代码rbtree.h 四、测试代码main.cpp 五、运行效果 六、代码详解 一、项目结构 这里给出的代码是实际可以运行的代码。 运行环境&#xff1a;VS2022&am…

Python 如何实现适配器设计模式?什么是适配器(Adapter)设计模式?

什么是适配器设计模式&#xff1f; 适配器&#xff08;Adapter&#xff09;设计模式是一种结构型设计模式&#xff0c;它允许接口不兼容的类之间进行合作。适配器模式充当两个不兼容接口之间的桥梁&#xff0c;使得它们可以一起工作&#xff0c;而无需修改它们的源代码。 主要…

Hive入门--学习笔记

1&#xff0c;Apache Hive概述 定义&#xff1a; Hive是由Facebook开源用于解决海量结构化日志的数据统计&#xff0c;它是基于大数据生态圈Hadoop的一个数据仓库工具。 作用&#xff1a; Hive可以用于将结构化的数据文件【映射】为一张表&#xff0c;并提供类SQL查询功能。 H…

什么是原生IP与广播IP?如何区分?为什么需要用原生IP?

在代理IP中&#xff0c;我们常常听到原生IP与广播IP&#xff0c;二者有何区别&#xff1f;如何区分呢&#xff1f;下面为大家详细讲解。 一、什么是原生IP 原生IP地址是互联网服务提供商&#xff08;ISP&#xff09;直接分配给用户的真实IP地址&#xff0c;无需代理或转发。此…

OpenGL_Learn12(光照)

续OpenGL_Learn11&#xff08;光照&#xff09;-CSDN博客 1. 镜面高光 和漫反射光照一样&#xff0c;镜面光照也决定于光的方向向量和物体的法向量&#xff0c;但是它也决定于观察方向&#xff0c;例如玩家是从什么方向看向这个片段的。镜面光照决定于表面的反射特性。 我们通…

Redis:详解5大数据类型及其常用命令

目录 Redis键&#xff08;key&#xff09;字符串&#xff08;String&#xff09;简介常用命令数据结构简介常用命令 列表&#xff08;List&#xff09;简介常用命令数据结构 集合&#xff08;Set&#xff09;简介常用命令数据结构 哈希&#xff08;Hash&#xff09;简介常用命令…

Java排序算法之希尔排序

希尔排序&#xff08;Shell Sort&#xff09;又称“缩小增量排序”&#xff0c;是直接插入排序算法的一种更高效的改进版本。它的基本思想是&#xff1a;首先将整个数组按照一定的间隔分成若干个子序列&#xff0c;然后对每个子序列分别进行插入排序&#xff0c;减小间隔&#…

异步注解@Async失效的原因

在方法上加上Async注解&#xff0c;然后去启动类加上EnableAsync启动注解开启异步Async失效的原因 1、注解Async的方法不是public方法 2、注解Async的返回值只能为void或者Future 3、注解Async方法使用static修饰也会失效 4、spring无法扫描到异步类&#xff0c;没加注解Async …

AI大模型低成本快速定制法宝:RAG和向量数据库

文章目录 1. 前言2. RAG和向量数据库3. 论坛日程4. 购票方式 1. 前言 当今人工智能领域&#xff0c;最受关注的毋庸置疑是大模型。然而&#xff0c;高昂的训练成本、漫长的训练时间等都成为了制约大多数企业入局大模型的关键瓶颈。 这种背景下&#xff0c;向量数据库凭借其独特…

【yolov5报错解决】ModuleNotFoundError: No module named‘ultralytics.yolo‘

今天跑yolov5遇见一个报错&#xff0c;具体内容如下&#xff1a; 上面显示我没有ultralytics.yolo这个模块&#xff0c;但是我已经安装了ultralytics&#xff0c;同时&#xff0c;我也尝试了网上的方法pip install ultralytics.yolo&#xff0c;但是仍然得不到解决&#xff0c…

ai语音电销机器人电销行业要怎么降低封号率?

工信部对电话营销电话的管控越来越严格&#xff0c;企业电销行业的发展受到了很多限制&#xff0c;因为电话销售人员在进行销售工作的时候&#xff0c;经常会因为各种原因触发封号机制&#xff0c;导致手机卡号被封&#xff0c;那企业电销行业要怎么降低封号率&#xff1f; 很多…

图像格式导致halcon读取失败

图像格式&#xff1a; JPEG (jpg)&#xff0c;文件头&#xff1a;FF D8 FF PNG (png)&#xff0c;文件头&#xff1a;89 50 4E 47 GIF (gif)&#xff0c;文件头&#xff1a;47 49 46 38 Windows Bitmap (bmp)&#xff0c;文件头&#xff1a;42 4D 打开软件“notepad”使用16进…

win11无损关闭系统更新

1、窗口键R&#xff0c;打开运行窗口&#xff0c;输入regedit。 2、打开地址&#xff1a;计算机\HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\WindowsUpdate\UX\Settings 3、新建DWORD&#xff08;32位&#xff09;值(D)&#xff0c;重命名“FlightSettingsMaxPauseDays” 4、…