2.3 如何使用FlinkSQL读取写入到JDBC(MySQL)

news2025/1/12 4:02:51

1、JDBC SQL 连接器

FlinkSQL允许使用 JDBC连接器,向任意类型的关系型数据库读取或者写入数据

添加Maven依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-jdbc</artifactId>
  <version>3.1.0-1.17</version>
</dependency>

注意:如果使用 sql-client客户端,需保证 flink-1.17.1/lib 目录下 存在相应的jar包

 相关jar可以通过官网下载:JDBC SQL 连接器 


2、读取 MySQL

FlinkSQL读取MySQL表时,为批式处理,在流式计算任务中,通常被做维表来使用

-- 在FlinkSQL中创建 MySQL Source 表
drop table mysql_source_table;
CREATE TABLE mysql_source_table (
  `id` INT,
  `title` STRING,
  `author` STRING,
  `price` DOUBLE,
  `qty` INT
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://worker01/flink',
   'driver' = 'com.mysql.jdbc.Driver',  -- 【可选】不设置时,将自动从url中推导
   'username' = 'xxxx',
   'password' = 'xxxx',
   'table-name' = 'books'
);

-- 批式 sql,查看 JDBC 表中的数据
select * from mysql_source_table;

运行结果:


3、写入MySQL

3.1 何时批量写入MySQL呢?

FlinkSQL往MySQL写入数据时,默认会在客户端缓存数据,当触发设置的阈值后,才会向服务端发送数据

开启checkpoint :

# TODO 开启checkpoint,当checkpoint后,会触发jdbc的flush操作
set execution.checkpointing.interval=300sec;

设置 flush 前缓存记录的最大值 、flush 间隔时间:

-- TODO 创建sink mysql table
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (
  `id` INT,
  `title` STRING,
  `author` STRING,
  `price` DOUBLE,
  `qty` INT
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',
   'username' = 'xxxx',
   'password' = 'xxxx',
   'table-name' = 'books',
   'sink.buffer-flush.max-rows' = '100', -- flush 前缓存记录的最大值,默认值为100,设置为0时,表示不缓存数据(来一条写入一条)
   'sink.buffer-flush.interval' = '50s' -- flush 间隔时间,超过该时间后异步线程将 flush 数据。默认为1s
);

使用说明:

FLinkSQL写入MySQL时,常通过 sink.buffer-flush.max-rows、sink.buffer-flush.interval 来控制写入数据的延迟程度

        当 对写入实时性要求较高时,可以将 sink.buffer-flush.max-rows = 0 ,表示到来一条数据后立即写入MySQL,但带来的后果是 长时间占有mysql连接

        当 数据量大且对实时要求不高时,可根据业务需求调大配置,可使实时行和性能最优


3.2 sink mysql table 中主键的作用

在FLinkSQL中创建sink mysql table时,如果表中定义了主键,则连接器将以 upsert 模式工作

否则连接器将以 append 模式工作

         upsert 模式:Flink 将根据主键判断插入新行或者更新已存在的行

                               使用这种模式时,确保MySQL中的底表定义主键和添加唯一性约束

       append 模式:对MySQL库中底表做insert操作

 upsert 模式:

-- TODO 创建MySQL 表
CREATE TABLE `books` (
  `id` int(11) NOT NULL,
  `title` varchar(99) DEFAULT NULL,
  `author` varchar(99) DEFAULT NULL,
  `price` double DEFAULT NULL,
  `qty` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- TODO 创建FLinkSQL表(sink mysql table)
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (
  `id` INT,
  `title` STRING,
  `author` STRING,
  `price` DOUBLE,
  `qty` INT,
  PRIMARY KEY (id) NOT ENFORCED -- 指定主键字段
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',
   'username' = 'root',
   'password' = 'xxxx',
   'table-name' = 'books',
   'sink.buffer-flush.max-rows' = '0' -- 实时写入
);

-- TODO 往 mysql中写入数据(相同key的数据写入后,会做upsert操作)
insert into mysql_sink_table
SELECT * FROM (VALUES
  (5,'A Dream in Red Mansions','y', 3.0,1)
, (6,'Journey to the West','y', 3.0,1)
, (7,'Water Margin','y', 3.0,1)
) AS books (id, title,author,price,qty);

append 模式:

-- TODO 创建FLinkSQL表(sink mysql table)
drop table mysql_sink_table;
CREATE TABLE mysql_sink_table (
  `id` INT,
  `title` STRING,
  `author` STRING,
  `price` DOUBLE,
  `qty` INT
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8',
   'username' = 'root',
   'password' = 'xxx',
   'table-name' = 'books',
   'sink.buffer-flush.max-rows' = '0' -- 实时写入
);

-- TODO 往 mysql中写入数据(相同key的数据写入后,会做操作)
insert into mysql_sink_table
SELECT * FROM (VALUES
  (5,'A Dream in Red Mansions','y', 3.0,1)
, (6,'Journey to the West','y', 3.0,1)
, (7,'Water Margin','y', 3.0,1)
) AS books (id, title,author,price,qty);

注意:使用 append模式时,如果MySQL底表中存在主键或唯一性约束时,INSERT 插入可能会失败

insert into 失败:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1109028.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

基于RuoYi-Flowable-Plus的若依ruoyi-nbcio支持自定义业务表单流程的集成方法与步骤(一)

更多ruoyi-nbcio功能请看演示系统 gitee源代码地址 前后端代码&#xff1a; https://gitee.com/nbacheng/ruoyi-nbcio 演示地址&#xff1a;RuoYi-Nbcio后台管理系统 由于大家最自定义业务表单的整个集成方法还不熟悉&#xff0c;下面大概介绍一下这个流程与方法。 1、首先…

顿号在键盘上怎么打?教你4个输入方法!

“朋友们&#xff0c;我正在准备一篇期末论文&#xff0c;但是文章里的顿号我一直输入不了。顿号在键盘上应该怎么输入呀&#xff1f;谁能教教我呢&#xff1f;非常感谢&#xff01;” 在使用电脑编辑文档时&#xff0c;我们可能经常需要输入顿号。但有些朋友还不知道顿号在键盘…

Java 关键字:synchronized详解

synchronized详解 基本使用源码解析常见面试题好书推荐 基本使用 Java中的synchronized关键字用于在多线程环境下确保数据同步。它可以用来修饰方法和代码块 当一个线程访问一个对象的synchronized方法或代码块时&#xff0c;其他线程将无法访问该对象的其他synchronized方法或…

ppt录屏怎么导出来?学会这个,让分享更容易

ppt已经成为了日常工作与学习中必不可少的工具&#xff0c;而ppt屏幕录制功能&#xff0c;可以方便用户将他人的演讲或视频中的内容记录下来&#xff0c;以便进一步学习与研究。录制ppt演示并将其导出为视频文件&#xff0c;可以帮助我们进行分享&#xff0c;但是很多人不知道p…

el-upload实现上传文件夹

背景&#xff1a;如图一所示&#xff0c;最下面有一个黄色上传文件按钮&#xff0c;为手动上传而且上传区域有上传文件和上传文件夹的区分 所以需要在点击了上传文件夹做特殊处理使得el-upload可以上传文件夹 一、template区域 <el-uploadclass"upload-file"dra…

Prometheus metrics数据抓取解析

Prometheus node的监控数据如链接展示&#xff0c;我们希望能更加方便的看到监控数据&#xff0c;shodan对Prometheus metrics 的数据做了格式化处理。172.96.3.215:9100/metricshttp://172.96.3.215:9100/metrics 本文我自己实现了一个命令行工具&#xff0c;可以输出类shodan…

STR时,android发生了什么(一)

在QA的基线中&#xff0c;触发android进入STR流程的方式是向qvm注入power key 按下松开的操作(对于单android的基线&#xff0c;我的理解方式应该也是相同的&#xff0c;都是模拟了power key的按下松开操作&#xff09;。 这个按键操作会通过virtio上报到VHAL层&#xff08;下…

用CSS+SVG做一个优雅的环形进度条

开门见山 先上最终效果图&#xff1a;&#xff08;Demo 传送门&#xff09; 其中进度、尺寸、环宽和颜色都可以非常方便地进行控制。 核心原理&#xff1a; 利用两个重叠的圆环形&#xff0c;通过对上层圆环弧长的控制来表示进度&#xff0c;下层圆环则作为辅助&#xff0c;…

59 分割等和子集

分割等和子集 NP 完全问题&#xff08;01背包&#xff09;题解1 二维DP题解2 空间优化DP&#xff08;改为1D&#xff09; 给你一个只包含正整数的非空数组 nums 。请你判断是否可以将这个数组分割成两个子集&#xff0c;使得两个子集的元素和相等。 示例 1&#xff1a; 输入&a…

Ubuntu - 安装Java 11

在Ubuntu上安装Java 11的最简单方法是通过使用OpenJDK 11。以下是安装Java 11的步骤&#xff1a; 打开终端。 更新包列表&#xff0c;以确保获取最新的软件信息&#xff1a; sudo apt update 安装OpenJDK 11&#xff1a; sudo apt install openjdk-11-jdk 安装后&#xf…

独家原创,改进的智能优化算法

↖加关注这种话银家怎么好意思说出口嘛-- 声明&#xff1a;对于作者的原创代码&#xff0c;禁止转售倒卖&#xff0c;违者必究&#xff01; 以下代码均为作者独家原创&#xff0c;大家可以拿走去水水论文。 有一些质量较高的代码&#xff0c;可以发一些高质量的期刊&#xff0c…

LoogArch 指令集学习

1 SoC_Lite片上系统结构 mycpu和dram、confreg之间有一个“一分二”部件。这是因为在LoongArch指令系统架构下&#xff0c;所有I/O设备的寄存器都是采用memory mapped方式访问的。我们这里实现的confreg也不例外。Memory mapped的访问方式意味I/O设备中的寄存器各自都有一个唯一…

【大数据】Kafka 入门简介

Kafka 入门简介 1.什么是 Kafka2.Kafka 的基本概念3.Kafka 分布式架构4.配置单机版 Kafka4.1 下载并解压包4.2 启动 Kafka4.3 创建 Topic4.4 向 Topic 中发送消息4.5 从 Topic 中消费消息 5.实验5.1 实验一&#xff1a;Python 实现生产者消费者5.2 实验二&#xff1a;消费组实现…

防蓝光护眼灯有用吗?教你认识防蓝光护眼台灯

要不是亲眼所见&#xff0c;真的很难想象一个台灯用处如此大&#xff0c;护眼效果非常明显。说起来很久没有用过护眼灯具了&#xff0c;这次用过之后有着明显的反差&#xff0c;如果能给孩子用&#xff0c;那将大大保障了孩子的用眼、护眼问题。我自己是用来睡前看书的&#xf…

Hadoop集群资源管理器-YARN

1.YARN 简介 Apache YARN (Yet Another Resource Negotiator) 是 hadoop 2.0 引入的集群资源管理系统。用户可以将各种服务框架部署在 YARN 上,由 YARN 进行统一地管理和资源分配。 2.YARN架构

欧科云链研究院:人类或将成为仅次于AI第二聪明物种?Web3不允许

出品&#xff5c;欧科云链研究院 在 AI行业“掘金买铲”的英伟达&#xff0c;60%的红杉投资在AI相关领域&#xff0c;之前只专注Web3的顶级VC&#xff0c;Paradigm 正在从转向人工智能等 "前沿 "技术。 资本的追逐让AI迷人且危险。 OKG RESEARCH IN FT AI教父Geoffre…

Docker 容器化(初学者的分享)

目录 一、什么是docker 二、docker的缺陷 三、简单的操作 一、首先配置一台虚拟机 二、安装Docker-CE 一、安装utils 二、 将 Docker 的软件源添加到 CentOS 的 yum 仓库中。这样可以通过 yum 命令来安装、更新和管理 Docker 相关的软件包。 三、将 download.docker.co…

苹果QQ聊天记录导出的3个方法你知道吗?

QQ功能强大、方便快捷&#xff0c;获得众多小伙伴的喜爱。在QQ中最重要的东西莫过于聊天记录。QQ聊天记录中包含了我们日常的点点滴滴、工作业务往来&#xff0c;以及一些重要的文件等等。 有时候&#xff0c;你可能希望将聊天记录导出以节省手机空间&#xff0c;或者拿来留作…

【广州华锐互动】VR高层小区安全疏散演练系统

在今天的高科技时代&#xff0c;虚拟现实&#xff08;VR&#xff09;技术已经被广泛应用到各个领域&#xff0c;包括教育和培训。由广州华锐互动定制开发的VR高层小区安全疏散演练系统&#xff0c;开始在房地产行业中崭露头角。这种系统通过模拟真实的紧急情况&#xff0c;帮助…

在博物馆建设方案中,如何考虑功能展项区域的扩展和灵活性?

对于博物馆设计来说&#xff0c;空间内功能展项区域的分布非常重要&#xff0c;关乎展厅内容呈现的视觉效果以及观感体验&#xff0c;我们在了解到博物馆建设主题之后&#xff0c;就需要对其功能展区进行划分&#xff0c;不同类型的分区功能展示&#xff0c;也会影响到用户的参…