1.RocketMQ Connect概览
RocketMQ Connect是RocketMQ数据集成重要组件,可将各种系统中的数据通过高效,可靠,流的方式,流入流出到RocketMQ,它是独立于RocketMQ的一个单独的分布式,可扩展,可容错系统, 它具备低延时,高可靠性,高性能,低代码,扩展性强等特点,可以实现各种异构数据系统的连接,构建数据管道,ETL,CDC,数据湖等能力。
1.1Connector工作原理
RocketMQ Connect是一个独立的的分布式,可伸缩,容错的系统,它主要为RocketMQ提供与各种外部系统的数据的流入流出能力。用户不需要编程,只需要简单的配置即可使用RocketMQ Connect,例如从MySQL同步数据到RocketMQ,只需要配置同步所需的MySQL的账号密码,链接地址,和需要同步的数据库,表名就可以了。
1.2Connector的使用场景
1)构建流式数据管道
在业务系统中,利用MySQL完善的事务支持,处理数据的增删改,使用ElasticSearch,Solr等实现强大的搜索能力,或者将产生的业务数据同步到数据分析系统,数据湖中(例如hudi),对数据进一步处理从而让数据产生更高的价值。使用RocketMQ Connect很容易实现这样的数据管道的能力,只需要配置3个任务,第一个从MySQL获取数据的任务,第二,三个是从RocketMQ消费数据到ElasticSearch,Hudi的任务,配置3个任务就实现了从MySQL到ElasticSearch,MySQL到hudi的两条数据管道,既可以满足业务中事务的需求,搜索的需求,又可以构建数据湖。
2)CDC
CDC作为ETL模式之一,可以通过近乎实时的增量捕获数据库的 INSERT、UPDATE,DELETE变化,RocketMQ Connect流试数据传输,具备高可用,低延时等特性,通过Connector很容易实现CDC。
1.3Connector部署
在创建Connector时,一般是通过配置完成的,Connector一般包含逻辑的Connector连接器和执行数据复制的Task即物理线程,如下图所示,两个Connector连接器和它们对应的运行Task任务。
一个Connector也可以同时运行多个任务,提高Connector的并行度,例如下图所示的Hudi Sink Connector有2个任务,每个任务处理不同的分片数据,从而Connector的并行度,进而提高处理性能。
RocketMQ Connect Worker支持两种运行模式,集群和单机 集群模式,顾名思义,有多个Worker节点组成,推荐最少有2个Worker节点,组成高可用集群。集群间的配置信息,offset信息,status信息通过指定RocketMQ Topic存储,新增Worker节点也会获取到集群中的这些配置,offset,status信息,并且触发负载均衡,重新分配集群中的任务,使集群达到均衡的状态,减少Woker节点或者Worker宕机也会触发负载均衡,从而保障集群中所有的任务都可以均衡的在集群中存活的节点中正常运行。
单机模式,Connector任务运行在单机上,Worker本身没有高可用,任务offset信息持久化在本地。适合一些对高可没有什么要求或者不需要Worker保障高可用的场景,例如部署在k8s集群中,由k8s集群保障高可用。
2.Connector组件概念
1)Connector
连接器,定义数据从哪复制到哪,是从源数据系统读取数据写入RocketMQ,这种是SourceConnector ,或从RocketMQ读数据写入到目标系统,这种是SinkConnector。Connector决定需要创建任务的数量,从Worker接收配置传递给任务。
2)Task
是Connector任务分片的最小分配单位,是实际将源数据源数据复制数据到RocketMQ(SourceTask),或者将数据从RocketMQ读取数据写入到目标系统(SinkTask)真正的执行者,Task是无状态的可以动态的启停任务,多个Task是可以并行执行的,Connector复制数据的并行度主要体现在Task数量上。
通过Connect的Api也可以看到Connector和Task各自的职责,Connector实现时就已经确定数据复制的流向,Connector接收数据源相关的配置,taskClass获取需要创建的任务类型,通过taskConfigs指定最大任务数量,并且为task分配好配置。task拿到配置以后从数据源取数据写入到目标存储。
通过下面的两张图可以清楚的看到,Connecotr和Task处理基本流程。
3)Worker
worker 进程是Connector和Task运行环境,它提供RESTFul能力,接受HTTP请求,将获取到的配置传递给Connector和Task。 除此之外它还负责启动Connector和Task,保存Connector配置信息,保存Task同步数据的位点信息,负载均衡能力,Connect集群高可用,扩缩容,故障处理主要依赖Worker的负载均衡能力实现的。
从上面面这张图,看到Worker通过提供的REST Api接收http请求,将接收到的配置信息传递给配置管理服务,配置管理服务将配置保存到本地并同步给其它worker节点,同时触发负载均衡。
3.Debezium概览
3.1概念
Debezium是一个开源项目,为捕获数据更改(change data capture,CDC)提供了一个低延迟的流式处理平台。你可以安装并且配置Debezium去监控你的数据库,然后你的应用就可以消费对数据库的每一个行级别的更改。只有已提交的更改才是可见的,所以你的应用不用担心事务或者更改被回滚。Debezium为所有的数据库更改事件提供了一个统一的模型,所以你的应用不用担心每一种数据库管理系统的错综复杂性。另外,由于Debezium用持久化的、有副本备份的日志来记录数据库数据变化的历史,因此,你的应用可以随时停止再重启,而不会错过它停止运行时发生的事件,保证了所有的事件都能被正确地、完全地处理掉。监控数据库,并且在数据变动的时候获得通知一直是很复杂的事情。关系型数据库的触发器可以做到,但是只对特定的数据库有效,而且通常只能更新数据库内的状态(无法和外部的进程通信)。一些数据库提供了监控数据变动的API或者框架,但是没有一个标准,每种数据库的实现方式都是不同的,并且需要大量特定的知识和理解特定的代码才能运用。确保以相同的顺序查看和处理所有更改,同时最小化影响数据库仍然非常具有挑战性.
3.2 基础架构
3.3 Debezium常见使用场景
1) 缓存失效
在缓存中缓存的条目(entry)在源头被更改或者被删除的时候立即让缓存中的条目失效。如果缓存在一个独立的进程中运行(例如Redis,Memcache等),那么简单的缓存失效逻辑可以放在独立的进程或服务中,从而简化主应用的逻辑。在一些场景中,缓存失效逻辑可以更复杂一点,让它利用更改事件中的更新数据去更新缓存中受影响的条目。
2) 简化单体应用
许多应用更新数据库,然后在数据库中的更改被提交后,做一些额外的工作:更新搜索索引,更新缓存,发送通知,运行业务逻辑,等等。这种情况通常称为双写(dual-writes),因为应用没有在一个事务内写多个系统。这样不仅应用逻辑复杂难以维护,而且双写容易丢失数据或者在一些系统更新成功而另一些系统没有更新成功的时候造成不同系统之间的状态不一致。使用捕获更改数据技术(change data capture,CDC),在源数据库的数据更改提交后,这些额外的工作可以被放在独立的线程或者进程(服务)中完成。这种实现方式的容错性更好,不会丢失事件,容易扩展,并且更容易支持升级。
3) 共享数据库
当多个应用共用同一个数据库的时候,一个应用提交的更改通常要被另一个应用感知到。一种实现方式是使用消息总线,尽管非事务性的消息总线总会受上面提到的双写影响。但是,另一种实现方式,即Debezium,变得很直接:每个应用可以直接监控数据库的更改,并且响应更改。
4) 数据集成
数据通常被存储在多个地方,尤其是当数据被用于不同的目的的时候,会有不同的形式。保持多系统的同步是很有挑战性的,但是可以通过使用Debezium加上简单的事件处理逻辑来实现简单的ETL类型的解决方案。
5) CQRS
在CQRS(Command Query Responsibility Separation)架构模式中,更新数据使用了一种数据模型,读数据使用了一种或者多种数据模型。由于数据更改被记录在更新侧,这些更改将被处理以更新各种读展示。所以CQRS应用通常更复杂,尤其是他们需要保证可靠性和全序处理。
4. 构建Connector
下载源码
git clone https://github.com/apache/rocketmq-connect.git
注:官方Rocketmq-Connect源码还有很多问题,这里我对源码有做二次开发,传送门:
RocketMQ-Connect 二次开发源码
编译
cd rocketmq-connect
mvn -Prelease-connect -DskipTests clean install -U
注意
#由于rocketmq-connect-debezium官方很多年没维护有bug,所以需将改好的jar包替换
/opt/rocketmq-connect/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT/lib/rocketmq-connect-runtime-0.0.1-SNAPSHOT.jar
修改脚本参数
vim /opt/rocketmq-connect/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT/bin/runconnect.sh
....
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"
....
修改配置文件
vim /opt/rocketmq-connect/distribution/conf/connect-standalone.conf
workerId=standalone-worker
storePathRootDir=/tmp/standalone/storeRoot
## Http port for user to access REST API
httpPort=8083
# Rocketmq namesrvAddr
namesrvAddr=10.0.61.12:9876;10.0.61.21:9876
# RocketMQ acl
aclEnable=false
accessKey=rocketmq
secretKey=12345678
autoCreateGroupEnable=false
clusterName="DefaultCluster"
# Source or sink connector jar file dir,The default value is rocketmq-connect-sample
pluginPaths=/usr/local/connector-plugins/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar
配置说明
#当前群集节点唯一标识
workerId=DEFAULT_WORKER_1
#用户访问REST API的端口
httpPort=8083
#配置存储的本地文件目录
storePathRootDir=/tmp/standalone/storeRoot
#需要修改为自己的rocketmq nameserver 接入点
# RocketMQ namesrvAddr
namesrvAddr=127.0.0.1:9876
# RocketMQ acl
aclEnable=false
accessKey=rocketmq
secretKey=12345678
autoCreateGroupEnable=false
clusterName="DefaultCluster"
#用于加载Connector插件,类似于jvm启动加载jar包或者class类,这里目录目录用于放Connector相关的实现插件,
支持文件和目录
# Source or sink connector jar file dir
pluginPaths=/usr/local/connector-plugins/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar
# 补充:将 Connector 相关实现插件保存到指定文件夹
# pluginPaths=/usr/local/connector-plugins/*
启动
cd /opt/rocketmq-connect/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
4.1 rocketmq-connect-debezium-postgresql安装
git clone git@github.com:apache/rocketmq-connect.git
cd rocketmq-connect/connectors/rocketmq-connect-debezium/
mvn clean package -Dmaven.test.skip=true
将 Debezium postgresql RocketMQ Connector 编译好的包放入这个目录
mkdir -p /usr/local/connector-plugins
cp rocketmq-connect-debezium-postgresql/target/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar /usr/local/connector-plugins
注意
#由于rocketmq-connect-debezium官方很多年没维护有bug,所以需将改好的jar包替换
/usr/local/connector-plugins/rocketmq-connect-debezium-postgresql-0.0.1-SNAPSHOT-jar-with-dependencies.jar
4.2 connector测试
启动source connector
#当前目录创建测试文件 test-source-file.txt
echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt
curl -X POST -H "Content-Type: application/json" http://10.0.61.12:8083/connectors/fileSourceConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"test-source-file.txt","connect.topicname":"fileTopic"}'
#返回值200成功
"status":200
查看日志
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
启动sink connector
curl -X POST -H "Content-Type: application/json" http://10.0.61.12:8083/connectors/fileSinkConnector -d '{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"test-sink-file.txt","connect.topicnames":"fileTopic"}'
#返回值200成功
"status":200
查看日志
停止demo中的两个connector
curl http://10.0.61.12:8083/connectors/fileSinkConnector/stop
curl http://10.0.61.12:8083/connectors/fileSourceConnector/stop
5. postgresql安装插件
1) 安装decoderbufs
安装环境
cat /etc/redhat-release
CentOS Linux release 7.9.2009 (Core)
安装依赖
#安装protobuf
wget https://github.com/google/protobuf/releases/download/v2.6.1/protobuf-2.6.1.tar.gz #可以手动下载
tar -zxvf protobuf-2.6.1.tar.gz
cd protobuf-2.6.1
./configure #不指定--prefix,会生成到/usr/local/lib和/usr/local/bin下面
make -j 8
make install
#安装protobuf-c
wget https://github.com/protobuf-c/protobuf-c/releases/download/v1.2.1/protobuf-c-1.2.1.tar.gz
tar -zxvf protobuf-c-1.2.1.tar.gz
export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig #指定protobuf.pc文件所在
./configure # 不指定--prefix
make -j 8
make install
修改ld.so.conf
vim /etc/ld.so.conf
#添加以下内容
/usr/local/lib
#执行
ldconfig -v
安装decoderbufs
wget https://github.com/debezium/postgres-decoderbufsunzip postgres-decoderbufs-main.zip
cd
make postgres-decoderbufs-main
make install
2) 安装wal2json
git clone https://github.com/eulerto/wal2json.git
cd wal2json
PATH=/path/to/pg/bin:$PATH
USE_PGXS=1 make
USE_PGXS=1 make install
3) 修改postgresql配置
postgresql.conf
vim /usr/local/postgresql/data/postgresql.conf
····
wal_level = logical
max_wal_senders = 4
max_replication_slots = 4
shared_preload_libraries = 'decoderbufs,wal2json'
····
pg_hba.conf
vim /usr/local/postgresql/data/pg_hba.conf
····
host all all all scram-sha-256
host replication start_data_engineer 0.0.0.0/0 trust
重启postgresql
cd /usr/local/postgresql/bin
pg_ctl restart
6. 业务数据变更数据捕获相关
PostgreSQL -> RocketMQ-Connect-Debezium -> RocketMQ -> Sys 的流程,增/删/改 21数据库会自动捕获变更后的全量数据,更新操作同时会捕获变更前的全量数据
6.1 SQL脚本
监听的表的更新事件包含行中所有列的先前值,解决更新数据操作,before为null的问题
ALTER TABLE public.approval_abandoned_user REPLICA IDENTITY FULL;
ALTER TABLE public.approval_business_handling REPLICA IDENTITY FULL;
6.2 DebeziumPostgresConnector启动
curl -X POST -H "Content-Type: application/json" http://10.0.61.12:8083/connectors/postgres-connector -d '{
"connector.class": "org.apache.rocketmq.connect.debezium.postgres.DebeziumPostgresConnector",
"max.task": "1",
"connect.topicname": "dev-debezium-postgres-source-03",
"database.history.skip.unparseable.ddl": true,
"database.server.name": "swkj",
"database.port": 8832,
"database.hostname": "10.0.61.21",
"database.connectionTimeZone": "UTC",
"database.user": "postgres",
"database.dbname": "postgres",
"database.password": "SwkjPgsql@397656",
"table.whitelist": "public.approval_abandoned_user
,public.approval_business_handling",
"key.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter",
"value.converter": "org.apache.rocketmq.connect.runtime.converter.record.json.JsonConverter"
}'
RocketMq-Console查看会生成topic:dev-debezium-postgres-source-03