一、自定义DataSource
自定义DataSource有两大类:单线程的DataSource和多线程的DataSource
- 单线程(非并行,并行度只能为 1):继承 SourceFunction
- 多线程(并行):继承 ParallelSourceFunction,或者继承 RichParallelSourceFunction(额外提供 open、close 等生命周期方法,可用于打开和释放资源)
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, RichParallelSourceFunction, SourceFunction}

/**
 * 1. Non-parallel source.
 *
 * Emits an increasing `Long` counter, starting at 1, roughly once per second
 * until [[cancel]] is called.
 */
class MyNoParallelSource1 extends SourceFunction[Long] {

  /** Next value to emit. */
  var count: Long = 1L

  /**
   * Loop flag checked by [[run]] and cleared by [[cancel]].
   * Marked `@volatile` because `cancel()` may be invoked from a thread other
   * than the one executing `run()`; without it the loop might never observe
   * the update.
   */
  @volatile var isRunning: Boolean = true

  /**
   * Emits the current counter value, increments it, then sleeps for one second;
   * repeats while [[isRunning]] is true.
   *
   * @param ctx context used to emit elements downstream
   */
  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  /** Requests the emit loop in [[run]] to stop after its current iteration. */
  override def cancel(): Unit = {
    isRunning = false
  }
}

/**
 * 2. Parallel source.
 *
 * Same emit loop as the non-parallel variant, but implemented on top of
 * `ParallelSourceFunction`. Each instance keeps its own counter starting at 1.
 */
class MyNoParallelSource2 extends ParallelSourceFunction[Long] {

  /** Next value to emit. */
  var count: Long = 1L

  /**
   * Loop flag checked by [[run]] and cleared by [[cancel]].
   * `@volatile` so that a `cancel()` issued from another thread is visible to
   * the running loop.
   */
  @volatile var isRunning: Boolean = true

  /**
   * Emits the current counter value, increments it, then sleeps for one second;
   * repeats while [[isRunning]] is true.
   *
   * @param ctx context used to emit elements downstream
   */
  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  /** Requests the emit loop in [[run]] to stop after its current iteration. */
  override def cancel(): Unit = {
    isRunning = false
  }
}

/**
 * 3. Parallel source using the "rich" function variant.
 *
 * In addition to the emit loop, it exposes `open` and `close`, which can be
 * used to acquire and release resources. Here both simply delegate to the
 * superclass.
 */
class MyNoParallelSource3 extends RichParallelSourceFunction[Long] {

  /**
   * Next value to emit. Declared as `Long` so it matches the emitted element
   * type and does not wrap around at `Int.MaxValue`.
   */
  var count: Long = 1L

  /**
   * Loop flag checked by [[run]] and cleared by [[cancel]].
   * `@volatile` so that a `cancel()` issued from another thread is visible to
   * the running loop.
   */
  @volatile var isRunning: Boolean = true

  /**
   * Emits the current counter value, increments it, then sleeps for one second;
   * repeats while [[isRunning]] is true.
   *
   * @param ctx context used to emit elements downstream
   */
  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  /** Requests the emit loop in [[run]] to stop after its current iteration. */
  override def cancel(): Unit = {
    isRunning = false
  }

  /**
   * Initialization hook; currently only delegates to the superclass.
   *
   * @param parameters configuration passed to the function
   */
  override def open(parameters: Configuration): Unit = super.open(parameters)

  /** Teardown hook; currently only delegates to the superclass. */
  override def close(): Unit = super.close()
}