1、功能说明:
在Flink 自定义源算子中封装jdbc来读取MySQL中的数据
2、代码示例
Flink版本说明:flink_1.13.0、scala_2.12
自定义Source算子,这里我们继承RichParallelSourceFunction,因为要使用open方法来初始化数据库连接对象
Tips:这种实现方式为可并行算子,当并行度>1时,每个并行任务都会读取相同的数据,使用的时候需要注意
package com.baidu.bean
case class User(id: Long, name: String)
class MysqlSource extends RichParallelSourceFunction[User] {
// 定义 Connection、PreparedStatement对象
var connection: Connection = null
var ps: PreparedStatement = null
// 函数初始化方法,常用来初始化资源对象,常用来做一次性的设置
// 当 MysqlSource对象被创建时,调用一次
override def open(parameters: Configuration): Unit = {
// 初始化 Connection、PreparedStatement对象
// 加载数据库驱动
Class.forName("com.mysql.jdbc.Driver")
// 获取连接
connection = DriverManager.getConnection("jdbc:mysql://worker01/flink", "root", "worker123")
// 读取user表
ps = connection.prepareStatement("select * from user")
}
override def run(ctx: SourceFunction.SourceContext[User]): Unit = {
// 执行查询操作,获取查询结果
val resultSet = ps.executeQuery()
// 将查询结果封装到user对象
while (resultSet.next()) {
val user = User(resultSet.getLong(1),
resultSet.getString(2))
ctx.collect(user)
}
}
// 关闭连接资源
override def cancel(): Unit = {
connection.close()
ps.close()
}
}
使用 MysqlSource 来读取数据(作为有界流来处理):
test("使用 自定义Source算子,读取mysql数据") {
// 1. 获取流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 2. 将 自定义数据源 作为数据源
val ds: DataStream[User] = env.addSource(new MysqlSource).setParallelism(4)
// 3. 打印DataStream
ds.print()
// 4. 出发程序执行
env.execute()
}
执行结果: