Flink 1.14测试cdc写入到kafka案例

news2025/1/13 17:13:24

测试案例

1、遇到的问题

1.1 bug1

io.debezium.DebeziumException: Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation Error code: 1227; SQLSTATE: 42000.
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
	at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
	at java.lang.Thread.run(Thread.java:750)
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation
	at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:937)
	... 3 more
[ERROR] 2023-04-10 15:21:54,778(28432) --> [Source Data Fetcher for Source: MySQL Source -> Sink kafkaSink (1/1)#11] org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager$1.accept(SplitFetcherManager.java:119): Received uncaught exception.  
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
	at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
	at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:72)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$ReaderThreadLifecycleListener.onCommunicationFailure(MySqlStreamingChangeEventSource.java:1185)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:606)
	at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:850)
	... 1 more
Caused by: io.debezium.DebeziumException: Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation Error code: 1227; SQLSTATE: 42000.
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1146)
	... 5 more
Caused by: com.github.shyiko.mysql.binlog.network.ServerException: Access denied; you need (at least one of) the REPLICATION SLAVE privilege(s) for this operation
	at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:937)
	... 3 more

上面的报错数据是分布式数据库权限的问题。需要解决权限问题。

bug2

在配置flink kafka producer的EXACTLY_ONCE
flink checkpoint无法触发。
flinkKafkaProducer中配置exactly once,flink开启ck,提交事务失败,其中报错原因是
[INFO ] 2023-04-10 12:37:34,662(142554) --> [Checkpoint Timer] org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:913): Failed to trigger checkpoint for job 80b8184c08504bf8026a8fa4f2e03fb5 because Checkpoint triggering task Source: MySQL Source -> (Sink: Print to Std. Out, Sink kafkaSink) (1/1) of job 80b8184c08504bf8026a8fa4f2e03fb5 is not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running. 

其中flink checkpoint的配置信息

executionEnvironment.getCheckpointConfig().
                setCheckpointStorage(new FileSystemCheckpointStorage("file:///d:/cdc/ck"));
//        executionEnvironment.setStateBackend(new FsStateBackend("hdfs://drmcluster/flink/checkpoints"));
        //开启checkpoint 启用 checkpoint,设置触发间隔(两次执行开始时间间隔)
        executionEnvironment.enableCheckpointing(1000*10L); //测试5秒触发一次 生产环境10分钟
        executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        超时时间,checkpoint没在时间内完成则丢弃
        executionEnvironment.getCheckpointConfig().setCheckpointTimeout(50000L); //10秒
        executionEnvironment.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
        executionEnvironment.getCheckpointConfig().setTolerableCheckpointFailureNumber(1);
        //最小间隔时间(前一次结束时间,与下一次开始时间间隔)
        executionEnvironment.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
//        当 Flink 任务取消时,保留外部保存的 checkpoint 信息
        executionEnvironment.getCheckpointConfig().enableExternalizedCheckpoints
                (CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

在flink 1.14中创建kafkaFink的api发生了改变

Properties properties = new Properties();
        properties.setProperty("transaction.timeout.ms", 1000 * 60 * 2+ "");//设置事务时间 5分钟提交事务
        return KafkaSink.<String>builder()
                .setBootstrapServers("qn-flink01:9092,qn-flink02:9092,qn-flink03:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .setKeySerializationSchema(new MyKeySerializationSchema())
                        .setPartitioner(new FlinkKafkaPartitioner<String>() {
                            //数据分区,按照scene字段的hash值来分发数据到3个分区
                            @Override
                            public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
                                JSONObject jsonObject = JSONObject.parseObject(record);
                                String afterJson = jsonObject.get("after").toString();
                                Object json = JSONObject.parseObject(afterJson).get(filed);
                                log.info("scene: " + json);
                                return Math.abs(json.hashCode() % partitions.length);
                            }
                        }).build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) //精确一次消费
                .setKafkaProducerConfig(properties)
//                .setTransactionalIdPrefix("scene")
                .build();

在从flik到kafka的端对端的语义的时候:
在这里插入图片描述FLink端到端需要注意的点:

  • Flink任务需要开启checkpoint配置为CheckpointingMode.EXACTLY_ONCE
  • Flink任务FlinkKafkaProducer需要指定参数Semantic.EXACTLY_ONCE
  • Flink任务FlinkKafkaProducer配置需要配置transaction.timeout.ms,checkpoint间隔(代码指定)<transaction.timeout.ms(默认为1小时)<transaction.max.timeout.ms(默认为15分钟)
  • 消费端在消费FlinkKafkaProducer的topic时需要指定isolation.level(默认为read_uncommitted)为read_committed

原文链接:https://blog.csdn.net/yiweiyi329/article/details/127297375

2、成功的案例

Flink kafka producer的配置是在AT LEAST ONCE的模式,这种情况下,生产者写入的数据会存在重复的情况。

Properties properties = new Properties();
        properties.setProperty("transaction.timeout.ms", 1000 * 60 * 2+ "");//设置事务时间 5分钟提交事务
        return KafkaSink.<String>builder()
                .setBootstrapServers("qn-flink01:9092,qn-flink02:9092,qn-flink03:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .setKeySerializationSchema(new MyKeySerializationSchema())
                        .setPartitioner(new FlinkKafkaPartitioner<String>() {
                            //数据分区,按照scene字段的hash值来分发数据到3个分区
                            @Override
                            public int partition(String record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
                                JSONObject jsonObject = JSONObject.parseObject(record);
                                String afterJson = jsonObject.get("after").toString();
                                Object json = JSONObject.parseObject(afterJson).get(filed);
                                log.info("scene: " + json);
                                return Math.abs(json.hashCode() % partitions.length);
                            }
                        }).build()
                )
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) //精确一次消费
                .setKafkaProducerConfig(properties)
//                .setTransactionalIdPrefix("scene")
                .build();

本地代码在控制台上了实时打印出update、insert、delete的操作日志信息。
update操作:
在这里插入图片描述更新最后一条数据的charge为100:
在这里插入图片描述```json
{
“database”: “zczqdb”,
“before”: {
“is_tax_inclusive”: 0,
“charge”: 13333.0,
“create_time”: 1681143473000,
“treat_shop_id”: “11003”,
“scene”: “3”,
“is_delete”: 0,
“field1”: “”,
“partner_id”: “520181000000”,
“channel_source”: “GRJY(贵人家园)”,
“association_contract”: “”,
“customer_id”: “11003”,
“order_id”: “fc84774d-3031-4511-b99e-5604a7e99a89”,
“accept_time”: 1681143482000,
“status”: 7
},
“after”: {
“is_tax_inclusive”: 0,
“charge”: 100.0,
“create_time”: 1681143473000,
“treat_shop_id”: “11003”,
“scene”: “3”,
“is_delete”: 0,
“field1”: “”,
“partner_id”: “520181000000”,
“channel_source”: “GRJY(贵人家园)”,
“association_contract”: “”,
“customer_id”: “11003”,
“order_id”: “fc84774d-3031-4511-b99e-5604a7e99a89”,
“accept_time”: 1681143482000,
“status”: 7
},
“type”: “update”,
“tableName”: “general_order”
}


delete操作,删除最后一条数据:

```json
{
	"database": "zczqdb",
	"before": {
		"is_tax_inclusive": 0,
		"charge": 100.0,
		"create_time": 1681143473000,
		"treat_shop_id": "11003",
		"scene": "3",
		"is_delete": 0,
		"field1": "",
		"partner_id": "520181000000",
		"channel_source": "GRJY(贵人家园)",
		"association_contract": "",
		"customer_id": "11003",
		"order_id": "fc84774d-3031-4511-b99e-5604a7e99a89",
		"accept_time": 1681143482000,
		"status": 7
	},
	"after": {},
	"type": "delete",
	"tableName": "general_order"
}

3、后面接着完成EXACTLY_ONCE的测试。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/419598.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网络编程(第一章:网络基础)

文章目录一. 网络基础1.2 联网协议和层1.2.1 网络采用分层的思想1.2.2 OSI体系结构&#xff08;重点&#xff01;&#xff01;)1.2.3 TCP/IP协议1.2.3.1 网络接口与物理层1.2.3.2 网络层1.2.3.3 传输层1.2.3.4 应用层1.2.4 网络封包与拆包1.3 TCP和UDP的异同点&#xff08;重点…

【活动】高效学习方法分享

1 写在前面 当今社会&#xff0c;学习已成为每个人不可避免的事情。无论是在校生还是职场人士&#xff0c;我们都需要不断地更新自己的知识和技能。而如何高效地学习&#xff0c;则成为了许多人迫切需要解决的问题。本文将分享一些高效学习方法&#xff0c;帮助你更快、更好地…

Redis学习笔记之(Redis认识、安装、启动、命令行客户端、图形化界面)

目录 初识Redis 认识NOSQL认识Redis安装Redis Redis常见命令 5种常见数据结构通用命令不同数据结构的操作命令 Redis的Java客户端 Jedis客户端SpringDataRedis客户端 认识NOSQL 认识NoSQL SQLNoSQL数据结构结构化(Structured)非结构化数据关联关联的(Relational)无关联…

Symble

ES6引入了一种新的原始数据类型 Symbol&#xff0c;表示独一无二的值。它是JavaScript语言的第七种数据类型&#xff0c;是一种类似于字符串的数据类型。 Symbol特点 Symbol 的值是唯一的&#xff0c;用来解决命名冲突的问题 Symbol值不能与其他数据进行运算 Symbol定义的对…

SadTalker项目上手教程

背景 最近发现一个很有趣的GitHub项目SadTalker&#xff0c;它能够将一张图片跟一段音频合成一段视频&#xff0c;看起来毫无违和感&#xff0c;如果不仔细看&#xff0c;甚至很难辨别真假&#xff0c;预计未来某一天&#xff0c;一大波网红即将失业。 虽然这个项目目前的主要…

基于 DSP+FPGA+1553B总线的水下信息融合系统的设计

在一个大型水下系统中&#xff0c;针对不同分系统的特 性&#xff0c;通常采用不同的通信协议。串行通信具有抗干扰 能力强&#xff0c;传输距离远等特点&#xff0c;适用于需要远距离通信 的分系统&#xff1b;MIL-STD-1553B 总线在传输方面具有极强 的可靠性和实时性&#xf…

关于“复活节Easter”知识,你了解多少?

复活节是基督教纪念耶稣复活的节日。耶稣被钉死在十字架上&#xff0c;死后第三天复活。Christians say Jesus died on Friday. On the third day, he rose from the dead. He became alive again. 复活节定在每年春分月圆之后第一个星期日举行。因为春分之后日照时间比较长&am…

《菲波那契凤尾》:菲波那契数列,返回最后6位

目录 一、题目 二、思路 1、斐波那契数列 2、返回最后6位 三、代码 详细注释版本&#xff1a; 简化注释版本&#xff1a; 一、题目 菲波那契凤尾 题目链接&#xff1a;菲波那契凤尾 NowCoder号称自己已经记住了1-100000之间所有的斐波那契数。为了考验他&#xff0c…

【实用篇】SpringCloud+RabbitMQ+Docker+Redis+搜索+分布式,系统详解springcloud分布式

文章目录一、服务拆分1.1 服务拆分Demo1.2 微服务远程调用二、Eureka2.1 Eureka原理2.2 Eureka-server服务搭建2.3 eureka-client服务注册2.4 eureka-client服务复制2.5 eureka服务发现三、Ribbon负载均衡3.1 负载均衡原理3.2 负载均衡策略3.3 自定义负载均衡策略3.4 饥饿加载与…

每日学术速递4.12

CV - 计算机视觉 | ML - 机器学习 | RL - 强化学习 | NLP 自然语言处理 Subjects: cs.HC 随着新的“生成代理”论文的发布&#xff0c;LLM刚刚达到了一个重要的里程碑——通过使用 LLM&#xff0c;生成代理能够在受《模拟人生》启发的交互式沙箱中模拟类人行为。代理架构扩展…

Vue的过滤器、内置指令和生命周期【Vue】

Vue 1. 收集表单数据 收集表单数据&#xff1a; 若&#xff1a;< input type“text”/ >&#xff0c;则v-model收集的是value值&#xff0c;用户输入的就是value值。 若&#xff1a;< input type“radio”/ >&#xff0c;则v-model收集的是value值&#xff0c;且…

NIFI大数据进阶_离线同步MySql数据到HDFS_说明操作步骤---大数据之Nifi工作笔记0028

然后我们看如何把mysql中的数据,实时的同步到hdfs中去 准备工作首先,创建一个mysql表,然后启动hadoop集群 处理器我们需要这些处理器,首先通过querydatabasetable处理器,查询mysql中的数据,然后,把mysql中的数据,导入到 convertavrotojson处理器,注意querydatabasetable处理…

C语言结构体练习:【通讯录(静态数组简易版)的实现】

全文目录&#x1f600; 前言&#x1f914; 模块和功能划分&#x1f928; 数据类型的选择&#x1f62e; 功能序号类型 enum&#x1f62e; 个人信息类型 PeoInfo&#x1f62e; 通讯录类型 Contact&#x1f635;‍&#x1f4ab; 功能的实现&#x1f644; 初始化通讯录 InitContact…

MYSQL命令大全(详细版)

&#x1f389;&#x1f389;&#x1f389;点进来你就是我的人了 博主主页&#xff1a;&#x1f648;&#x1f648;&#x1f648;戳一戳,欢迎大佬指点!人生格言&#xff1a;当你的才华撑不起你的野心的时候,你就应该静下心来学习! 欢迎志同道合的朋友一起加油喔&#x1f9be;&am…

【快乐手撕LeetCode题解系列】——移除链表元素

【【快乐手撕LeetCode题解系列】——移除链表元素&#x1f60e;前言&#x1f64c;删除有序数组中的重复项&#x1f64c;解法一&#xff1a;画图分析&#xff1a;&#x1f60d;思路分析&#xff1a;&#x1f60d;源代码分享&#xff1a;&#x1f60d;解法二&#xff1a;画图分析…

Vite打包后直接使用浏览器打开,显示空白问题

vite打包后&#xff0c;直接用浏览器打开显示空白 1.需求&#xff1a; 安卓webview等浏览器直接打开文件显示 2.原因 &#xff08;1&#xff09;资源路径错误&#xff1a; vite.config.js 配置 base: “./” &#xff08;在webpack中则配置publicPath: "./"即可…

Windows ActiveMQ安装

Windows ActiveMQ 安装 ActiveMQ下载&#xff1a;ActiveMQ Java JDK下载&#xff1a;Java JDK20 下载完成之后解压ZIP安装包&#xff0c;解压后得到如下文件夹 找到根目录下/bin/win64/activemq.bat 双击打开&#xff0c;如果提示权限记得使用管理员身份 。 如提示Unable to e…

学习数据结构第5天(线性表的链式表示)

线性表的链式表示单链表的定义单链表的基本操作双链表双链表的基本操作循环链表块状链表存储结构顺序表和链表的比较线性表的顺序存储结构的特点是逻辑关系上相邻的两个数据元素在物理位置上也是相邻的。我们会发现虽然顺序表的查询很快&#xff0c;时间复杂度为O(1)O(1)O(1),但…

Java EE企业级应用开发(SSM)第5章

第5章Spring MVC入门一.预习笔记 1.Spring MVC是所有使用OOP编程语言都应该遵守的规范 2.Spring MVC的特点 强大的灵活性、非侵入性和可配置性 提供了一个前端控制器DispatcherServlet&#xff0c;开发者无须额外开发控制器对象 分工明确&#xff0c;每一个功能由一个专门…

日前、日内两阶段需求响应热电综合能源联合调度研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…