前一段时间,面向新一代数据架构的数据库增强引擎 SphereEx-DBPlusEngine 正式发布 1.5.0 版本,新增 CDC 和数据库防火墙两大重磅功能,本篇文章笔者就给大家详细介绍 CDC 功能的实现逻辑和技术细节。
CDC
CDC(Change Data Capture)增量数据捕捉。数据库日志(以 MySQL 为例) binlog 是 MySQL 记录变更数据的"二进制日志",它可以看做是一个消息队列,队列中按顺序保存了 MySQL 中详细的增量变更信息,通过消费队列中的变更条目,下游系统或工具实现了与 MySQL 的实时数据同步。
在实际应用中,CDC主要用于数据同步,数据备份和恢复等方面。通常情况下,CDC都应用于数据流转量较大的场景中,如大规模数据仓库、数据中心、云计算等领域。
01 概述
SphereEx-DBPlusEngine 数据库增强引擎既可以帮助数据库实现单体到分布式转换,又可以实现数据在数据库中的加密存储,进而帮助数据库提升安全性。SphereEx-DBPlusEngine 的 CDC 功能,可以实现对分布式和加密存储等场景下的数据变化的捕捉。加密场景可以通过设置加解密参数,指定明文或密文传输到指定目标,满足不同目标端对数据展现形式的需求。分布式场景支持分布式事务,可以保障源端和目标端分布式事务的一致性。
目前 SphereEx-DBPlusEngine 数据迁移插件所支持的数据库产品为 MySQL、MySQL分支版本、PostgreSQL 和 openGauss,具体版本、环境要求和权限要求如下。
数据库 | 版本支持 | 环境要求 | 权限要求 | |
---|---|---|---|---|
1 | MySQL | 5.1.15 ~ 8.x | my.cnf 配置 log-bin=mysql-bin binlog-format=row binlog-row-image=full --确认命令如下 show variables like '%log_bin%'; show variables like '%binlog%'; | GRANT REPLICATION SLAVE, REPLICATION CLIENT ON . TO ${username}@${host} --确认命令如下 SHOW GRANTS FOR 'user'; |
2 | Aurora MySQL | 3.02.2 | my.cnf 配置 log-bin=mysql-bin binlog-format=row binlog-row-image=full --确认命令如下 show variables like '%log_bin%'; show variables like '%binlog%'; | GRANT REPLICATION SLAVE, REPLICATION CLIENT ON . TO ${username}@${host} --确认命令如下 SHOW GRANTS FOR 'user'; |
3 | PostgreSQL | 9.4 或以上版本 | postgresql.conf 配置: wallevel = logical maxwalsenders = 10 maxreplicationslots = 10 maxconnections = 600 | pghba.conf 配置: host replication replacct 0.0.0.0/0 md5 |
4 | openGauss | 2.0.1 ~ 3.0.0 | postgresql.conf 配置: wallevel = logical maxwalsenders = 10 maxreplicationslots = 10 maxconnections = 600 | pghba.conf 配置: host replication replacct 0.0.0.0/0 md5 |
架构图
核心功能 | 全量数据采集 | 增量数据采集 |
增量数据和全量数据的衔接 | 支持事务 | |
数据解密传输 | 故障自动切换 | |
支持资源调度 | 断点续传 |
02 基本概念
-
CDC 进程
-
CDC(Change Data Capture)是一种数据采集技术,CDC 可以监控 SphereEx-DBPlusEngine 的存储节点中的数据变化,捕捉到数据操作事件,过滤并提取有用信息,最终将这些变化数据发送到指定的目标上。
-
-
集群
-
为了提供特定服务而集合在一起的多个节点。
-
-
源端
-
原始数据所在的集群。
-
-
目标端
-
原始数据将要迁移的目标存储集群。
-
-
订阅
-
目标端配置接收源端的变化数据。
-
-
存量数据
-
配置 CDC 功能前,存储节点中已有的数据。
-
-
增量数据
-
CDC 运行过程中,业务系统所产生的新数据。
-
03 适用场景
CDC主要用于数据同步、数据备份和恢复等方面。通常情况下,CDC都应用于数据流转量较大的场景中,如大规模数据仓库、数据中心、云计算等。同时 CDC 支持将数据写入 Kafka,所有可以消费 Kafka 的目标端都可以作为 SphereEx-DBPlusEngine CDC 的目标端。
04 使用前提
已完成 SphereEx-DBPlusEngine 和目标端数据库环境部署,网络通畅且服务正常。
CDC 需要依赖治理中心(如 Zookeeper),需提前完成部署。
05 使用限制
目前只有 openGauss 的增量数据支持全局排序。由于其它数据库没有提供全局有序的 CSN,还无法做到。
-
支持 openGauss 全局排序
-
支持 Kafka
-
不支持 DDL 变更
-
不支持除 openGauss 以外数据库的全局排序
-
不支持存在外键的表
-
不支持在进行 CDC 服务期间触发了扩缩容,CDC 将停止对该表的变化数据的采集
06 注意事项
在 CDC 运行期间,请务必确保数据库日志不能被清除。
07 原理介绍
技术架构图
主要分为 3 大组成部分:
-
CDC 服务端:提供增量数据订阅服务。使用网络通道的情况下,会开启服务端监听端口。
-
CDC 协议:使用网络通道的情况下需要。协议使用 protobuf3 定义,数据压缩率和编解码性能较高,不容易出现数据编解码问题。
-
CDC 客户端:使用网络通道的情况下需要。连接登录 CDC 服务端之后,订阅并消费增量数据,写入客户自己的目标端。需要引用系统提供的 cdc-client Maven 包。
08 部署 CDC 服务端
目前 CDC 和 Proxy 在相同进程内。一个 Proxy 实例可以同时提供原有的 Proxy 服务以及新的 CDC 服务。
CDC 配置只是在server.yaml新增了 1 个属性
cdc-server-port: 33071
-
说明
配置了这个属性代表需要开启 CDC 服务端口,在所有 proxy 实例,也就是任意一个 proxy 实例都可以提供 CDC 服务。不配置则不开启。
CDC 的状态和 Proxy 类似,重要的数据都在注册中心(比如 ZK),进程里面有对应的状态。
CDC 服务端可以认为是无状态的。
-
CDC 服务端参数
参数名称 :cdc_decrypt_enabled
参数含义:CDC 是否开启将密文解析为明文推送到客户端,支持在线修改
参数值:true/false
默认值:false
09 CDC 消费端示例
需要使用 CDC 客户端和 CDC 服务端进行通信
如下为 CDC 消费端的配置示例
1、首先需要引入 CDC 客户端的 maven 依赖
<dependency>
<groupId>org.apache.shardingsphere</groupId>
<artifactId>shardingsphere-data-pipeline-cdc-client</artifactId>
<version>${project.version}</version>
</dependency>
2、基于 CDCClient 进行 CDC 数据同步
import com.google.protobuf.EmptyProto;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.TimestampProto;
import com.google.protobuf.TypeRegistry;
import com.google.protobuf.WrappersProto;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.Printer;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
import org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartCDCClientParameter;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
import org.apache.shardingsphere.data.pipeline.cdc.protocol.response.DataRecordResult.Record;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@Slf4j
public final class CDCDemo {
public static void main(final String[] args) {
StartCDCClientParameter parameter = new StartCDCClientParameter();
// Proxy的地址
parameter.setAddress("127.0.0.1");
// CDC 端口,和配置文件中一致
parameter.setPort(33071);
// Proxy的用户名
parameter.setUsername("root");
// Proxy的密码
parameter.setPassword("root");
// 订阅的逻辑库
parameter.setDatabase("sharding_db");
// 是否开启全量
parameter.setFull(true);
// 需要订阅的表
parameter.setSchemaTables(Collections.singletonList(SchemaTable.newBuilder().setTable("t_order").build()));
// 第二个参数是数据消费的逻辑,这里是简单打印下,也可以写到数据库或MQ
Printer printer = JsonFormat.printer().omittingInsignificantWhitespace().usingTypeRegistry(TypeRegistry.newBuilder().add(EmptyProto.getDescriptor().getMessageTypes()).add(TimestampProto.getDescriptor().getMessageTypes()).add(WrappersProto.getDescriptor().getMessageTypes()).build());
CDCClient cdcClient = new CDCClient(parameter, records -> {
List<String> list = new ArrayList<>();
for (Record record : records) {
String print;
try {
print = printer.print(record);
} catch (final InvalidProtocolBufferException ex) {
throw new RuntimeException(ex);
}
list.add(print);
}
System.out.printf("%s , received records: %s%n", LocalDateTime.now(), list);
});
cdcClient.start();
}
}
FQA
Q:分片表经过 CDC 之后是分片表还是单表
A:单表,CDC 仅是数据捕获和传输,不会捕获和传输结构的变更。