Debezium 同步 PostgreSQL 数据到 RocketMQ 中

news2024/12/26 2:56:25

1.RocketMQ Connect概览

        RocketMQ Connect是RocketMQ数据集成重要组件,可将各种系统中的数据通过高效,可靠,流的方式,流入流出到RocketMQ,它是独立于RocketMQ的一个单独的分布式,可扩展,可容错系统, 它具备低延时,高可靠性,高性能,低代码,扩展性强等特点,可以实现各种异构数据系统的连接,构建数据管道,ETL,CDC,数据湖等能力。

1.1Connector工作原理

        RocketMQ Connect是一个独立的的分布式,可伸缩,容错的系统,它主要为RocketMQ提供与各种外部系统的数据的流入流出能力。用户不需要编程,只需要简单的配置即可使用RocketMQ Connect,例如从MySQL同步数据到RocketMQ,只需要配置同步所需的MySQL的账号密码,链接地址,和需要同步的数据库,表名就可以了。

1.2Connector的使用场景

1)构建流式数据管道

        在业务系统中,利用MySQL完善的事务支持,处理数据的增删改,使用ElasticSearch,Solr等实现强大的搜索能力,或者将产生的业务数据同步到数据分析系统,数据湖中(例如hudi),对数据进一步处理从而让数据产生更高的价值。使用RocketMQ Connect很容易实现这样的数据管道的能力,只需要配置3个任务,第一个从MySQL获取数据的任务,第二,三个是从RocketMQ消费数据到ElasticSearch,Hudi的任务,配置3个任务就实现了从MySQL到ElasticSearch,MySQL到hudi的两条数据管道,既可以满足业务中事务的需求,搜索的需求,又可以构建数据湖。

2)CDC

        CDC作为ETL模式之一,可以通过近乎实时的增量捕获数据库的 INSERT、UPDATE,DELETE变化,RocketMQ Connect流试数据传输,具备高可用,低延时等特性,通过Connector很容易实现CDC。

1.3Connector部署

        在创建Connector时,一般是通过配置完成的,Connector一般包含逻辑的Connector连接器和执行数据复制的Task即物理线程,如下图所示,两个Connector连接器和它们对应的运行Task任务。

        一个Connector也可以同时运行多个任务,提高Connector的并行度,例如下图所示的Hudi Sink Connector有2个任务,每个任务处理不同的分片数据,从而Connector的并行度,进而提高处理性能。

        RocketMQ Connect Worker支持两种运行模式,集群和单机 集群模式,顾名思义,有多个Worker节点组成,推荐最少有2个Worker节点,组成高可用集群。集群间的配置信息,offset信息,status信息通过指定RocketMQ Topic存储,新增Worker节点也会获取到集群中的这些配置,offset,status信息,并且触发负载均衡,重新分配集群中的任务,使集群达到均衡的状态,减少Woker节点或者Worker宕机也会触发负载均衡,从而保障集群中所有的任务都可以均衡的在集群中存活的节点中正常运行。

单机模式,Connector任务运行在单机上,Worker本身没有高可用,任务offset信息持久化在本地。适合一些对高可没有什么要求或者不需要Worker保障高可用的场景,例如部署在k8s集群中,由k8s集群保障高可用。

2.Connector组件概念

1)Connector

        连接器,定义数据从哪复制到哪,是从源数据系统读取数据写入RocketMQ,这种是SourceConnector ,或从RocketMQ读数据写入到目标系统,这种是SinkConnector。Connector决定需要创建任务的数量,从Worker接收配置传递给任务。

2)Task

        是Connector任务分片的最小分配单位,是实际将源数据源数据复制数据到RocketMQ(SourceTask),或者将数据从RocketMQ读取数据写入到目标系统(SinkTask)真正的执行者,Task是无状态的可以动态的启停任务,多个Task是可以并行执行的,Connector复制数据的并行度主要体现在Task数量上。

        通过Connect的Api也可以看到Connector和Task各自的职责,Connector实现时就已经确定数据复制的流向,Connector接收数据源相关的配置,taskClass获取需要创建的任务类型,通过taskConfigs指定最大任务数量,并且为task分配好配置。task拿到配置以后从数据源取数据写入到目标存储。

        通过下面的两张图可以清楚的看到,Connecotr和Task处理基本流程。

3)Worker

        worker 进程是Connector和Task运行环境,它提供RESTFul能力,接受HTTP请求,将获取到的配置传递给Connector和Task。 除此之外它还负责启动Connector和Task,保存Connector配置信息,保存Task同步数据的位点信息,负载均衡能力,Connect集群高可用,扩缩容,故障处理主要依赖Worker的负载均衡能力实现的。

        从上面面这张图,看到Worker通过提供的REST Api接收http请求,将接收到的配置信息传递给配置管理服务,配置管理服务将配置保存到本地并同步给其它worker节点,同时触发负载均衡。

3.Debezium概览

3.1概念

        Debezium是一个开源项目,为捕获数据更改(change data capture,CDC)提供了一个低延迟的流式处理平台。你可以安装并且配置Debezium去监控你的数据库,然后你的应用就可以消费对数据库的每一个行级别的更改。只有已提交的更改才是可见的,所以你的应用不用担心事务或者更改被回滚。Debezium为所有的数据库更改事件提供了一个统一的模型,所以你的应用不用担心每一种数据库管理系统的错综复杂性。另外,由于Debezium用持久化的、有副本备份的日志来记录数据库数据变化的历史,因此,你的应用可以随时停止再重启,而不会错过它停止运行时发生的事件,保证了所有的事件都能被正确地、完全地处理掉。监控数据库,并且在数据变动的时候获得通知一直是很复杂的事情。关系型数据库的触发器可以做到,但是只对特定的数据库有效,而且通常只能更新数据库内的状态(无法和外部的进程通信)。一些数据库提供了监控数据变动的API或者框架,但是没有一个标准,每种数据库的实现方式都是不同的,并且需要大量特定的知识和理解特定的代码才能运用。确保以相同的顺序查看和处理所有更改,同时最小化影响数据库仍然非常具有挑战性.

3.2 基础架构

3.3 Debezium常见使用场景

1) 缓存失效

        在缓存中缓存的条目(entry)在源头被更改或者被删除的时候立即让缓存中的条目失效。如果缓存在一个独立的进程中运行(例如Redis,Memcache等),那么简单的缓存失效逻辑可以放在独立的进程或服务中,从而简化主应用的逻辑。在一些场景中,缓存失效逻辑可以更复杂一点,让它利用更改事件中的更新数据去更新缓存中受影响的条目。

2) 简化单体应用

        许多应用更新数据库,然后在数据库中的更改被提交后,做一些额外的工作:更新搜索索引,更新缓存,发送通知,运行业务逻辑,等等。这种情况通常称为双写(dual-writes),因为应用没有在一个事务内写多个系统。这样不仅应用逻辑复杂难以维护,而且双写容易丢失数据或者在一些系统更新成功而另一些系统没有更新成功的时候造成不同系统之间的状态不一致。使用捕获更改数据技术(change data capture,CDC),在源数据库的数据更改提交后,这些额外的工作可以被放在独立的线程或者进程(服务)中完成。这种实现方式的容错性更好,不会丢失事件,容易扩展,并且更容易支持升级。

3) 共享数据库

        当多个应用共用同一个数据库的时候,一个应用提交的更改通常要被另一个应用感知到。一种实现方式是使用消息总线,尽管非事务性的消息总线总会受上面提到的双写影响。但是,另一种实现方式,即Debezium,变得很直接:每个应用可以直接监控数据库的更改,并且响应更改。

4) 数据集成

        数据通常被存储在多个地方,尤其是当数据被用于不同的目的的时候,会有不同的形式。保持多系统的同步是很有挑战性的,但是可以通过使用Debezium加上简单的事件处理逻辑来实现简单的ETL类型的解决方案。

5) CQRS

        在CQRS(Command Query Responsibility Separation)架构模式中,更新数据使用了一种数据模型,读数据使用了一种或者多种数据模型。由于数据更改被记录在更新侧,这些更改将被处理以更新各种读展示。所以CQRS应用通常更复杂,尤其是他们需要保证可靠性和全序处理。

4. 构建Connector

        下载源码

git clone https://github.com/apache/rocketmq-connect.git

        注:官方Rocketmq-Connect源码还有很多问题,这里我对源码有做二次开发,传送门:

        RocketMQ-Connect 二次开发源码 


        编译

cd  rocketmq-connect
mvn -Prelease-connect -DskipTests clean install -U

        注意

#由于rocketmq-connect-debezium官方很多年没维护有bug,所以需将改好的jar包替换
​
/opt/rocketmq-connect/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT/lib/rocketmq-connect-runtime-0.0.1-SNAPSHOT.jar

        修改脚本参数

vim /opt/rocketmq-connect/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT/bin/runconnect.sh
​
....
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"
....

        修改配置文件

vim /opt/rocketmq-connect/distribution/conf/connect-standalone.conf
​
workerId=standalone-worker
storePathRootDir=/tmp/standalone/storeRoot
​
## Http port for user to access REST API
httpPort=8083
​
# Rocketmq namesrvAddr
namesrvAddr=10.0.61.12:9876;10.0.61.21:9876
​
# RocketMQ acl
aclEnable=false
accessKey=rocketmq
secretKey=12345678
​
autoCreateGroupEnable=false
clusterName="DefaultCluster"
​
# Source or sink connector jar file dir,The default value is rocketmq-connect-sample
pluginPaths=/usr/local/connector-plugins/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar

        配置说明

#当前群集节点唯一标识
workerId=DEFAULT_WORKER_1
​
#用户访问REST API的端口
httpPort=8083
​
#配置存储的本地文件目录
storePathRootDir=/tmp/standalone/storeRoot
​
#需要修改为自己的rocketmq nameserver 接入点
# RocketMQ namesrvAddr
namesrvAddr=127.0.0.1:9876  
​
# RocketMQ acl
aclEnable=false
accessKey=rocketmq
secretKey=12345678
​
autoCreateGroupEnable=false
clusterName="DefaultCluster"
​
#用于加载Connector插件,类似于jvm启动加载jar包或者class类,这里目录目录用于放Connector相关的实现插件,
支持文件和目录
# Source or sink connector jar file dir
pluginPaths=/usr/local/connector-plugins/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar
​
# 补充:将 Connector 相关实现插件保存到指定文件夹 
# pluginPaths=/usr/local/connector-plugins/*

        启动

cd /opt/rocketmq-connect/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
​
sh bin/connect-standalone.sh -c conf/connect-standalone.conf &

4.1 rocketmq-connect-debezium-postgresql安装

git clone git@github.com:apache/rocketmq-connect.git
​
cd rocketmq-connect/connectors/rocketmq-connect-debezium/
​
mvn clean package -Dmaven.test.skip=true

        将 Debezium postgresql RocketMQ Connector 编译好的包放入这个目录

mkdir -p /usr/local/connector-plugins
​
cp rocketmq-connect-debezium-postgresql/target/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins

        注意

#由于rocketmq-connect-debezium官方很多年没维护有bug,所以需将改好的jar包替换
​
/usr/local/connector-plugins/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar

4.2 connector测试

        启动source connector

#当前目录创建测试文件 test-source-file.txt
echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt
​
curl -X POST -H "Content-Type: application/json" http://10.0.61.12:8083/connectors/fileSourceConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"test-source-file.txt","connect.topicname":"fileTopic"}'
​
#返回值200成功
"status":200

        查看日志

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

        启动sink connector

curl -X POST -H "Content-Type: application/json" http://10.0.61.12:8083/connectors/fileSinkConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"test-sink-file.txt","connect.topicnames":"fileTopic"}'
​
#返回值200成功
"status":200

        查看日志

         停止demo中的两个connector

curl http://10.0.61.12:8083/connectors/fileSinkConnector/stop
curl http://10.0.61.12:8083/connectors/fileSourceConnector/stop

5.  postgresql安装插件

1) 安装decoderbufs

        安装环境

cat /etc/redhat-release
CentOS Linux release 7.9.2009 (Core)

        安装依赖

#安装protobuf
wget https://github.com/google/protobuf/releases/download/v2.6.1/protobuf-2.6.1.tar.gz     #可以手动下载
tar -zxvf protobuf-2.6.1.tar.gz
cd protobuf-2.6.1
./configure   #不指定--prefix,会生成到/usr/local/lib和/usr/local/bin下面
make -j 8 
make install
​
#安装protobuf-c
wget https://github.com/protobuf-c/protobuf-c/releases/download/v1.2.1/protobuf-c-1.2.1.tar.gz
tar -zxvf protobuf-c-1.2.1.tar.gz
export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig    #指定protobuf.pc文件所在
./configure # 不指定--prefix
make -j 8
make install

        修改ld.so.conf

vim /etc/ld.so.conf
#添加以下内容 
/usr/local/lib
​
#执行
ldconfig -v

        安装decoderbufs

wget https://github.com/debezium/postgres-decoderbufsunzip postgres-decoderbufs-main.zip
cd 
make postgres-decoderbufs-main
make install

2) 安装wal2json

git clone https://github.com/eulerto/wal2json.git
cd wal2json
PATH=/path/to/pg/bin:$PATH
USE_PGXS=1 make
USE_PGXS=1 make install

3) 修改postgresql配置

        postgresql.conf

vim /usr/local/postgresql/data/postgresql.conf
​
····
wal_level = logical
max_wal_senders = 4
max_replication_slots = 4
shared_preload_libraries = 'decoderbufs,wal2json'
····

        pg_hba.conf        

vim /usr/local/postgresql/data/pg_hba.conf
​
····
host all all all scram-sha-256
host replication start_data_engineer 0.0.0.0/0 trust

        重启postgresql

cd /usr/local/postgresql/bin
​
pg_ctl restart

6. 业务数据变更数据捕获相关

        PostgreSQL -> RocketMQ-Connect-Debezium -> RocketMQ -> Sys 的流程,增/删/改 21数据库会自动捕获变更后的全量数据,更新操作同时会捕获变更前的全量数据

6.1 SQL脚本

        监听的表的更新事件包含行中所有列的先前值,解决更新数据操作,before为null的问题

ALTER TABLE public.approval_abandoned_user REPLICA IDENTITY FULL;
ALTER TABLE public.approval_business_handling REPLICA IDENTITY FULL;

6.2 DebeziumPostgresConnector启动

curl -X POST -H "Content-Type: application/json" http://10.0.61.12:8083/connectors/postgres-connector -d  '{
  "connector.class": "org.apache.rocketmq.connect.debezium.postgres.DebeziumPostgresConnector",
  "max.task": "1",
  "connect.topicname": "dev-debezium-postgres-source-03",
  "database.history.skip.unparseable.ddl": true,
  "database.server.name": "swkj",
  "database.port": 8832,
  "database.hostname": "10.0.61.21",
  "database.connectionTimeZone": "UTC",
  "database.user": "postgres",
  "database.dbname": "postgres",
  "database.password": "SwkjPgsql@397656",
  "table.whitelist": "public.approval_abandoned_user
    ,public.approval_business_handling",
  "key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
  "value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'

        RocketMq-Console查看会生成topic:dev-debezium-postgres-source-03

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/136152.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

字节二面:Redis 的大 Key 对持久化有什么影响?

Redis 的持久化方式有两种:AOF 日志和 RDB 快照。 所以接下来,针对这两种持久化方式具体分析分析。 大 Key 对 AOF 日志的影响 先说说 AOF 日志三种写回磁盘的策略 Redis 提供了 3 种 AOF 日志写回硬盘的策略,分别是: Always&am…

Git(四) - Git 分支操作

​​​​​​​ 一、什么是分支 在版本控制过程中,同时推进多个任务,为每个任务,我们就可以创建每个任务的单独分支。使用分支意味着程序员可以把自己的工作从开发主线上分离开来,开发自己分支的时候,不会影响主线分支…

前端面试常考 | js原型与原型链

文章目录一. 什么是原型?二. 什么是原型链?一. 什么是原型? 在js中所有的引用类型都有一个__proto__(隐式原型)属性,属性值是一个普通的对象。 而在js中的引用类型包括:Object,Array,Date,Function 而所有函数都有…

基于K8s的DevOps平台实践(二)

文章目录1. 流水线入门🍑 流水线基础语法🍑 脚本示例🍑 脚本解释🍑 Blue Ocean2. Jenkinsfile实践🍑 演示一🍑 演示二🍑 演示三🍑 演示四🍑 总结3. 多分支流水线实践&…

BEV视觉3D感知算法梳理

1. 基于BEV空间的自动驾驶感知任务 最近,基于BEV空间下的感知任务已经涌现出了众多优秀算法,并在多个自动驾驶公开数据集(KITTI,Waymo,nuScenes)上取得了非常不错的成绩。根据自动驾驶汽车上安装的传感器类…

【从零开始学习深度学习】37. 深度循环神经网络与双向循环神经网络简介

目录1. 深度循环神经网络2. 双向循环神经网络总结1. 深度循环神经网络 之前介绍的循环神经网络只有一个单向的隐藏层,在深度学习应用里,我们通常会用到含有多个隐藏层的循环神经网络,也称作深度循环神经网络。下图演示了一个有LLL个隐藏层的…

数字化时代,全方位解读商业智能BI

商业智能BI是一种通用的数据类技术解决方案,不会因为行业BI没有进行针对性开发而出现不适配、无法使用的情况。同时,也正因为商业智能BI核心是数据,只要企业有数据沉淀,不管是哪些行业BI商业智能都能发挥出作用。 不过考虑到不同…

文件IO操作开发笔记(一):使用Qt的QFile对磁盘文件存储进行性能测试以及测试工具

文为原创文章,转载请注明原文出处 本文章博客地址:https://hpzwl.blog.csdn.net/article/details/128438303 红胖子(红模仿)的博文大全:开发技术集合(包含Qt实用技术、树莓派、三维、OpenCV、OpenGL、ffmpeg、OSG、单片机、软硬结…

portraiture2023智能磨皮修饰滤镜插件中文版

在人像后期修图的时候免不了需要进行磨皮处理,很多人在挑选磨皮软件的时候都不知道该如何选择,今天的文章就来带大家看看磨皮软件哪个好,能磨皮的修图软件和插件!借助磨皮软件即使是新手也能做出高级的人像图片,下面挑选了几款好用…

Java 并发编程知识总结【五】

6. 线程中断与 LockSupport 6.1 线程中断机制 大厂(蚂蚁金服)面试题: 什么是中断? 首先,一个线程不应该由其他线程来强制中断或停止,而是应该由线程自己自行停止。所以,Thread.stop, Thread.…

Exynos_4412——中断控制器

目录 一、中断控制器 中断控制器的作用: 二、Exynos_4412下的中断控制器 它支持三种类型的中断 可以编程设置: 三、中断控制器详解 四、中断控制器编程 一、中断控制器 外设产生的中断信号,先要经过中断控制器,中断是异常…

如何解决软件项目管理中的冲突?

1、项目干系人间的良好沟通 项目干系人之间保持良好的沟通交流,是减少项目管理中冲突的重要手段。甲乙双方签订合同后,为保障项目的成功,在项目发生矛盾和困难时,需要双方相互理解和沟通,共同协商解决问题。 为了及时解…

Git(八) - IDEA 集成 GitHub

一、设置 GitHub 账号 二、分享工程到 GitHub 来到GitHub中发现已经帮我们创建好了git-test的远程仓库。 三、push 推送本地库到远程库 注意:push是将本地库代码推送到远程库,如果本地库代码跟远程库代码版本不一致, push的操作是会被拒绝的…

go 性能分析pprof和trace

runtime/pprof:采集程序(非 Server)的运行数据进行分析,用于可结束的代码块,如一次编解码操作等net/http/pprof:采集 HTTP Server 的运行时数据进行分析。用于不可结束的代码块,如 web 应用等 使…

​工程师如何对待开源

工程师如何对待开源 本文是笔者作为一个在知名科技企业内从事开源相关工作超过 20 年的工程师,亲身经历或者亲眼目睹很多工程师对待开源软件的优秀实践,也看到了很多 Bad Cases,所以想把自己的一些心得体会写在这里,供工程师进行…

linux的shell的概述

Shell 教程 Shell 是一个用 C 语言编写的程序,它是用户使用 Linux 的桥梁。Shell 既是一种命令语言,又是一种程序设计语言。 Shell 是指一种应用程序,这个应用程序提供了一个界面,用户通过这个界面访问操作系统内核的服务。 Ke…

各种颜色的代码

颜色代码对照表如下: 关于16进制颜色代码: 这有必要了解一颜色系统的概念: RGB:RGB色彩模式是工业界的一种颜色标准,是通过对红(R)、绿(G)、蓝(B)三个颜色通道的变化以及它们相互之间的叠加来得到各式各样的颜色的&a…

MySQL的一些有意思的指令和函数

这个里面我准备记录一些比较有意思的MySQL的指令和函数,当然使用函数的时候我们要注意,会不会因为函数导致不走索引,走全表扫描的情况。 因为对索引字段做函数操作,可能会破坏索引值的有序性,因此优化器就决定放弃走树…

文本分类(LSTM+PyTorch)

本文的配套代码已上传至github,链接在文末,同时附带中文数据集。 一、传统方法的基本步骤 预处理:首先进行分词,然后是除去停用词;将文本表示成向量,常用的就是文本表示向量空间模型;进行特征…

回调函数、qsort函数、sort函数与lambda表达式

目录 目录 1、回调函数 2、sort函数 3、lambda表达式 4、qsort与sort函数使用lambda表达式 1、回调函数 回调函数就是一个通过函数指针调用的函数。如果你把函数的指针(地址)作为参数传递给另一 个函数,当这个指针被用来调用其所指向的函…