Debezium SchemaNameAdjuster 分析
目录
- 1. 概述
- 2. 核心功能
- 3. 实现原理
- 4. 应用场景
- 5. 扩展示例
- 6. 总结
1. 概述
SchemaNameAdjuster 是 Debezium 中的一个工具类,主要用于确保 Schema 名称符合 Avro 命名规范。在数据库变更事件被转换为 Kafka 消息时,需要为每个表和字段创建相应的 Avro Schema,而这些名称必须符合 Avro 的命名规则。
2. 核心功能
-
名称校验:
- 检查 Schema 名称是否符合 Avro 命名规范
- 验证首字符和非首字符的合法性
-
名称调整:
- 将不合法字符替换为合法字符(默认使用下划线’_')
- 保持名称的语义性和可读性
-
冲突处理:
- 检测并处理名称冲突
- 支持自定义冲突处理策略
3. 实现原理
3.1 核心接口
public interface SchemaNameAdjuster {
/**
* 调整提议的名称使其符合 Avro 命名规范
*/
String adjust(String proposedName);
}
3.2 名称验证规则
- 首字符规则:
public static boolean isValidFullnameFirstCharacter(char c) {
return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || c == '_';
}
- 非首字符规则:
public static boolean isValidFullnameNonFirstCharacter(char c) {
return c == '.' || isValidFullnameFirstCharacter(c) || (c >= '0' && c <= '9');
}
3.3 调整策略
- 默认策略:
SchemaNameAdjuster adjuster = SchemaNameAdjuster.create("_", (original, replacement, conflict) -> {
LOGGER.warn("Schema name '{}' is invalid, using '{}' instead", original, replacement);
});
- 自定义替换:
SchemaNameAdjuster customAdjuster = SchemaNameAdjuster.create(
c -> c == '-' ? "_" : String.valueOf(c),
(original, replacement, conflict) -> {
// 自定义冲突处理逻辑
}
);
4. 应用场景
4.1 表 Schema 构建
在构建数据库表的 Schema 时,需要为 Value Schema 和 Key Schema 生成合法的 Avro 名称:
SchemaBuilder valSchemaBuilder = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Value"));
SchemaBuilder keySchemaBuilder = SchemaBuilder.struct()
.name(schemaNameAdjuster.adjust(schemaNamePrefix + ".Key"));
4.2 CloudEvents 格式转换
在将 Debezium 事件转换为 CloudEvents 格式时,需要调整 Schema 名称:
CESchemaBuilder ceSchemaBuilder = defineSchema()
.withName(schemaNameAdjuster.adjust(maker.ceEnvelopeSchemaName()))
4.3 逻辑表路由
在进行逻辑表路由时,需要为新的目标主题生成合法的 Schema 名称:
valueBuilder.name(schemaNameAdjuster.adjust(newTopicName + ".Value"));
4.4 心跳机制
在配置心跳机制时,需要确保心跳消息的 Schema 名称符合规范:
return new HeartbeatImpl(
interval,
topic,
logicalName,
schemaNameAdjuster);
4.5 基本用例
- 表名转换:
String tableName = "my-table";
String adjustedName = adjuster.adjust(tableName); // 结果: "my_table"
- 复杂Schema名称:
String complexName = "com.example.my-schema.v2";
String adjusted = adjuster.adjust(complexName); // 结果: "com.example.my_schema.v2"
- 特殊字符处理:
String specialChars = "table$name@2.0";
String adjusted = adjuster.adjust(specialChars); // 结果: "table_name_2.0"
4.6 具体Schema生成示例
让我们以一个具体的表结构为例,展示 SchemaNameAdjuster 如何处理 Schema 名称:
-- 原始表结构
CREATE TABLE inventory.products (
id INT PRIMARY KEY,
name VARCHAR(255),
description TEXT,
weight DECIMAL(5,