技术思想
利用 mysql catalog,mysql cdc,flink jdbc 等技术实现 mysql 整库迁移至下游数据库,这里是示范 mysql to mysql ,其他 sink 组件可自行扩展实现。
通过 flink ParameterTool,可以选择是整库同步还是多表亦或单表同步,可以设置全局并发,源表 mysql 参数,目标表 mysql 参数
通过 sql Connection 实现自动建表逻辑 (mysql 数据类型众多,这里并没有测试所有的类型参数,如担心建表不成功,可手动建表,不影响程序运行)
下游使用 flink jdbc 来实现,语法为 upsert 即幂等写入(重复数据只会写入一次)
使用 mysql catalog 来实现源表元数据的获取
自定义 CustomDebeziumDeserializer 实现 DebeziumDeserializationSchema 接口对数据进行转换
该任务本质上是 单 source 多 sink 任务,不同的表数据不一样可能会有一定的反压
程序测试 生成五百万条数据 五张表 一分钟左右完成,增量数据一万条,可以同步完成
环境 flink 1.16 cdc 2.3.0
refer:
https://nightlies.apache.org/flink/flink-docs-release-1.16/
https://blog.csdn.net/qq_36062467/article/details/128117647
https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc%28ZH%29.html
代码
github 地址 https://github.com/SophiaData/Bigdata_Code_Tutorial/blob/master/Code_Tutorial/Flink-demo/
/** (@SophiaData) (@date 2022/12/1 16:02). */
public class CustomDebeziumDeserializer
implements DebeziumDeserializationSchema<Tuple2<String, Row>> {
private static final Logger LOG = LoggerFactory.getLogger(CustomDebeziumDeserializer.class);
private final Map<String, DeserializationRuntimeConverter> physicalConverterMap =
Maps.newConcurrentMap();
CustomDebeziumDeserializer(Map<String, RowType> tableRowTypeMap) {
for (String tableName : tableRowTypeMap.keySet()) {
RowType rowType = tableRowTypeMap.get(tableName);
DeserializationRuntimeConverter physicalConverter = createNotNullConverter(rowType);
this.physicalConverterMap.put(tableName, physicalConverter);
}
}
@Override
public void deserialize(SourceRecord record, Collector<Tuple2<String, Row>> out)
throws Exception {
Envelope.Operation op = Envelope.operationFor(record);
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();
Struct source = value.getStruct("source");
String tableName = source.get("table").toString();
DeserializationRuntimeConverter physicalConverter = physicalConverterMap.get(tableName);
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
Row insert = extractAfterRow(value, valueSchema, physicalConverter);
insert.setKind(RowKind.INSERT);
out.collect(Tuple2.of(tableName, insert));
} else if (op == Envelope.Operation.DELETE) {
Row delete = extractBeforeRow(value, valueSchema, physicalConverter);
delete.setKind(RowKind.DELETE);
out.collect(Tuple2.of(tableName, delete));
} else if (op == Envelope.Operation.UPDATE) {
Row before = extractBeforeRow(value, valueSchema, physicalConverter);
before.setKind(RowKind.UPDATE_BEFORE);
out.collect(Tuple2.of(tableName, before));
Row after = extractAfterRow(value, valueSchema, physicalConverter);
after.setKind(RowKind.UPDATE_AFTER);
out.collect(Tuple2.of(tableName, after));
} else {
LOG.warn(" Unexpected statement: {}", value);
}
}
private Row extractAfterRow(
Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter)
throws Exception {
Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
Struct after = value.getStruct(Envelope.FieldName.AFTER);
return (Row) physicalConverter.convert(after, afterSchema);
}
private Row extractBeforeRow(
Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter)
throws Exception {
Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
Struct before = value.getStruct(Envelope.FieldName.BEFORE);
return (Row) physicalConverter.convert(before, beforeSchema);
}
@Override
public TypeInformation<Tuple2<String, Row>> getProducedType() {
return TypeInformation.of(new TypeHint<Tuple2<String, Row>>() {});
}
public static DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case NULL:
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return null;
}
};
case BOOLEAN:
return convertToBoolean();
case TINYINT:
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return Byte.parseByte(dbzObj.toString());
}
};
case SMALLINT:
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return Short.parseShort(dbzObj.toString());
}
};
case INTEGER:
case INTERVAL_YEAR_MONTH:
return convertToInt();
case BIGINT:
case INTERVAL_DAY_TIME:
return convertToLong();
case DATE:
return convertToDate();
case TIME_WITHOUT_TIME_ZONE:
return convertToTime();
case TIMESTAMP_WITHOUT_TIME_ZONE:
return convertToTimestamp(ZoneId.of("UTC"));
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return convertToLocalTimeZoneTimestamp(ZoneId.of("UTC"));
case FLOAT:
return convertToFloat();
case DOUBLE:
return convertToDouble();
case CHAR:
case VARCHAR:
return convertToString();
case BINARY:
case VARBINARY:
return convertToBinary();
case DECIMAL:
return createDecimalConverter((DecimalType) type);
case ROW:
return createRowConverter((RowType) type);
case ARRAY:
case MAP:
case MULTISET:
case RAW:
default:
throw new UnsupportedOperationException("Unsupported type: " + type);
}
}
private static DeserializationRuntimeConverter convertToBoolean() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Boolean) {
return dbzObj;
} else if (dbzObj instanceof Byte) {
return (byte) dbzObj == 1;
} else if (dbzObj instanceof Short) {
return (short) dbzObj == 1;
} else {
return Boolean.parseBoolean(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter convertToInt() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Integer) {
return dbzObj;
} else if (dbzObj instanceof Long) {
return ((Long) dbzObj).intValue();
} else {
return Integer.parseInt(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter convertToLong() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Integer) {
return ((Integer) dbzObj).longValue();
} else if (dbzObj instanceof Long) {
return dbzObj;
} else {
return Long.parseLong(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
final int precision = decimalType.getPrecision();
final int scale = decimalType.getScale();
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
BigDecimal bigDecimal;
if (dbzObj instanceof byte[]) {
// decimal.handling.mode=precise
bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
} else if (dbzObj instanceof String) {
// decimal.handling.mode=string
bigDecimal = new BigDecimal((String) dbzObj);
} else if (dbzObj instanceof Double) {
// decimal.handling.mode=double
bigDecimal = BigDecimal.valueOf((Double) dbzObj);
} else {
if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
SpecialValueDecimal decimal =
VariableScaleDecimal.toLogical((Struct) dbzObj);
bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
} else {
// fallback to string
bigDecimal = new BigDecimal(dbzObj.toString());
}
}
return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
}
};
}
private static DeserializationRuntimeConverter convertToDouble() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Float) {
return ((Float) dbzObj).doubleValue();
} else if (dbzObj instanceof Double) {
return dbzObj;
} else {
return Double.parseDouble(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter convertToFloat() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Float) {
return dbzObj;
} else if (dbzObj instanceof Double) {
return ((Double) dbzObj).floatValue();
} else {
return Float.parseFloat(dbzObj.toString());
}
}
};
}
private static DeserializationRuntimeConverter convertToDate() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay();
}
};
}
private static DeserializationRuntimeConverter convertToTime() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Long) {
switch (schema.name()) {
case MicroTime.SCHEMA_NAME:
return (int) ((long) dbzObj / 1000);
case NanoTime.SCHEMA_NAME:
return (int) ((long) dbzObj / 1000_000);
}
} else if (dbzObj instanceof Integer) {
return dbzObj;
}
// get number of milliseconds of the day
return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;
}
};
}
private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof Long) {
switch (schema.name()) {
case Timestamp.SCHEMA_NAME:
return TimestampData.fromEpochMillis((Long) dbzObj);
case MicroTimestamp.SCHEMA_NAME:
long micro = (long) dbzObj;
return TimestampData.fromEpochMillis(
micro / 1000, (int) (micro % 1000 * 1000));
case NanoTimestamp.SCHEMA_NAME:
long nano = (long) dbzObj;
return TimestampData.fromEpochMillis(
nano / 1000_000, (int) (nano % 1000_000));
}
}
LocalDateTime localDateTime =
TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
return TimestampData.fromLocalDateTime(localDateTime);
}
};
}
private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(
ZoneId serverTimeZone) {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof String) {
String str = (String) dbzObj;
// TIMESTAMP_LTZ type is encoded in string type
Instant instant = Instant.parse(str);
return TimestampData.fromLocalDateTime(
LocalDateTime.ofInstant(instant, serverTimeZone));
}
throw new IllegalArgumentException(
"Unable to convert to TimestampData from unexpected value '"
+ dbzObj
+ "' of type "
+ dbzObj.getClass().getName());
}
};
}
private static DeserializationRuntimeConverter convertToString() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
return StringData.fromString(dbzObj.toString());
}
};
}
private static DeserializationRuntimeConverter convertToBinary() {
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) {
if (dbzObj instanceof byte[]) {
return dbzObj;
} else if (dbzObj instanceof ByteBuffer) {
ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
byte[] bytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bytes);
return bytes;
} else {
throw new UnsupportedOperationException(
"Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
}
}
};
}
private static DeserializationRuntimeConverter createRowConverter(RowType rowType) {
final DeserializationRuntimeConverter[] fieldConverters =
rowType.getFields().stream()
.map(RowType.RowField::getType)
.map(CustomDebeziumDeserializer::createNotNullConverter)
.toArray(DeserializationRuntimeConverter[]::new);
final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);
return new DeserializationRuntimeConverter() {
private static final long serialVersionUID = 1L;
@Override
public Object convert(Object dbzObj, Schema schema) throws Exception {
Struct struct = (Struct) dbzObj;
int arity = fieldNames.length;
Row row = new Row(arity);
for (int i = 0; i < arity; i++) {
String fieldName = fieldNames[i];
Field field = schema.field(fieldName);
if (field == null) {
row.setField(i, null);
} else {
Object fieldValue = struct.getWithoutDefault(fieldName);
Schema fieldSchema = schema.field(fieldName).schema();
Object convertedField =
convertField(fieldConverters[i], fieldValue, fieldSchema);
row.setField(i, convertedField);
}
}
return row;
}
};
}
private static Object convertField(
DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema)
throws Exception {
if (fieldValue == null) {
return null;
} else {
return fieldConverter.convert(fieldValue, fieldSchema);
}
}
}
效果
最后
代码和测试可能不充分,仅供参考,欢迎提出意见。
欢迎访问博客 https://sophiadata.github.io/Bigdata_Blog_Website/learning/overview