Test Cases
1. Problems Encountered
1.1 bug1
io.debezium.DebeziumException: Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation Error code: 1227; SQLSTATE: 42000.
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
at java.lang.Thread.run(Thread.java:750)
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:937)
... 3 more
[ERROR] 2023-04-10 15:21:54,778(28432) --> [Source Data Fetcher for Source: MySQL Source -> Sink kafkaSink (1/1)#11] org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager$1.accept(SplitFetcherManager.java:119): Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:72)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
... 1 more
Caused by: io.debezium.DebeziumException: Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation Error code: 1227; SQLSTATE: 42000.
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146)
... 5 more
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:937)
... 3 more
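The error above is a permissions problem on the distributed database: the account the CDC connector connects with lacks the REPLICATION SLAVE privilege, so it cannot read the binlog. The fix is to grant that account the missing privilege.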
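Before restarting the job it can help to confirm what the account is actually allowed to do. The sketch below is only an illustration: it takes the text lines returned by `SHOW GRANTS` and reports which privileges are still missing. The required list (SELECT, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT) follows the Flink CDC MySQL connector documentation; the class name `GrantCheck` and the user `cdc_user` are hypothetical, and a distributed MySQL-compatible database may format its grants differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;

/** Hypothetical helper: inspects SHOW GRANTS output for the privileges a binlog reader needs. */
final class GrantCheck {

    // Privileges named in the Flink CDC MySQL connector documentation.
    static final List<String> REQUIRED = Arrays.asList(
            "SELECT", "SHOW DATABASES", "REPLICATION SLAVE", "REPLICATION CLIENT");

    /** Extracts the privilege names between "GRANT " and " ON " in one grant line. */
    static Set<String> grantedPrivileges(String grantLine) {
        String upper = grantLine.toUpperCase(Locale.ROOT).trim();
        Set<String> result = new LinkedHashSet<>();
        int on = upper.indexOf(" ON ");
        if (!upper.startsWith("GRANT ") || on < 6) {
            return result; // not a line this simple parser understands
        }
        for (String privilege : upper.substring(6, on).split(",")) {
            result.add(privilege.trim());
        }
        return result;
    }

    /** Returns the required privileges that no grant line provides. */
    static List<String> missingPrivileges(List<String> grantLines) {
        Set<String> granted = new LinkedHashSet<>();
        for (String line : grantLines) {
            granted.addAll(grantedPrivileges(line));
        }
        List<String> missing = new ArrayList<>();
        if (granted.contains("ALL PRIVILEGES")) {
            return missing; // ALL PRIVILEGES covers everything in REQUIRED
        }
        for (String privilege : REQUIRED) {
            if (!granted.contains(privilege)) {
                missing.add(privilege);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Simplification: grants from all lines are merged, regardless of scope.
        List<String> grants = Arrays.asList(
                "GRANT SELECT, SHOW DATABASES ON *.* TO `cdc_user`@`%`");
        System.out.println(missingPrivileges(grants));
        // prints [REPLICATION SLAVE, REPLICATION CLIENT]
    }
}
```

On a plain MySQL server the missing privileges would typically be added with a statement such as `GRANT REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc_user'@'%';`, run by an administrator.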
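1.2 bug2
With the Flink Kafka producer configured for EXACTLY_ONCE, Flink checkpoints cannot be triggered.
With exactly-once set on the Kafka producer and checkpointing enabled in Flink, committing the transaction fails. The reported reason is: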
[INFO ] 2023-04-10 12:37:34,662(142554) --> [Checkpoint Timer] org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:913): Failed to trigger checkpoint for job 80b8184c08504bf8026a8fa4f2e03fb5 because Checkpoint triggering task Source: MySQL Source -> (Sink: Print to Std. Out, Sink kafkaSink) (1/1) of job 80b8184c08504bf8026a8fa4f2e03fb5 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
The Flink checkpoint configuration is:
executionEnvironment.getCheckpointConfig()
        .setCheckpointStorage(new FileSystemCheckpointStorage("file:///d:/cdc/ck"));
// executionEnvironment.setStateBackend(new FsStateBackend("hdfs://drmcluster/flink/checkpoints"));
// Enable checkpointing and set the trigger interval (time between the starts of two consecutive checkpoints).
executionEnvironment.enableCheckpointing(1000 * 10L); // 10 seconds for testing; 10 minutes in production
executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Timeout: a checkpoint that does not complete within this time is discarded.
executionEnvironment.getCheckpointConfig().setCheckpointTimeout(50000L); // 50 seconds
executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
executionEnvironment.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
// Minimum pause (between the end of the previous checkpoint and the start of the next one).
executionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
// Keep the externalized checkpoint when the Flink job is cancelled.
executionEnvironment.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
In Flink 1.14 the API for creating the Kafka sink changed (KafkaSink replaces FlinkKafkaProducer):
Properties properties = new Properties();
// Transaction timeout: 2 minutes. It must be longer than the checkpoint interval.
properties.setProperty("transaction.timeout.ms", String.valueOf(1000 * 60 * 2));
return KafkaSink.<String>builder()
        .setBootstrapServers("qn-flink01:9092,qn-flink02:9092,qn-flink03:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(topic)
                .setValueSerializationSchema(new SimpleStringSchema())
                .setKeySerializationSchema(new MyKeySerializationSchema())
                .setPartitioner(new FlinkKafkaPartitioner<String>() {
                    // Partition records by the hash of the scene field across the 3 partitions.
                    @Override
                    public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
                        JSONObject jsonObject = JSONObject.parseObject(record);
                        String afterJson = jsonObject.get("after").toString();
                        Object json = JSONObject.parseObject(afterJson).get(filed);
                        log.info("scene: " + json);
                        return Math.abs(json.hashCode() % partitions.length);
                    }
                }).build()
        )
        .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) // exactly-once delivery
        .setKafkaProducerConfig(properties)
        // .setTransactionalIdPrefix("scene")
        .build();
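The partitioning rule inside the custom partitioner can be looked at in isolation. The sketch below is a hypothetical standalone helper, not the code from the job: it maps a field value to one of the available partition ids and, as an added assumption, sends a missing value to the first partition. A missing value does occur here, because the `after` object of a delete event is empty. It uses `Math.floorMod` rather than `Math.abs(hash % n)`; both keep the index in range, though they can pick different partitions for negative hash codes.

```java
/** Hypothetical helper illustrating hash-based partition selection. */
final class ScenePartitioning {

    /** Maps a key to one of the given partition ids; a null key goes to the first partition. */
    static int choosePartition(Object key, int[] partitions) {
        if (partitions == null || partitions.length == 0) {
            throw new IllegalArgumentException("no partitions available");
        }
        if (key == null) {
            return partitions[0]; // assumption: fixed fallback for records without the field
        }
        // floorMod never returns a negative index, even for negative hash codes.
        int index = Math.floorMod(key.hashCode(), partitions.length);
        return partitions[index];
    }

    public static void main(String[] args) {
        int[] partitions = {0, 1, 2};
        // "3".hashCode() is 51, and 51 mod 3 is 0.
        System.out.println(choosePartition("3", partitions)); // prints 0
        System.out.println(choosePartition(null, partitions)); // prints 0
    }
}
```

Returning `partitions[index]` instead of the bare index matters only when the partition ids are not simply 0 to n-1, but it matches what the partitioner is expected to return: a partition id taken from the array.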
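Points to note for end-to-end exactly-once semantics from Flink to Kafka:
- The Flink job must enable checkpointing with CheckpointingMode.EXACTLY_ONCE.
- The FlinkKafkaProducer must be created with Semantic.EXACTLY_ONCE (with the KafkaSink used above, DeliveryGuarantee.EXACTLY_ONCE).
- The FlinkKafkaProducer must set transaction.timeout.ms so that: checkpoint interval (set in code) < transaction.timeout.ms (FlinkKafkaProducer default: 1 hour) < transaction.max.timeout.ms (broker default: 15 minutes). Because the producer default is larger than the broker default, either transaction.timeout.ms has to be lowered or the broker limit raised.
- Consumers reading the topic written by the FlinkKafkaProducer must set isolation.level to read_committed (the default is read_uncommitted).
Source: https://blog.csdn.net/yiweiyi329/article/details/127297375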
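The timeout ordering in the list above is easy to get wrong, so a small check can be run before the job is submitted. The following is a minimal sketch using only the JDK; the class `ExactlyOnceSettings` and its method names are hypothetical. It also builds the consumer-side property mentioned in the last bullet. The second comparison allows the two timeouts to be equal, on the assumption that the broker only rejects a producer timeout larger than its maximum.

```java
import java.time.Duration;
import java.util.Properties;

/** Hypothetical helper: sanity-checks the timeouts needed for exactly-once delivery. */
final class ExactlyOnceSettings {

    /** Returns null when the ordering holds, otherwise a description of the violated rule. */
    static String validate(Duration checkpointInterval,
                           Duration transactionTimeout,
                           Duration brokerMaxTimeout) {
        if (checkpointInterval.compareTo(transactionTimeout) >= 0) {
            return "checkpoint interval must be shorter than transaction.timeout.ms";
        }
        if (transactionTimeout.compareTo(brokerMaxTimeout) > 0) {
            return "transaction.timeout.ms must not exceed transaction.max.timeout.ms";
        }
        return null;
    }

    /** Producer properties carrying the chosen transaction timeout. */
    static Properties producerProperties(Duration transactionTimeout) {
        Properties properties = new Properties();
        properties.setProperty("transaction.timeout.ms",
                Long.toString(transactionTimeout.toMillis()));
        return properties;
    }

    /** Consumer properties that hide records from uncommitted transactions. */
    static Properties consumerProperties() {
        Properties properties = new Properties();
        properties.setProperty("isolation.level", "read_committed");
        return properties;
    }

    public static void main(String[] args) {
        // Values from this document: 10 s checkpoints, 2 min transaction timeout,
        // and the broker default of 15 min.
        String problem = validate(
                Duration.ofSeconds(10), Duration.ofMinutes(2), Duration.ofMinutes(15));
        System.out.println(problem == null ? "ok" : problem); // prints ok
    }
}
```

With the one-hour producer default and the 15-minute broker default, `validate` reports the second rule as violated, which is the situation the third bullet warns about.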
2. Successful Case
Here the Flink Kafka producer is configured in AT_LEAST_ONCE mode. In this mode the data written by the producer may contain duplicates.
Properties properties = new Properties();
// Transaction timeout: 2 minutes.
properties.setProperty("transaction.timeout.ms", String.valueOf(1000 * 60 * 2));
return KafkaSink.<String>builder()
        .setBootstrapServers("qn-flink01:9092,qn-flink02:9092,qn-flink03:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic(topic)
                .setValueSerializationSchema(new SimpleStringSchema())
                .setKeySerializationSchema(new MyKeySerializationSchema())
                .setPartitioner(new FlinkKafkaPartitioner<String>() {
                    // Partition records by the hash of the scene field across the 3 partitions.
                    @Override
                    public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
                        JSONObject jsonObject = JSONObject.parseObject(record);
                        String afterJson = jsonObject.get("after").toString();
                        Object json = JSONObject.parseObject(afterJson).get(filed);
                        log.info("scene: " + json);
                        return Math.abs(json.hashCode() % partitions.length);
                    }
                }).build()
        )
        .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // at-least-once delivery
        .setKafkaProducerConfig(properties)
        // .setTransactionalIdPrefix("scene")
        .build();
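Running locally, the code prints the change-log records for update, insert, and delete operations to the console in real time.
Update operation:
Set charge to 100 on the last row: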
```json
{
  "database": "zczqdb",
  "before": {
    "is_tax_inclusive": 0,
    "charge": 13333.0,
    "create_time": 1681143473000,
    "treat_shop_id": "11003",
    "scene": "3",
    "is_delete": 0,
    "field1": "",
    "partner_id": "520181000000",
    "channel_source": "GRJY(贵人家园)",
    "association_contract": "",
    "customer_id": "11003",
    "order_id": "fc84774d-3031-4511-b99e-5604a7e99a89",
    "accept_time": 1681143482000,
    "status": 7
  },
  "after": {
    "is_tax_inclusive": 0,
    "charge": 100.0,
    "create_time": 1681143473000,
    "treat_shop_id": "11003",
    "scene": "3",
    "is_delete": 0,
    "field1": "",
    "partner_id": "520181000000",
    "channel_source": "GRJY(贵人家园)",
    "association_contract": "",
    "customer_id": "11003",
    "order_id": "fc84774d-3031-4511-b99e-5604a7e99a89",
    "accept_time": 1681143482000,
    "status": 7
  },
  "type": "update",
  "tableName": "general_order"
}
```
Delete operation, deleting the last row:
```json
{
"database": "zczqdb",
"before": {
"is_tax_inclusive": 0,
"charge": 100.0,
"create_time": 1681143473000,
"treat_shop_id": "11003",
"scene": "3",
"is_delete": 0,
"field1": "",
"partner_id": "520181000000",
"channel_source": "GRJY(贵人家园)",
"association_contract": "",
"customer_id": "11003",
"order_id": "fc84774d-3031-4511-b99e-5604a7e99a89",
"accept_time": 1681143482000,
"status": 7
},
"after": {},
"type": "delete",
"tableName": "general_order"
}
```