Flink JdbcSink.sink源码解析及常见问题

news2025/1/10 17:11:37

文章目录

    • 源码
      • 入口
      • 我们看下flush方法干了什么
      • flush方法至此走完了,但是什么时机写入的数据呐?
      • 补充
      • 总结:
    • 常见问题
      • 1. 为什么会出现JdbcSink.sink方法插入Mysql无数据的情况?
      • 2. JdbcSink.sink写Phoenix无数据问题
    • 参考

基于Flink 1.14.4

源码

入口

public static <T> SinkFunction<T> sink(
            String sql,
            JdbcStatementBuilder<T> statementBuilder,
            JdbcExecutionOptions executionOptions,
            JdbcConnectionOptions connectionOptions) {
        return new GenericJdbcSinkFunction<>(
                new JdbcOutputFormat<>(         // 批量写出处理类
                        new SimpleJdbcConnectionProvider(connectionOptions), // JdbcConnectionOptions
                        executionOptions, // 执行参数 重试次数、批次大小、最大等待时间
                        context -> JdbcBatchStatementExecutor.simple(sql, statementBuilder, Function.identity()),
                        JdbcOutputFormat.RecordExtractor.identity()
                )
        );
    }

● GenericJdbcSinkFunction

public class GenericJdbcSinkFunction<T> extends RichSinkFunction<T>
        implements CheckpointedFunction, InputTypeConfigurable {
    private final JdbcOutputFormat<T, ?, ?> outputFormat;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        RuntimeContext ctx = getRuntimeContext();
        outputFormat.setRuntimeContext(ctx);
        // 1.调用 JdbcOutputFormat#open
        outputFormat.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
    }
            
	// 数据每来一条处理处理一条
    @Override
    public void invoke(T value, Context context) throws IOException {
        // 2.调用 JdbcOutputFormat#writeRecord
        outputFormat.writeRecord(value);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {}

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // 3.barrier到达 出发ck 时调用JdbcOutputFormat#flush
        outputFormat.flush();
    }

    @Override
    public void close() {
        outputFormat.close();
    }

● JdbcOutputFormat

@Override
    public void open(int taskNumber, int numTasks) throws IOException {
        try {
            connectionProvider.getOrEstablishConnection();
        } catch (Exception e) {
            throw new IOException("unable to open JDBC writer", e);
        }
        jdbcStatementExecutor = createAndOpenStatementExecutor(statementExecutorFactory);
        
        // todo 如果 配置输出到jdbc最小间隔不等于0 且最小条数不是1 就创建一个固定定时线程池
        if (executionOptions.getBatchIntervalMs() != 0 && executionOptions.getBatchSize() != 1) {
            this.scheduler =
                    Executors.newScheduledThreadPool(
                            1, new ExecutorThreadFactory("jdbc-upsert-output-format"));
            this.scheduledFuture =
                	// 周期调度执行器
                    this.scheduler.scheduleWithFixedDelay(
                            () -> {
                                synchronized (JdbcOutputFormat.this) {
                                    if (!closed) {
                                        try { 
                                            flush(); // 执行任务
                                        } catch (Exception e) {
                                            flushException = e;
                                        }
                                    }
                                }
                            },
                            executionOptions.getBatchIntervalMs(), // 用户设置的withBatchIntervalMs参数
                            executionOptions.getBatchIntervalMs(),
                            TimeUnit.MILLISECONDS);
        }
    }

● scheduleWithFixedDelay 说明:

Java中的 scheduleWithFixedDelay 是 java.util.concurrent.ScheduledExecutorService
接口的一个方法,它用于创建一个周期性执行任务的调度器

Runnable task = /* 你要执行的任务 */;

// 第一个参数为要执行的任务
// 第二个参数为初始延迟时间(单位为时间单位)
// 第三个参数为两次任务执行之间的延迟时间(单位为时间单位)
scheduler.scheduleWithFixedDelay(task, initialDelay, delay, TimeUnit.SECONDS)

小结:我们可以看出,程序会根据BatchIntervalMs、BatchSize设置的值,创建一个周期任务调度器,按照BatchIntervalMs执行flush任务。

我们看下flush方法干了什么

@Override
     // 同步方法
    public synchronized void flush() throws IOException {
        checkFlushException();

        for (int i = 0; i <= executionOptions.getMaxRetries(); i++) {
            try {
                attemptFlush(); // 调用attemptFlush
                batchCount = 0; // 初始值为0
                break;
            }
            ...
}

protected void attemptFlush() throws SQLException {
    // JdbcBatchStatementExecutor 为接口,得看他的实现类
        jdbcStatementExecutor.executeBatch(); 
}

● 实现类 SimpleBatchStatementExecutor

说到这个还得往上找,SinkFunction#sink方法,看那个lambda表达式

// SinkFunction<T> sink 中调用的
JdbcBatchStatementExecutor.simple(sql, statementBuilder, Function.identity()
                                  
   ->
                                  
return new SimpleBatchStatementExecutor<>(sql, paramSetter, valueTransformer);

   ->
// 初始化 SimpleBatchStatementExecutor 
 SimpleBatchStatementExecutor(
            String sql, JdbcStatementBuilder<V> statementBuilder, Function<T, V> valueTransformer) {
        this.sql = sql;
        this.parameterSetter = statementBuilder;
        this.valueTransformer = valueTransformer;
        this.batch = new ArrayList<>(); //空集合
    }

● SimpleBatchStatementExecutor#executeBatch

JdbcOutputFormat#attemptFlush 的实际执行方法
public void executeBatch() throws SQLException {
        if (!batch.isEmpty()) {
            // 这个batch实际是上面的ArrayList
            for (V r : batch) { 
                parameterSetter.accept(st, r);
                st.addBatch();
            }
            st.executeBatch(); //批量执行
            batch.clear(); // 清空这批数据
        }
    }

flush方法至此走完了,但是什么时机写入的数据呐?

我们看到GenericJdbcSinkFunction#invoke中调用了,JdbcOutputFormat#writeRecord来处理数据。

● writeRecord

@Override
    // 同步方法
    public final synchronized void writeRecord(In record) throws IOException {
        checkFlushException();

        try {
            In recordCopy = copyIfNecessary(record);
            // 数据添加进缓冲区
            addToBatch(record, jdbcRecordExtractor.apply(recordCopy));
            
            batchCount++; // 初始值为0
            // 这里有个情况,就是BatchSize = 1时,就会来一条写一条
            if (executionOptions.getBatchSize() > 0
                    && batchCount >= executionOptions.getBatchSize()) {
                flush(); // 调用flush 
            }
        } catch (Exception e) {
            throw new IOException("Writing records to JDBC failed.", e);
        }
    }

 protected void addToBatch(In original, JdbcIn extracted) throws SQLException {
         // 最终调用的是下面的
        jdbcStatementExecutor.addToBatch(extracted);
}

// SimpleBatchStatementExecutor的 addToBatch
 @Override
    public void addToBatch(T record) {
        // batch就是上面的ArrayList,往集合里面攒批
        batch.add(valueTransformer.apply(record));
}

补充

关于batchIntervalMs、batchSize、maxRetries三者的默认值,可以看JdbcExecutionOptions类

public class JdbcExecutionOptions implements Serializable {
    public static final int DEFAULT_MAX_RETRY_TIMES = 3;
    private static final int DEFAULT_INTERVAL_MILLIS = 0;
    public static final int DEFAULT_SIZE = 5000;

    private final long batchIntervalMs;
    private final int batchSize;
    private final int maxRetries;

    private JdbcExecutionOptions(long batchIntervalMs, int batchSize, int maxRetries) {
        Preconditions.checkArgument(maxRetries >= 0);
        this.batchIntervalMs = batchIntervalMs;
        this.batchSize = batchSize;
        this.maxRetries = maxRetries;
    }

总结:

sink方法,如果设置了JdbcExecutionOptions参数,batchIntervalMs != 0,大概流程图如下:

在这里插入图片描述

常见问题

1. 为什么会出现JdbcSink.sink方法插入Mysql无数据的情况?

原因:
batchSize默认大小是5000,数据量未达到或者未开启ck都有可能会导致数据"丢失"问题的。

解决:
batchSize设为1

2. JdbcSink.sink写Phoenix无数据问题

原因:
Phoenix默认手动管理commit,Phoenix使用commit()而不是executeBatch()来控制批量更新。看源码可以了解到,JdbcSink使用的是executeBatch(),未调用commit方法!

解决:

// 连接参数加 AutoCommit=true
"jdbc:phoenix:192.168.xx.xx:2181;AutoCommit=true"

参考

http://124.221.225.29/archives/flinkjdbcsink-shi-yong-ji-yuan-ma-jie-xi
https://blog.51cto.com/u_15064630/4148244
https://codeantenna.com/a/KBFBolba25

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/676992.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

设计模式之组合模式笔记

设计模式之组合模式笔记 说明Composite(组合)目录组合模式示例类图菜单组件抽象类菜单类菜单项类测试类 说明 记录下学习设计模式-组合模式的写法。JDK使用版本为1.8版本。 Composite(组合) 意图:将对象组合成树型结构以表示“部分-整体”的层次结构。Composite使得用户对单…

Linux网络-网络层IP协议

目录 IP协议 计算机网络分层 IP协议头格式 IP数据报 - 数据分片 数据报为什么要分片&#xff1f; 数据报分片是什么&#xff1f; 如何做到IP数据报分片&#xff1f; 分片demo示例 并不推荐分片&#xff0c;能不分片则不分片。 网段划分 前置了解 网络号和主机号 为…

如何监测和优化阿里云服务器的性能?有哪些性能分析工具和指标?

如何监测和优化阿里云服务器的性能&#xff1f;有哪些性能分析工具和指标&#xff1f;   阿里云服务器性能监测与优化是云计算服务中一个非常重要的环节。为了确保服务器稳定、高效地运行&#xff0c;我们需要对其性能进行监测&#xff0c;并在监测的基础上进行优化。本文将为…

Packet Tracer - 综合技能练习(配置 VLAN、中继、DHCP 服务器、DHCP 中继代理,并将路由器配置为 DHCP 客户端)

Packet Tracer - 综合技能练习 地址分配表 设备 接口 IP 地址 子网掩码 默认网关 R1 G0/0.10 172.31.10.1 255.255.255.224 不适用 G0/0.20 172.31.20.1 255.255.255.240 不适用 G0/0.30 172.31.30.1 255.255.255.128 不适用 G0/0.40 172.31.40.1 255.255…

MySQL权限控制及日志管理

MySQL权限控制及日志管理 用户权限管理 创建用户 CREATE USER 用户名IP地址 [ IDENTIFIED BY 密码 ]&#xff1b;GRANT SELECT ON *.* TO 用户名’IP地址’ IDENTIFIED BY "密码"&#xff1b;--创建一个用户名为Usr1 密码为 Usr1.mysql的用户 并授权 CREATE USER…

无忧行:突破网络封锁、跨境访问国外的网站和应用程序(安装注册及使用教程详解)

文章目录 步骤一&#xff1a;注册微软账号步骤二&#xff1a;修改账号的国家/地区步骤三&#xff1a;在Edge Dev浏览器中安装无忧行插件步骤四&#xff1a;创建 无忧行 账户步骤五&#xff1a;无忧行使用教程 包括注册微软账号、在Edge Dev浏览器中安装无忧行插件、创建 无忧行…

Python基础篇(六):组织管理代码—模块和包

组织管理代码—模块和包 前言模块(Module)创建模块使用模块 包(Package)创建包使用包 前言 在Python中&#xff0c;模块和包是组织和管理代码的重要概念。模块是一个包含 Python 定义和语句的文件&#xff0c;而包则是一组相关模块的目录。它们是组织和管理代码的强大工具&…

【跟小嘉学 Rust 编程】四、理解 Rust 的所有权概念

系列文章目录 【跟小嘉学 Rust 编程】一、Rust 编程基础 【跟小嘉学 Rust 编程】二、Rust 包管理工具使用 【跟小嘉学 Rust 编程】三、Rust 的基本程序概念 【跟小嘉学 Rust 编程】四、理解 Rust 的所有权概念 文章目录 系列文章目录前言一、所有权(Ownership)1.1.、所有权(Ow…

【unity每日一记】 Camera相机+ Screen屏幕+动画机

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;uni…

Flutter开发——图片加载与缓存源码解析

在Flutter中有个图片组件&#xff1a;Image,通常会使用它的Image.network(src)、Image.file(src)、Image.asset(src)来加载图片。 下面是Image的普通构造方法&#xff1a; const Image({super.key,required this.image,this.frameBuilder,this.loadingBuilder,this.errorBuilde…

第四章 机器学习

文章目录 第四章 决策树4.1基本流程4.2划分选择4.2.1信息增益4.2.2增益率4.2.3基尼指数 4.3剪枝处理4.3.1预剪枝4.3.2后剪枝 4.4连续与缺失值4.4.1连续值处理4.4.2缺失值处理 4.5多变量决策树 第四章 决策树 4.1基本流程 决策过程&#xff1a; 基本算法&#xff1a; 4.2划…

git——使用ssh连接远程仓库

文章目录 前言一. 获取邮箱和密码1. 本地配置你的名字和邮箱2. 使用命令获取你本地的邮箱和密码 二、生成ssh公钥1.任意一个文件夹路径打开Git Bash Here并输入以下命令连按三次回车2. 根据上面红框部分的地址打开文件夹3. 打开并查看id_rsa.pub 文件 三、在GitHub上连接ssh1. …

电商API知识点整理(一)商品采集接口获取商品详情数据API

商品采集接口背景 电商商品采集接口是一种机器人软件接口&#xff0c;用于从电子商务网站上爬取商品信息。它的主要作用是将电商网站上的商品信息采集和整合&#xff0c;方便用户使用。传统的商品采集需要人工收集和整理&#xff0c;工作量大、效率低&#xff1b;而电商商品采…

Flutter的文本、图片和按钮使用

像视图数据流转机制、底层渲染方案、视图更新策略等知识&#xff0c;都是构成一个UI框架的根本&#xff0c;看似枯燥&#xff0c;却往往具有最长久的生命力。 因此&#xff0c; 只有把这些最基础的知识弄明白&#xff0c;修好内功&#xff0c;才能触类旁通&#xff0c;由点及面…

输入阻抗、输出阻抗和阻抗匹配

读者问了一个问题&#xff1a;“集总参数电路中&#xff0c;阻抗匹配&#xff08;内阻外阻&#xff09;可以使负载得到最大的功率输出”这句话怎么理解&#xff1f; 这里涉及到几个概念&#xff1a;输入阻抗、输出阻抗、阻抗匹配&#xff0c;今天简单的聊一聊。 先了解一下阻…

用Visual Studio 2022写出你第一个Windows程序(程序保证能正常运行)

我是荔园微风&#xff0c;作为一名在IT界整整25年的老兵&#xff0c;今天来看看如何用Visual C写出你第一个Windows程序。 与其看很多Windows的书&#xff0c;不如先自己动手写一个Windows程序。由于Windows程序的特有机制&#xff0c;不建议去写那种简单的HELLO WORLD&#x…

【计算机网络详解】——网络层(学习笔记)

&#x1f4d6; 前言&#xff1a;网络层它承担着网络间的数据传输和路由选择等核心任务&#xff0c;通过在传输层协议的基础上添加了路由和转发等功能&#xff0c;使得数据能够在全球范围内的互联网中自由流动。在这篇博客中&#xff0c;我们将深入探讨网络层的工作原理和具体实…

D. Binary String Sorting(枚举位置)

Problem - 1809D - Codeforces 给定一个仅由字符0和/或1组成的二进制字符串s。 您可以对此字符串执行几个操作&#xff08;可能为零&#xff09;。有两种类型的操作&#xff1a; 选择两个相邻的元素并交换它们。为了执行此操作&#xff0c;您需要支付1012硬币&#xff1b; 选…

网络作业10【计算机网络】

网络作业10【计算机网络】 前言推荐网络作业10一. 单选题&#xff08;共13题&#xff0c;68.2分&#xff09;二. 多选题&#xff08;共4题&#xff0c;21.2分&#xff09;三. 阅读理解&#xff08;共2题&#xff0c;10.6分&#xff09; 练习5-39 最后 前言 2023-6-23 15:35:39…

MySQL ----主从复制、分离解析

文章目录 一、MySQL 主从复制1.1服务性能扩展方式1.2 MySQL的扩展什么是读写分离&#xff1f; 1.3为什么要读写分离呢&#xff1f;1.4什么时候要读写分离&#xff1f;1.5主从复制与读写分离1.6mysql支持的复制类型1.7主从复制的工作过程1.8MySQL 读写分离原理1.9目前较为常见的…