Flink之JDBC Sink

news2025/1/10 20:43:59

这里介绍一下Flink Sink中jdbc sink的使用方法,以mysql为例

  • 代码
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.sql.PreparedStatement;
import java.sql.SQLException;

/**
 * @Author: J
 * @Version: 1.0
 * @CreateTime: 2023/8/2
 * @Description: 测试
 **/
public class FlinkJdbcSink {
    public static void main(String[] args) throws Exception {
        // 构建流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 这里使用的是自定义数据源CustomizeBean(name,age,gender,hobbit),为了方便测试,换成任何数据源都可,只要和最后的要写入的表结构匹配即可
        DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());
        // 构建jdbc sink
        SinkFunction<CustomizeBean> jdbcSink = JdbcSink.sink(
                "insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)", // 数据插入sql语句
                new JdbcStatementBuilder<CustomizeBean>() {
                    @Override
                    public void accept(PreparedStatement pStmt, CustomizeBean customizeBean) throws SQLException {
                        pStmt.setString(1, customizeBean.getName());
                        pStmt.setInt(2, customizeBean.getAge());
                        pStmt.setString(3, customizeBean.getGender());
                        pStmt.setString(4, customizeBean.getHobbit());
                    }
                }, // 字段映射配置,这部分就和常规的java api差不多了
                JdbcExecutionOptions.builder()
                        .withBatchSize(10) // 批次大小,条数
                        .withBatchIntervalMs(5000) // 批次最大等待时间
                        .withMaxRetries(1) // 重复次数
                        .build(), // 写入参数配置
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false")
                        .withUsername("root")
                        .withPassword("password")
                        .build() // jdbc信息配置
        );
        // 添加jdbc sink
        customizeSource.addSink(jdbcSink);
        env.execute();
    }
}
  • pom依赖
        <!-- 在原有的依赖中加入下面两个内容 -->
        
        <!-- JDBC connector -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- mysql驱动 -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>
  • 结果
    在这里插入图片描述
    jdbc sink的具体使用方式大概就这些内容,还是比较简单的,具体应用还要结合实际业务场景.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/826850.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

027 - avg()函数

定义&#xff1a; AVG 函数返回数值列的平均值。NULL 值不包括在计算中。 语法&#xff1a; SELECT AVG(column_name) FROM table_name -- 实际操作&#xff08;计算salary的平均值&#xff09;&#xff1a; SELECT avg(salary) AS "平均工资" FROM employee ; --…

300行代码实现简易Spring框架

源码地址 该简易Spring框架实现的功能有 容器启动包扫描单例、多例BeanBean的创建依赖注入Aware回调函数初始化后处理器AOP 目录结构如下&#xff0c;service包为模拟业务逻辑&#xff0c;spring包为spring的实现&#xff08;核心&#xff09;&#xff0c;其中ApplicationCo…

MySQL数据库中.frm和.myi和.myd和.ibd文件是什么文件?

mysql 数据库 存储引擎是myisam, 在data目录下会看到3类文件&#xff1a;.frm、.myi、.myd &#xff08;1&#xff09;.frm–表定义&#xff0c;是描述表结构的文件。 &#xff08;2&#xff09;.MYD–"D"数据信息文件&#xff0c;是表的数据文件。 &#xff08;3&am…

mfc程序发布时带上必要的dll

mfc在开发机器上&#xff0c;运行时没问题的&#xff0c; 但如果到其他windows机器运行会报错&#xff0c;提示几个dll库文件找不到 如何处理&#xff1f; 不要慌&#xff0c;问题不大&#xff0c;不要盲目去下载dll&#xff0c;或到c盘windows下找&#xff0c; 完全没必要…

浅析大数据时代下的视频技术发展趋势以及AI加持下视频场景应用

视频技术的发展可以追溯到19世纪初期的早期实验。到20世纪初期&#xff0c;电视技术的发明和普及促进了视频技术的进一步发展。 1&#xff09;数字化&#xff1a;数字化技术的发明和发展使得视频技术更加先进。数字电视信号具有更高的清晰度和更大的带宽&#xff0c;可以更快地…

WebServer项目

web服务器是IO密集型的任务&#xff1a;> CPU个数 有限状态机来更高效地处理状态的转移&#xff1a; 【差不多捋顺&#xff0c;按模块自己写出文字讲解&#xff0c;讲出优化思路优化的效果&#xff0c;瓶颈是啥解决策略是啥】 【redis 如何实现】【innodb底层如何实现】【e…

物理分代垃圾回收器

内存结构 内存分配 堆上分配 大多数情况在eden【年轻代中的一个区域】上分配&#xff0c;偶尔会直接在old【老年代】上分配&#xff0c;细节取决于GC的实现。栈上分配&#xff08;发生了指针逃逸&#xff0c;又叫指针逃逸分析——JVM优化&#xff09; 原子类型的局部变量。 G…

【Linux命令200例】tee将输入内容输出到屏幕和文件

&#x1f3c6;作者简介&#xff0c;黑夜开发者&#xff0c;全栈领域新星创作者✌&#xff0c;阿里云社区专家博主&#xff0c;2023年6月csdn上海赛道top4。 &#x1f3c6;本文已收录于专栏&#xff1a;Linux命令大全。 &#x1f3c6;本专栏我们会通过具体的系统的命令讲解加上鲜…

Dockerfile构建nginx镜像(编译安装)

Dockerfile构建nginx镜像 1、建立工作目录 [rootdocker ~]# mkdir nginx [rootdocker ~]# cd nginx/ 2、编写Dockerfile文件 [rootdocker nginx]# vim run.sh [rootdocker nginx]# vim Dockerfile #基于的基础镜像 FROM centos:7#镜像作者信息 MAINTAINER Crushlinux <…

【iOS】json数据解析以及简单的网络数据请求

文章目录 前言一、json数据解析二、简单的网络数据请求三、实现访问API得到网络数据总结 前言 近期写完了暑假最后一个任务——天气预报&#xff0c;在里面用到了简单的网络数据请求以及json数据的解析&#xff0c;特此记录博客总结 一、json数据解析 JSON是一种轻量级的数据…

《Spring Boot源码解读与原理分析》书籍推荐

Spring Boot是目前Java EE开发中颇受欢迎的框架之一。依托于底层Spring Framework的基础支撑&#xff0c;以及完善强大的特性设计&#xff0c;Spring Boot已成为业界流行的应用和微服务开发基础框架。 《Spring Boot源码解读与原理分析》共14章&#xff0c;分为4个部分。第一部…

css white-space属性

先看不换行的效果&#xff0c;调用.space类 再来看使用 white-space:nowrap的效果 运行结果如下&#xff1a;

Spring Boot 集成 Thymeleaf 模板引擎

Spring Boot 集成 Thymeleaf 模板引擎 1. Thymeleaf 介绍 Thymeleaf 是适用于 Web 和独立环境的现代服务器端 Java 模板引擎。 Thymeleaf 的主要目标是为开发工作流程带来优雅的自然模板&#xff0c;既可以在浏览器中正确显示的 HTML&#xff0c;也可以用作静态原型&#xff…

【芯片设计- RTL 数字逻辑设计入门 3- Verdi 常用使用命令】

文章目录 Verdi 全局显示Verdi 前导 0 的显示Verdi 数据笔数统计Verdi 波形数据dump Verdi 全局显示 bsubi -n 16 -J sam visualizer -tracedir ./veloce.wave/debug_waveform.stw 打开波形后&#xff0c;如果想要看到所有信号的数据&#xff0c;可以点击下图中红框中的按钮&a…

DASCTF 2023 0X401七月暑期挑战赛web复现

目录 <1> Web (1) EzFlask(python原型链污染&flask-pin) (2) MyPicDisk(xpath注入&文件名注入) (3) ez_cms(pearcmd文件包含) (4) ez_py(django框架 session处pickle反序列化) <1> Web (1) EzFlask(python原型链污染&flask-pin) 进入题目 得到源…

系统接口自动化测试方案

XXX接口自动化测试方案 1、引言 1.1 文档版本 版本 作者 审批 备注 V1.0 XXXX 创建测试方案文档 1.2 项目情况 项目名称 XXX 项目版本 V1.0 项目经理 XX 测试人员 XXXXX&#xff0c;XXX 所属部门 XX 备注 1.3 文档目的 本文档主要用于指导XXX-Y…

C++ __builtin_popcount函数作用

__builtin_popcount函数是系统自带的一个返回值是int/long/long long二进制1的个数的函数。 对于int&#xff0c;long&#xff0c; long long分别用一下三种函数&#xff1a; __builtin_popcount(unsigned int n)//返回值为int __builtin_popcountl(unsigned int n)//返回值为…

【共享广告主】小白的创业项目:互联网广告,详细介绍

科思创业汇 大家好&#xff0c;这里是科思创业汇&#xff0c;一个轻资产创业孵化平台。赚钱的方式有很多种&#xff0c;我希望在科思创业汇能够给你带来最快乐的那一种&#xff01; 互联网广告做什么生意&#xff1f; 互联网广告行业面向哪些客户群体&#xff1f; 互联网广…

Anaconda创建虚拟环境

参考文章 1 Win10RTX3060配置CUDA等深度学习环境 pytorch 管网&#xff1a;PyTorch 一 进入 Anaconda 二 创建虚拟环境 conda create -n pytorch python3.9注意要注意断 VPN切换镜像&#xff1a; 移除原来的镜像 # 查看当前配置 conda config --show channels conda config…

leetcode 46. Permutations(排列)

返回数组nums中数字的所有可能的排列组合。 思路&#xff1a; 排列组合这种一般会想到DFS。 这个排列中每个数字只能用一次&#xff0c; 可用如下DFS流程 stack.push(num); dfs(nums, num); stack.pop();退出条件&#xff1a; 当stack的size和nums数组一样时&#xff0c;说…