【Flink-cdc-Mysql-To-Kafka】使用 Flinksql 利用集成的 connector 实现 Mysql 数据写入 Kafka

news2024/10/5 13:17:58

【Flink-cdc-Mysql-To-Kafka】使用 Flinksql 利用集成的 connector 实现 Mysql 数据写入 Kafka

  • 1)环境准备
  • 2)准备相关 jar 包
  • 3)实现场景
  • 4)准备工作
    • 4.1.Mysql
    • 4.2.Kafka
  • 5)Flink-Sql
  • 6)验证

1)环境准备

Linux 或者 Windows 端需要安装:Mysql,Kafka,Flink 等。(略)

2)准备相关 jar 包

  • flink-connector-jdbc_2.11-1.12.0.jar
  • mysql-connector-java-5.1.49.jar

下载地址:JDBC-Sql-Connector

在这里插入图片描述

在这里插入图片描述

  • flink-format-changelog-json-1.2.0.jar
  • flink-sql-connector-mysql-cdc-1.2.0.jar
  • flink-sql-connector-postgres-cdc-1.2.0.jar

下载地址:ververica/flink-cdc-connectors

在这里插入图片描述

备用下载地址:gitee地址(github上不去就下载源码,改好version自己打包)

  • flink-sql-connector-kafka_2.11-1.12.0.jar

下载地址:flink-sql-connector-kafka

  • 将下载好的包放在 Flink 的 lib 目录下

3)实现场景

1、首先确认MySQL是否开启binlog机制,log_bin = ON 为开启 (如下图)

在这里插入图片描述

2、如果是本地环境的 Mysql 按照下面方式开启 binlog

在 C:\ProgramData\MySQL\MySQL Server 5.7\my.ini 下添加

log_bin = mysql-bin
binlog_format = ROW
expire_logs_days = 30

3、重启 Mysql 服务

4)准备工作

4.1.Mysql

1、在 Mysql 中创建 source 表:

CREATE TABLE `mysql2kafka_cdc_test` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `eventId` varchar(255) DEFAULT NULL,
  `eventStDt` varchar(255) DEFAULT NULL,
  `bak6` varchar(255) DEFAULT NULL,
  `bak7` varchar(255) DEFAULT NULL,
  `businessId` varchar(255) DEFAULT NULL,
  `phone` varchar(255) DEFAULT NULL,
  `bak1` varchar(255) DEFAULT NULL,
  `bak2` varchar(255) DEFAULT NULL,
  `bak13` varchar(255) DEFAULT NULL,
  `bak14` varchar(255) DEFAULT NULL,
  `bak11` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8

2、写入数据的语句准备就绪

INSERT INTO mysql2kafka_cdc_test(
eventId,
eventStDt,
bak6,
bak7,
businessId,
phone,
bak1,
bak2,
bak13,
bak14,
bak11
) VALUES(
'111',
'2022-11-3023:37:49',
'测试',
'https://test?user',
'1727980911111111111111111111',
'12345678910',
'1234',
'2021-12-0100:00:00',
'1727980911111111111111111111',
'APP',
'TEST1'
);

4.2.Kafka

创建 Topic

5)Flink-Sql

  • source
set table.dynamic-table-options.enabled=true;
set table.exec.source.cdc-events-duplicate=true;

CREATE TABLE source_mysql_test(
  id INT,
  eventId STRING,
  eventStDt STRING,
  bak6 STRING,
  bak7 STRING,
  businessId STRING,
  phone STRING,
  bak1 STRING,
  bak2 STRING,
  bak13 STRING,
  bak14 STRING,
  bak11 STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH(
    'connector' = 'mysql-cdc',
    'hostname' = '${ip}',
    'port' = '${port}',
    'database-name' = 'test',
    'table-name' = 'mysql2kafka_cdc_test',
    'username' = '${username}',
    'password' = '${password}',
    'scan.startup.mode'='timestamp',
    'scan.startup.timestamp-millis' = '1692115200000'
);
  • sink
CREATE TABLE sink_kafka_test (
  id INT,
  eventId STRING,
  eventStDt STRING,
  bak6 STRING,
  bak7 STRING,
  businessId STRING,
  phone STRING,
  bak1 STRING,
  bak2 STRING,
  bak13 STRING,
  bak14 STRING,
  bak11 STRING,
  PRIMARY KEY (id) NOT ENFORCED
 ) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'test',
    'sink.parallelism' = '3',
    'key.format' = 'json',
    'value.format' = 'json',
    'properties.bootstrap.servers' = '${kafka-bootstrap-server}',
    'properties.security.protocol' = 'SASL_PLAINTEXT',
    'properties.sasl.kerberos.service.name' = 'kafka',
    'metadata.max.age.ms' = '300000'
);
  • insert
insert into sink_kafka_test select * from source_mysql_test;

6)验证

Mysql 中写入测试数据,Kafka-Topic 中观察是否有数据生成。

INSERT INTO mysql2kafka_cdc_test(
eventId,
eventStDt,
bak6,
bak7,
businessId,
phone,
bak1,
bak2,
bak13,
bak14,
bak11
) VALUES(
'111',
'2022-11-3023:37:49',
'测试',
'https://test?user',
'1727980911111111111111111111',
'12345678910',
'1234',
'2021-12-0100:00:00',
'1727980911111111111111111111',
'APP',
'TEST1'
);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1314775.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

EAM系统在地铁设备管理中的应用

在现代城市的交通系统中,地铁作为一种高效、快速、可靠的公共交通工具,扮演着至关重要的角色。为了确保地铁系统的正常运行和可靠性,地铁管理部门需要有效地管理大量的设备和设施。在这个过程中,企业资产管理(EAM&…

虚幻学习笔记13—C++静态和动态加载

一、前言 我们在蓝图中可以很方便的添加各种需要的组件,那么在C代码中要如何实现呢。在代码中分静态和动态加载,而无论静态和动态,加载的内容有资源和资源类,资源类通常为带资源的蓝图类。 二、实现 在实现静态或动态加载时&…

mybatis-plus雪花算法自动生成ID到前端后精度丢失问题

问题发生 前端接收到后端的数据出现异常,异常如下: 如图这是后端正常返回的数据, 但是点击预览时发现这个id的数据被改变了 这就导致了我通过id去修改相关数据时无法成功 问题原因 id的长度过长(19位),前…

针对网页html中插入动图gif不能循环播放只播放一次的解决方案

针对网页html中插入动图gif不能循环播放只播放一次的解决方案 原因分析解决方案 原因分析 使用图片编辑软件制作的过程中未启用“循环播放”功能,这里以Photoshop为例,演示设置GIF图片循环播放的操作流程:所需材料:PS。第一步&am…

云仓酒庄为您挑选意大利葡萄酒

作为世界产酒大国之一,意大利葡萄酒种类也是纷繁多样,赢得了众多葡萄酒爱好者的喜爱。说意大利葡萄酒地位仅次法国也不为过。那么,云仓酒庄的品牌雷盛红酒分享有没有一些挑选意大利葡萄酒的方法和技巧呢? 意大利的酒也有几千万种…

基于springboot乐器视频学习网站设计与实现

项目描述 临近学期结束,还是毕业设计,你还在做java程序网络编程,期末作业,老师的作业要求觉得大了吗?不知道毕业设计该怎么办?网页功能的数量是否太多?没有合适的类型或系统?等等。你想解决的问题,今天给大家介绍…

ES-脚本

脚本 简单使用 POST product/_update/2 {"script": {"source": "ctx._source.salary1" #将薪水字段的值 1} }预定义变量 POST product/_update/2 {"script": {"lang": "painless","source": "…

办公技巧:分享五个在线画图工具,值得收藏

目录 1. processon ​编辑 2. visual paradigm online 3. zen flowchart 4. draw io 5. Excalidraw 今天小编给大家分享五个在线画图工具,感兴趣的可以下载试一试! 1. processon 说流程图除了必提http://draw.io,processon也必须要有…

【深度学习】机器学习概述(一)机器学习三要素——模型、学习准则、优化算法

​ 文章目录 一、基本概念二、机器学习的三要素1. 模型a. 线性模型b. 非线性模型 2. 学习准则a. 损失函数1. 0-1损失函数2. 平方损失函数(回归问题)3. 交叉熵损失函数(Cross-Entropy Loss)4. Hinge 损失函数 b. 风险最小化准则1.…

如何在 Windows 10/11 上恢复已删除的 Word 文档

意外删除重要的 Word 文档可能会令人心碎。当文件恰好非常重要时尤其如此。关键数据的丢失可能会导致沮丧和恐慌,因为数小时的辛勤工作和有价值的信息似乎消失得无影无踪。然而,在您屈服于绝望之前,有个好消息。 有多种技术和工具可帮助您恢…

【AI美图】第04期效果图,AI人工智能无绘画,精选五组特写版美图欣赏

标题好看的照片让AI实现 也许你不相信吧,这么秀的照片居然全部是电脑合成AI妙手,一键打造,速度之快,效果之好,任谁都不愿意相信,然后在今天这一切都是现实的,现在让我们从照片的拍摄谈谈什么样的…

准备迎接超级人工智能系统,OpenAI宣布RLHF即将终结!超级对齐技术将接任RLHF,保证超级人工智能系统遵循人类的意志

本文原文来自DataLearnerAI: 准备迎接超级人工智能系统,OpenAI宣布RLHF即将终结!超级对齐技术将接任RLHF,保证超级人工智能系统遵循人类的意志 | 数据学习者官方网站(Datalearner)https://www.datalearner.com/blog/105170265526…

RocketMQ —消费重试

消费者出现异常,消费某条消息失败时, Apache RocketMQ 会根据消费重试策略重新投递该消息进行故障恢复。本文介绍消费重试机制的原理、版本兼容性和使用建议。 一、应用场景​ Apache RocketMQ 的消费重试主要解决的是业务处理逻辑失败导致的消费完整性…

EasyExcel读取Excel数据(含多种方式)

目录 EasyExcel简介 使用EasyExcel进行读数据 引入依赖: EasyExcel提供了两种读取模式 使用 监听器 读取模式 1.创建一个实体类 2.创建监听器 代码 使用 同步读 读取模式 1.创建一个实体类 2.代码 添加导入数据库的逻辑 其实官方文档讲得很清楚&#xff…

git 的使用

git reset详解-CSDN博客 git reset 命令详解 git revert命令详解。-CSDN博客 关于Git分支中HEAD和Master的理解 - 知乎 (zhihu.com) 一文带你精通 Git(Git 安装与使用、Git 命令精讲、项目的推送与克隆)-CSDN博客 Git 常用操作(5&#xff…

【Linux】tree命令使用

tree命令 tree命令用于以树状图列出目录的内容。 语法 tree [参数] [目录] tree 命令 -Linux手册页 bash: tree: 未找到命令... 安装tree yum -y install tree如果你系统中有安装tree 但是还是执行找不到该命令的话,那原因就是:环境变量错误&#x…

智能优化算法应用:基于热交换算法3D无线传感器网络(WSN)覆盖优化 - 附代码

智能优化算法应用:基于热交换算法3D无线传感器网络(WSN)覆盖优化 - 附代码 文章目录 智能优化算法应用:基于热交换算法3D无线传感器网络(WSN)覆盖优化 - 附代码1.无线传感网络节点模型2.覆盖数学模型及分析3.热交换算法4.实验参数设定5.算法结果6.参考文…

【数字电路】MacBook使用iverilog进行数字电路仿真

安装流程 在终端中用brew包管理工具进行安装仿真工具: 编译verilog代码: brew install icarus-verilog编译verilog代码: brew install verilatorMacOS系统显示UNIX GUI brew install xquartz可视化仿真波形图: brew install gtk…

2024年天津体育学院专升本专业课网上报名确认缴费安排

天津体育学院2024年高职升本科专业考试报名安排 一、时间安排 1.报名时间:2023年12月19日9:00-12月21日17:00 2.缴费时间:2023年12月26日-27日 (考试考务费:体育教育专业:160元/人&#xff…

项目中使用Arrays.asList、ArrayList.subList的坑

使用Arrays.asList的注意事项 1.1 可能会踩的坑 先来看下Arrays.asList的使用&#xff1a; List<Integer> statusList Arrays.asList(1, 2); System.out.println(statusList); System.out.println(statusList.contains(1)); System.out.println(statusList.contains(3)…