这里介绍一下 Flink Sink 中 JDBC Sink 的使用方法,以 MySQL 为例。
- 代码
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
 * Example Flink streaming job that writes {@code CustomizeBean} records into the MySQL table
 * {@code t_user} through the Flink JDBC sink.
 *
 * <p>The source used here is the custom {@code CustomizeSource}; any other source works as long
 * as the emitted records match the columns of the target table.
 *
 * <p>Created: 2023/8/2
 *
 * @author J
 * @version 1.0
 */
public class FlinkJdbcSink {

    /** Parameterized insert statement; the placeholder order must match {@code bindUser}. */
    private static final String INSERT_SQL =
            "insert into t_user(`name`, `age`, `gender`, `hobbit`) values(?, ?, ?, ?)";

    /**
     * Builds the pipeline (custom source -> JDBC sink) and submits it for execution.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or fails while running
     */
    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Custom test source emitting CustomizeBean(name, age, gender, hobbit).
        DataStreamSource<CustomizeBean> customizeSource = env.addSource(new CustomizeSource());

        // Maps each record onto the placeholders of INSERT_SQL.
        JdbcStatementBuilder<CustomizeBean> statementBuilder = FlinkJdbcSink::bindUser;

        // Write behaviour: batch size, maximum wait between flushes, and retries.
        JdbcExecutionOptions executionOptions = JdbcExecutionOptions.builder()
                .withBatchSize(10)           // batch size, in records
                .withBatchIntervalMs(5000)   // maximum time to wait before flushing a batch, in ms
                .withMaxRetries(1)           // maximum number of retries
                .build();

        // Connection settings. The driver class is the Connector/J 8.x name; the legacy
        // "com.mysql.jdbc.Driver" is deprecated in 8.x and logs a warning when loaded.
        // NOTE(review): credentials are hard-coded for demonstration only; load them from
        // configuration in real deployments.
        JdbcConnectionOptions connectionOptions =
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withDriverName("com.mysql.cj.jdbc.Driver")
                        .withUrl("jdbc:mysql://lx01:3306/test_db?useSSL=false")
                        .withUsername("root")
                        .withPassword("password")
                        .build();

        // Assemble the JDBC sink from the four parts above.
        SinkFunction<CustomizeBean> jdbcSink =
                JdbcSink.sink(INSERT_SQL, statementBuilder, executionOptions, connectionOptions);

        // Attach the sink and run the job.
        customizeSource.addSink(jdbcSink);
        env.execute();
    }

    /**
     * Binds one record to the placeholders of {@link #INSERT_SQL}, in column order
     * (name, age, gender, hobbit).
     *
     * @param pStmt        prepared insert statement to populate
     * @param customizeBean record whose fields are written
     * @throws SQLException if a parameter cannot be set on the statement
     */
    private static void bindUser(PreparedStatement pStmt, CustomizeBean customizeBean)
            throws SQLException {
        pStmt.setString(1, customizeBean.getName());
        pStmt.setInt(2, customizeBean.getAge());
        pStmt.setString(3, customizeBean.getGender());
        pStmt.setString(4, customizeBean.getHobbit());
    }
}
- pom依赖
<!-- Add the following two dependencies to the project's existing dependencies. -->
<!-- Flink JDBC connector: supplies the org.apache.flink.connector.jdbc classes
     (JdbcSink, JdbcExecutionOptions, JdbcConnectionOptions, JdbcStatementBuilder). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<!-- NOTE(review): assumes a flink.version property is defined elsewhere in the POM.
     Newer connector releases appear to be versioned separately from Flink itself;
     confirm that this value resolves to a published flink-connector-jdbc version. -->
<version>${flink.version}</version>
</dependency>
<!-- MySQL JDBC driver (Connector/J 8.x).
     NOTE(review): in 8.x the driver class is com.mysql.cj.jdbc.Driver; the older
     com.mysql.jdbc.Driver name is a deprecated alias. -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
- 结果
程序运行后,数据会按批次(每满 10 条或每隔 5 秒)写入 test_db 库的 t_user 表。JDBC Sink 的具体使用方式大概就是这些内容,还是比较简单的,具体应用还要结合实际业务场景。