6.3、Flink数据写入到MySQL

news2025/1/22 21:05:18

目录

1、添加POM依赖

2、这一个完整的案例

3、何时批量写入MySQL呢?

4、容错性的保证(精确一次&至少一次)

4.1、 至少一次

4.2、精确一次


1、添加POM依赖

Apache Flink 集成了通用的 JDBC 连接器,使用时需要根据生产环境的版本引入相应的依赖

官网链接:官网

<!-- 引入 flink jdbc连接器 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>1.17-SNAPSHOT</version>
</dependency>

<!-- 引入 mysql数据库的驱动 -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>

2、这一个完整的案例

开发语言:java1.8

flink版本:flink1.17.0

package com.baidu.datastream.sink;

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

// 将 Flink数据写入到 MySQL
/*
* TODO Step1、在 开启socket服务,输入下列数据
*       1,红楼梦,曹雪芹,19.8,1
*       1,红楼梦,曹雪芹,19.8,2
*       1,红楼梦,曹雪芹,19.8,3
*       1,红楼梦,曹雪芹,19.8,4
*       1,红楼梦,曹雪芹,19.8,5
*       1,红楼梦,曹雪芹,19.8,6
*
* TODO Step2、MySQL book表DDL
*       create table books(id int, title varchar(99), author varchar(99), price double, qty int);
*
* */
public class SinkMySQL {
    public static void main(String[] args) throws Exception {
        // 1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 开启checkpoint后,会触发 数据写入MySQL操作
        //env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);

        // 2.将socket作为数据源(开启socket端口: nc -lk 9999)
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 9999);

        // 3.初始化 JdbcSink实例
        SinkFunction<String> jdbcSink = JdbcSink.sink(
                // TODO 1、指定要执行的SQL
                "insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
                // TODO 2、指定 将 dataStream数据 封装到 SQL的占位符中
                new JdbcStatementBuilder<String>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, String s) throws SQLException {
                        int id = Integer.parseInt(s.split(",")[0]);
                        String title = s.split(",")[1];
                        String author = s.split(",")[2];
                        double price = Double.parseDouble(s.split(",")[3]);
                        int qty = Integer.parseInt(s.split(",")[4]);

                        preparedStatement.setInt(1, id);
                        preparedStatement.setString(2, title);
                        preparedStatement.setString(3, author);
                        preparedStatement.setDouble(4, price);
                        preparedStatement.setInt(5, qty);
                    }
                },
                // TODO 3、指定 批量写入MySQL大小和频率 (当满足 设置的批次或者提交时间时 会触发写入MySQL)
                //  重要:这里的设置非常重要,它控制着flink写入MySQL的延迟程度
                //  当不设置 JdbcExecutionOptions 时,将使用默认配置 (缓存5000条数据后触发写入操作)
                JdbcExecutionOptions.builder()
                        .withMaxRetries(3) // 插入发生异常重试次数 注意:只支持SQL Exception 异常及其子类异常重试(默认值为3)
                        .withBatchSize(1)  // 批量的大小:条数(默认值为5000)
                        .withBatchIntervalMs(2000) // 批次的时间间隔 (默认值0L,表示无限长的时间间隔)
                        .build(),
                // TODO 4、指定 MySQL的连接信息
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://worker01:3306/flink?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                        .withUsername("root")
                        .withPassword("gaocun123")
                        .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                        .build()
        );

        streamSource.addSink(jdbcSink);

        // 3.触发程序执行
        env.execute();
    }
}

3、何时批量写入MySQL呢?

可以通过 JdbcExecutionOptions 来控制写入MySQL的数据量和时间频率,这也决定了Flink写入MySQL的延迟程度。

JdbcExecutionOptions 三个常用的参数:

  • withMaxRetries(3) :插入操作发生异常时的重试次数
    • 注意:只支持SQL Exception 异常及其子类异常重试(默认值为3)
  • withBatchSize(100)  :    批量写入数据的条数(默认值为5000)   
  • withBatchIntervalMs(2000)  :  批量写入的时间间隔 (默认值0L,表示关闭定时写入)
触发批量写入的条件

实时写入MySQL应该如何配置:

// TODO 实时写入MySQL
JdbcExecutionOptions.builder()
    .withMaxRetries(3) // 插入发生异常重试次数 注意:只支持SQL Exception 异常及其子类异常重试(默认值为3)
    .withBatchSize(1)  // 批量的大小设置为1,表示产生一条数据就会被写入MySQL
    .withBatchIntervalMs(0) // 批次的时间间隔为0,表示关闭定时写入
    .build()

4、容错性的保证(精确一次&至少一次)

4.1、 至少一次

使用Flink提供的 JDBC Sink 能够保证至少一次的语义

注意:

        这里的至少一次的保证指的是 MySQL故障后,数据不会丢失

4.2、精确一次

       对于 JDBC Sink,例如 MySQL,要实现故障时的精确一次的保证通过 upsert 语句或幂等更新实现

MySQL 中常用的 upsert 语句:       

        在MySQL中,"upsert"是指一种操作,它根据一定的条件在表中插入新的行,或者如果已经存在满足条件的行,则更新这些行的数据

使用 upset语句的前提是:

        表具有唯一键(UNIQUE KEY)或主键(PRIMARY KEY),以便在插入行时进行冲突检测


DDL:

-- TODO 要想使用 upsert语句,表必须具有 PRIMARY KEY 或者 UNIQUE约束
create table books
(
    id     int PRIMARY KEY,
    title  varchar(99) UNIQUE,
    author varchar(99),
    price  double,
    qty    int
);

insert into 语句

功能:向 表中写入数据,如果 主键字段(id)或UNIQUE字段(title) 存在时,则插入数据失败

示例:

insert into books (id, title, author, price, qty) values (1,'红楼梦','红楼梦',19.9,1);


insert ignore 语句

功能: 向 表中写入数据,如果 主键字段(id)或UNIQUE字段(title) 存在时,则忽略这次插入行为

示例:

-- insert ignore 语句
insert ignore books (id, title, author, price, qty) values (1,'红楼梦','曹雪芹1',19.9,1);

replace into 语句 (更新所有字段)

功能:向 表中写入数据,如果 主键字段(id)或UNIQUE字段(title) 存在时,会删除原有数据,再将这次插入数据写入 

示例:

-- replace into 语句
replace into books (id, title, author, price, qty) values (1,'红楼梦','曹雪芹',19.8,2);


insert on duplicate key update 语句 (更新局部字段)

功能:向 表中写入数据,如果 主键字段(id)或UNIQUE字段(title) 存在时,只会对 on duplicate key update 后指定的字段进行更新

示例:

-- insert on duplicate key update 语句
insert ignore books (id, title, author, price, qty) values (1,'红楼梦','曹雪芹?',8.8,3)
on duplicate key update author = 'XueQinCao';
;

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1018665.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

哈夫曼编码原理及实现

文章目录 一.哈夫曼编码原理哈夫曼二叉树构建 二.具体代码实现 一.哈夫曼编码原理 哈夫曼编码&#xff08;Huffman Coding&#xff09;是一种用于数据压缩的编码方法&#xff0c;它通过给出不同的数据符号分配不同长度的编码&#xff0c;使得出现频率高的符号具有较短的编码&a…

MySql中分割字符串

MySql中分割字符串 在MySql中分割字符串可以用到SUBSTRING_INDEX&#xff08;str, delim, count&#xff09; 参数解说       解释 str         需要拆分的字符串 delim         分隔符&#xff0c;通过某字符进行拆分 count          当 count 为正数&…

Web 第一步:HTTP 协议(基础)

这里是JavaWeb的开头部分&#xff01;那么先解释一下吧&#xff1a; Web&#xff1a;全球广域网&#xff0c;也称为万维网&#xff08;www&#xff09;&#xff0c;能够通过浏览器访问的网站。 JavaWeb&#xff1a;是用Java技术来解决相关 Web 互联网领域的技术栈。 &#xf…

Java8中判断一个对象不为空存在一个类对象是哪个

Java8中判断一个对象不为空存在一个类对象是哪个&#xff1f; 在Java 8中&#xff0c;你可以使用java.util.Optional类来处理可能为空的对象。Optional类可以帮助你优雅地处理空值情况&#xff0c;而不需要显式地进行空值检查。 这是一个简单的Optional示例&#xff1a; imp…

蓝桥杯 题库 简单 每日十题 day3

01 约数个数 题目描述 本题为填空题&#xff0c;只需要算出结果后&#xff0c;在代码中使用输出语句将所填结果输出即可。 1200000 有多少个约数&#xff08;只计算正约数&#xff09;。 解题思路 枚举&#xff0c;从1开始一直到1200000本身都作为1200000的除数&#xff0c;…

操作系统03-IO设备管理

中断源指的就是中断请求&#xff0c;所谓中断字是指中断请求的编号。 内存不足时候&#xff0c;OS可以采取“内存紧缩”进行改善。 进程控制块PCB组织管理一般采用线性方式。 7个状态模型增加就绪挂起和阻塞挂起的状态&#xff1a;1提高效率 2 提供足够的内存 3有利于调试 生…

【电子学会】2023年05月图形化三级 -- 数星星

数星星 1. 准备工作 &#xff08;1&#xff09;删除默认角色小猫&#xff0c;添加角色Pico和Star&#xff1b; &#xff08;2&#xff09;添加背景&#xff1a;Stars。 2. 功能实现 &#xff08;1&#xff09;Pico位置在舞台左下角&#xff1b; &#xff08;2&#xff09;…

springboot和vue:三、web入门(spring-boot-starter- web+控制器+路由映射+参数传递)

spring-boot-starter- web Spring Boot将传统Web开发的mvc、json、tomcat等框架整合&#xff0c;提供了spring-boot-starter-web组件&#xff0c;简化了Web应用配置。创建SpringBoot项目勾选Spring Web选项后&#xff0c;会自动将spring-boot-starter- web组件加入到项目中。w…

Ubuntu 虚拟化中Android Studio 不支持HAXM(CPU不支持问题)

Ubuntu虚拟机中Android Studio virtual device 安装弹出如图1所示的内容时&#xff0c;解决办法就是VMware 中对处理器开启虚拟化引擎&#xff08;图2&#xff09;&#xff08;此方式是在物理设备CPU是Intel环境测试的&#xff0c;AMD的可以参考是否可行&#xff09; 查看我的文…

Python封装一个接收UDP组播的模块

Python封装一个可以接收UDP组播的类MulticastDataReceiver&#xff0c;在子线程中接收组播数据 # udp_multicast.py import socket import threading import timeclass MulticastDataReceiver:def __init__(self, multicast_group, multicast_port, dest_addr, dest_port):sel…

es小记(copy_to)

简单创建索引复制字段 1: 3个主分片,各自有一个副本,总分片数为 3*26; refresh_interval为刷新频率; 其他参数描述,转载自 PUT test1 { “settings”:{ “number_of_shards”: 1, “number_of_replicas”: 1, “refresh_interval”: “30s” }, “mappings”:{ “properties”…

C语言天花板——指针(初阶)

&#x1f320;&#x1f320;&#x1f320; 大家在刚刚接触C语言的时候就肯定听说过&#xff0c;指针的重要性以及难度等级&#xff0c;以至于经常“谈虎色变”&#xff0c;但是今天我来带大家走进指针的奇妙世界。&#x1f387;&#x1f387;&#x1f387; 一、什么是指针&…

11.外观模式

外观模式&#xff08;Facade&#xff09;&#xff0c;为子系统中的一组接口提供一个一致的界面&#xff0c;此模式定义了一个高层接口&#xff0c;这个接口使得这一子系统更加容易使用。 UML 测试代码 #include <iostream> using namespace std;class SubSystemOne { pu…

Mac专用投屏工具AirServer 7 .27 for Mac中文免费激活版

AirServer 7 .27 for Mac中文免费激活版是一款Mac专用投屏工具&#xff0c;能够通过本地网络将音频、照片、视频以及支持AirPlay功能的第三方App&#xff0c;从 iOS 设备无线传送到 Mac 电脑的屏幕上&#xff0c;把Mac变成一个AirPlay终端的实用工具。 目前最新的AirServer 7.2…

Linux关于memory cgroup的几个要点

概述 本文讲述memory cgroup比较容易误解的一些逻辑&#xff0c;如果不太经常使用和解决问题的话&#xff0c;对于memory cgroup的认知会比较浅显&#xff1a;cgroup memory用来限制进程的内存使用&#xff0c;但是我们进一步想如下的问题&#xff1a; 进程的内存可以分很多类…

「UG/NX」Block UI 指定坐标SpecifyCSYS

✨博客主页何曾参静谧的博客📌文章专栏「UG/NX」BlockUI集合📚全部专栏「UG/NX」NX二次开发「UG/NX」BlockUI集合「VS」Visual Studio「QT」QT5程序设计「C/C+&#

JavaScript学习记录 | DOM事件流 事件冒泡-事件捕获-事件委托

目录 DOM事件流常见面试题事件冒泡与事件捕获事件冒泡使用场景事件捕获使用场景事件冒泡和事件捕获区别 事件委托 - 利用事件冒泡机制事件委托应用场景支持事件委托的事件事件委托的优缺点 DOM事件流 DOM事件流的三个阶段&#xff1a;捕获阶段 -> 目标阶段 -> 冒泡阶段 …

9月13-14日上课内容 第三章 ELK日志分析系统及部署实例

本章结构 ELK日志分析系统简介 ELK日志分析系统分为 Elasticsearch Logstash Kibana 日志处理步骤 1.将日志进行集中化管理 2.将日志格式化(Logstash) 并输出到Elasticsearch 3.对格式化后的数据进行索引和存储 (Elasticsearch) 4.前端数据的展示(Kibana) Elasticsearch介…

Maven 工具学习笔记(基础)

Maven 是专门用于管理和构建Java项目的工具&#xff0c;其主要功能提供有&#xff1a; 标准化的项目结构&#xff08;在不同IDE之间其项目结构不一样&#xff0c;代表不能通用&#xff09;标准化的构建流程&#xff08;编译 ——> 测试 ——> 打包 ——> 发布...&…

epoll实现TCP的服务器与客户端通信

服务器&#xff1a; #include<myhead.h> #define IP "192.168.250.100" #define PORT 8888 /* typedef union epoll_data {void *ptr;int fd;uint32_t u32;uint64_t u64; } epoll_data_t;struct epoll_event {uint32_t events; …