1. Overview
SeaTunnel is an easy-to-use data integration framework. In an enterprise, because systems are developed at different times or by different departments, multiple heterogeneous information systems often run side by side on different software and hardware platforms. Data integration brings together data of different origins, formats, and characteristics, logically or physically, to give the enterprise comprehensive data sharing. SeaTunnel supports real-time synchronization of massive data and can stably and efficiently synchronize tens of billions of records per day.
[Table: scenarios SeaTunnel is suited for | key features of SeaTunnel]
2. How It Works
Take the official example: Flink jobs are submitted with the bin/start-seatunnel-flink.sh script, whose contents are as follows:
set -eu

# resolve links - $0 may be a softlink
PRG="$0"
while [ -h "$PRG" ] ; do
  # shellcheck disable=SC2006
  ls=`ls -ld "$PRG"`
  # shellcheck disable=SC2006
  link=`expr "$ls" : '.*-> \(.*\)$'`
  if expr "$link" : '/.*' > /dev/null; then
    PRG="$link"
  else
    # shellcheck disable=SC2006
    PRG=`dirname "$PRG"`/"$link"
  fi
done

PRG_DIR=`dirname "$PRG"`
APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
CONF_DIR=${APP_DIR}/config
APP_JAR=${APP_DIR}/starter/seatunnel-flink-13-starter.jar
APP_MAIN="org.apache.seatunnel.core.starter.flink.FlinkStarter"

if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
  . "${CONF_DIR}/seatunnel-env.sh"
fi

if [ $# == 0 ]
then
  args="-h"
else
  args=$@
fi

set +u

# Log4j2 Config
if [ -e "${CONF_DIR}/log4j2.properties" ]; then
  JAVA_OPTS="${JAVA_OPTS} -Dlog4j2.configurationFile=${CONF_DIR}/log4j2.properties"
  JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.path=${APP_DIR}/logs"
  JAVA_OPTS="${JAVA_OPTS} -Dseatunnel.logs.file_name=seatunnel-flink-starter"
fi

CLASS_PATH=${APP_DIR}/starter/logging/*:${APP_JAR}
CMD=$(java ${JAVA_OPTS} -cp ${CLASS_PATH} ${APP_MAIN} ${args}) && EXIT_CODE=$? || EXIT_CODE=$?
if [ ${EXIT_CODE} -eq 234 ]; then
  # print usage
  echo "${CMD}"
  exit 0
elif [ ${EXIT_CODE} -eq 0 ]; then
  echo "Execute SeaTunnel Flink Job: $(echo "${CMD}" | tail -n 1)"
  eval $(echo "${CMD}" | tail -n 1)
else
  echo "${CMD}"
  exit ${EXIT_CODE}
fi
The two most important lines here are:
APP_MAIN="org.apache.seatunnel.core.starter.flink.FlinkStarter"
CMD=$(java ${JAVA_OPTS} -cp ${CLASS_PATH} ${APP_MAIN} ${args}) && EXIT_CODE=$? || EXIT_CODE=$?
So the script runs seatunnel-flink-13-starter.jar with org.apache.seatunnel.core.starter.flink.FlinkStarter as the entry class. Let's turn to the source code and look at this FlinkStarter class.
public class FlinkStarter implements Starter {
    private static final String APP_NAME = SeatunnelFlink.class.getName();
    public static final String APP_JAR_NAME = "seatunnel-flink-starter.jar";

    /**
     * SeaTunnel parameters, used by SeaTunnel application. e.g. `-c config.conf`
     */
    private final FlinkCommandArgs flinkCommandArgs;

    /**
     * SeaTunnel flink job jar.
     */
    private final String appJar;

    FlinkStarter(String[] args) {
        this.flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), StarterConstant.SHELL_NAME, true);
        // set the deployment mode, used to get the job jar path.
        Common.setDeployMode(flinkCommandArgs.getDeployMode());
        Common.setStarter(true);
        this.appJar = Common.appStarterDir().resolve(APP_JAR_NAME).toString();
    }

    @SuppressWarnings("checkstyle:RegexpSingleline")
    public static void main(String[] args) {
        FlinkStarter flinkStarter = new FlinkStarter(args);
        System.out.println(String.join(" ", flinkStarter.buildCommands()));
    }

    @Override
    public List<String> buildCommands() {
        List<String> command = new ArrayList<>();
        command.add("${FLINK_HOME}/bin/flink");
        command.add(flinkCommandArgs.getRunMode().getMode());
        command.addAll(flinkCommandArgs.getOriginalParameters());
        command.add("-c");
        command.add(APP_NAME);
        command.add(appJar);
        command.add("--config");
        command.add(flinkCommandArgs.getConfigFile());
        if (flinkCommandArgs.isCheckConfig()) {
            command.add("--check");
        }
        // set job name
        command.add("-Dpipeline.name=" + flinkCommandArgs.getJobName());
        // set system properties
        flinkCommandArgs.getVariables().stream()
                .filter(Objects::nonNull)
                .map(String::trim)
                .forEach(variable -> command.add("-D" + variable));
        return command;
    }
}
The FlinkStarter class builds the Flink submission command in buildCommands(). With -c config/example.conf, for instance, the printed command looks roughly like ${FLINK_HOME}/bin/flink run -c org.apache.seatunnel.core.starter.flink.SeatunnelFlink <appJar> --config config/example.conf -Dpipeline.name=<jobName>. At this point we can infer SeaTunnel's execution model: it packages itself into a jar and submits that jar to Flink; at run time it loads the Source, Transform, and Sink plugins declared in the user's conf file, assembles the job, and hands it to Flink to build the StreamGraph and JobGraph.
In FlinkStarter, the main class of the jar to be executed is set to SeatunnelFlink.class; that class builds the Flink command object and runs it:
public class SeatunnelFlink {
    public static void main(String[] args) throws CommandException {
        FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parse(args, new FlinkCommandArgs(), StarterConstant.SHELL_NAME, true);
        Command<FlinkCommandArgs> flinkCommand = new FlinkCommandBuilder()
                .buildCommand(flinkCommandArgs);
        Seatunnel.run(flinkCommand);
    }
}
CommandLineUtils implements argument parsing for every args object that extends AbstractCommandArgs:
public static <T extends AbstractCommandArgs> T parse(String[] args, T obj, String programName, boolean acceptUnknownOptions) {
    JCommander jCommander = JCommander.newBuilder()
            .programName(programName)
            .addObject(obj)
            .acceptUnknownOptions(acceptUnknownOptions)
            .build();
    try {
        jCommander.parse(args);
        // args that do not belong to SeaTunnel are added to the engine's original parameters
        obj.setOriginalParameters(jCommander.getUnknownOptions());
    } catch (ParameterException e) {
        System.err.println(e.getLocalizedMessage());
        exit(jCommander);
    }
    if (obj.isHelp()) {
        exit(jCommander);
    }
    return obj;
}
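To make the JCommander mechanics concrete, here is a minimal, self-contained sketch; DemoArgs is a hypothetical stand-in for FlinkCommandArgs, and only the builder calls and the acceptUnknownOptions behavior come from the real usage above.

import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;

public class JCommanderDemo {
    // hypothetical stand-in for FlinkCommandArgs
    static class DemoArgs {
        @Parameter(names = {"-c", "--config"}, description = "Config file path")
        String configFile;

        @Parameter(names = {"-h", "--help"}, help = true, description = "Show usage")
        boolean help;
    }

    public static void main(String[] args) {
        DemoArgs demoArgs = new DemoArgs();
        JCommander jCommander = JCommander.newBuilder()
                .programName("seatunnel-demo")
                .addObject(demoArgs)
                .acceptUnknownOptions(true) // collect unknown flags instead of rejecting them
                .build();
        jCommander.parse("-c", "demo.conf", "--engine-flag", "value");
        // flags JCommander did not recognize; SeaTunnel forwards these to the engine
        System.out.println(jCommander.getUnknownOptions()); // [--engine-flag, value]
        System.out.println(demoArgs.configFile);            // demo.conf
    }
}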
The Seatunnel.run method is responsible for executing the submitted command. Command is an interface with seven implementations; the Flink-related ones are FlinkApiConfValidateCommand (validates that the configuration is well-formed) and FlinkApiTaskExecuteCommand (actually builds and executes the job).
public static <T extends CommandArgs> void run(Command<T> command) throws CommandException {
    try {
        command.execute();
    } catch (ConfigRuntimeException e) {
        showConfigError(e);
        throw e;
    } catch (Exception e) {
        showFatalError(e);
        throw e;
    }
}
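Conceptually, the builder then picks between validation and execution. A simplified sketch of this command pattern, with hypothetical names rather than the actual SeaTunnel classes:

public class CommandDemo {
    interface DemoCommand {
        void execute() throws Exception;
    }

    static class DemoConfValidateCommand implements DemoCommand {
        @Override
        public void execute() {
            System.out.println("config is well-formed");
        }
    }

    static class DemoTaskExecuteCommand implements DemoCommand {
        @Override
        public void execute() {
            System.out.println("building and submitting the Flink job");
        }
    }

    // plays the role of FlinkCommandBuilder: a check-only flag selects validation
    static DemoCommand buildCommand(boolean checkConfigOnly) {
        return checkConfigOnly ? new DemoConfValidateCommand() : new DemoTaskExecuteCommand();
    }

    public static void main(String[] args) throws Exception {
        buildCommand(true).execute();   // validate only
        buildCommand(false).execute();  // run the job
    }
}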
FlinkApiConfValidateCommand mainly loads the config and validates it; FlinkApiTaskExecuteCommand is responsible for running the job. It does so by constructing a FlinkExecution object, a concrete implementation of the TaskExecution interface. The FlinkExecution constructor takes the config and initializes the jar paths, env, source, transform, and sink, where source, transform, and sink are implementations of the PluginExecuteProcessor interface.
public class FlinkApiTaskExecuteCommand implements Command<FlinkCommandArgs> {
    private final FlinkCommandArgs flinkCommandArgs;

    public FlinkApiTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
        this.flinkCommandArgs = flinkCommandArgs;
    }

    @Override
    public void execute() throws CommandExecuteException {
        Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
        checkConfigExist(configFile);
        Config config = new ConfigBuilder(configFile).getConfig();
        FlinkExecution seaTunnelTaskExecution = new FlinkExecution(config);
        try {
            seaTunnelTaskExecution.execute();
        } catch (Exception e) {
            throw new CommandExecuteException("Flink job executed failed", e);
        }
    }
}
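SeaTunnel job files are written in HOCON, and ConfigBuilder parses them with the Typesafe Config library. A minimal sketch of that parsing step (simplified: the real builder also resolves variables and validates the result, and config/example.conf is a hypothetical path):

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import java.io.File;

public class ConfigDemo {
    public static void main(String[] args) {
        Config config = ConfigFactory.parseFile(new File("config/example.conf")).resolve();
        // the top-level blocks that the FlinkExecution constructor reads
        System.out.println(config.getConfig("env"));
        System.out.println(config.getConfigList("source").size());
        System.out.println(config.getConfigList("sink").size());
    }
}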
The fields and constructor of FlinkExecution look like this:
private final FlinkEnvironment flinkEnvironment;
private final PluginExecuteProcessor sourcePluginExecuteProcessor;
private final PluginExecuteProcessor transformPluginExecuteProcessor;
private final PluginExecuteProcessor sinkPluginExecuteProcessor;
private final List<URL> jarPaths;

public FlinkExecution(Config config) {
    try {
        jarPaths = new ArrayList<>(Collections.singletonList(
                new File(Common.appStarterDir().resolve(FlinkStarter.APP_JAR_NAME).toString()).toURI().toURL()));
    } catch (MalformedURLException e) {
        throw new SeaTunnelException("load flink starter error.", e);
    }
    registerPlugin(config.getConfig("env"));
    JobContext jobContext = new JobContext();
    jobContext.setJobMode(FlinkEnvironmentFactory.getJobMode(config));
    this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(jarPaths, config.getConfigList(Constants.SOURCE), jobContext);
    this.transformPluginExecuteProcessor = new TransformExecuteProcessor(jarPaths,
            TypesafeConfigUtils.getConfigList(config, Constants.TRANSFORM, Collections.emptyList()), jobContext);
    this.sinkPluginExecuteProcessor = new SinkExecuteProcessor(jarPaths, config.getConfigList(Constants.SINK), jobContext);
    this.flinkEnvironment = new FlinkEnvironmentFactory(this.registerPlugin(config, jarPaths)).getEnvironment();
    this.sourcePluginExecuteProcessor.setFlinkEnvironment(flinkEnvironment);
    this.transformPluginExecuteProcessor.setFlinkEnvironment(flinkEnvironment);
    this.sinkPluginExecuteProcessor.setFlinkEnvironment(flinkEnvironment);
}
Once the FlinkExecution object has been built, its execute method runs, which calls execute on sourcePluginExecuteProcessor, transformPluginExecuteProcessor, and sinkPluginExecuteProcessor in turn. FlinkEnvironment, like SparkEnvironment, is a wrapper around the engine's execution context.
@Override
public void execute() throws TaskExecuteException {
    List<DataStream<Row>> dataStreams = new ArrayList<>();
    dataStreams = sourcePluginExecuteProcessor.execute(dataStreams);
    dataStreams = transformPluginExecuteProcessor.execute(dataStreams);
    sinkPluginExecuteProcessor.execute(dataStreams);
    log.info("Flink Execution Plan:{}", flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
    try {
        flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
    } catch (Exception e) {
        throw new TaskExecuteException("Execute Flink job error", e);
    }
}
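Stripped of the plugin machinery, execute() has the shape of an ordinary Flink streaming job. A minimal sketch (plain Flink, not SeaTunnel code) showing the same source -> transform -> sink wiring, the execution-plan dump, and the final submit:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MiniPipeline {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> source = env.fromElements("a", "b", "c");       // source stage
        DataStream<String> transformed = source.map(String::toUpperCase); // transform stage
        transformed.print();                                               // sink stage
        // the JSON plan that FlinkExecution logs before submitting
        System.out.println(env.getExecutionPlan());
        env.execute("mini-pipeline");
    }
}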
The execute method of SourceExecuteProcessor obtains a schema-aware DataStreamSource<Row> for each source plugin via addSource and finally returns a list of DataStream<Row>:
@Override
public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) {
    StreamExecutionEnvironment executionEnvironment = flinkEnvironment.getStreamExecutionEnvironment();
    List<DataStream<Row>> sources = new ArrayList<>();
    for (int i = 0; i < plugins.size(); i++) {
        // sources are pulled in dynamically as plugins
        SeaTunnelSource internalSource = plugins.get(i);
        BaseSeaTunnelSourceFunction sourceFunction;
        // check whether the source supports coordination
        if (internalSource instanceof SupportCoordinate) {
            sourceFunction = new SeaTunnelCoordinatedSource(internalSource);
        } else {
            sourceFunction = new SeaTunnelParallelSource(internalSource);
        }
        DataStreamSource<Row> sourceStream = addSource(executionEnvironment,
                sourceFunction,
                "SeaTunnel " + internalSource.getClass().getSimpleName(),
                internalSource.getBoundedness() == org.apache.seatunnel.api.source.Boundedness.BOUNDED);
        Config pluginConfig = pluginConfigs.get(i);
        if (pluginConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
            int parallelism = pluginConfig.getInt(SourceCommonOptions.PARALLELISM.key());
            sourceStream.setParallelism(parallelism);
        }
        // register the source stream as a table
        registerResultTable(pluginConfig, sourceStream);
        sources.add(sourceStream);
    }
    return sources;
}

private DataStreamSource<Row> addSource(
        final StreamExecutionEnvironment streamEnv,
        final BaseSeaTunnelSourceFunction function,
        final String sourceName,
        boolean bounded) {
    checkNotNull(function);
    checkNotNull(sourceName);
    checkNotNull(bounded);
    TypeInformation<Row> resolvedTypeInfo = function.getProducedType();
    boolean isParallel = function instanceof ParallelSourceFunction;
    streamEnv.clean(function);
    final StreamSource<Row, ?> sourceOperator = new StreamSource<>(function);
    return new DataStreamSource<>(streamEnv, resolvedTypeInfo, sourceOperator, isParallel, sourceName, bounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED);
}
The execute method of TransformExecuteProcessor first checks whether the list of transform plugins is empty; if it is, there is no transform step and the upstream streams are returned as-is. Otherwise each transform plugin is applied to the data in turn, and every intermediate stream is appended to result, which is returned at the end:
@Override
public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws TaskExecuteException {
    if (plugins.isEmpty()) {
        return upstreamDataStreams;
    }
    DataStream<Row> input = upstreamDataStreams.get(0);
    List<DataStream<Row>> result = new ArrayList<>();
    for (int i = 0; i < plugins.size(); i++) {
        try {
            FlinkStreamTransform transform = plugins.get(i);
            Config pluginConfig = pluginConfigs.get(i);
            DataStream<Row> stream = fromSourceTable(pluginConfig).orElse(input);
            input = transform.processStream(flinkEnvironment, stream);
            registerResultTable(pluginConfig, input);
            transform.registerFunction(flinkEnvironment);
            result.add(input);
        } catch (Exception e) {
            throw new TaskExecuteException(
                    String.format("SeaTunnel transform task: %s execute error", plugins.get(i).getPluginName()), e);
        }
    }
    return result;
}
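The essential pattern here is a fold over the plugin list: each plugin consumes the previous plugin's output, and every intermediate stream is kept. A minimal illustration with plain functions standing in for Flink streams (hypothetical, for intuition only):

import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class ChainDemo {
    public static void main(String[] args) {
        // stand-ins for two transform plugins
        List<UnaryOperator<String>> transforms = List.of(
                s -> s.toUpperCase(),
                s -> s + "!");
        String input = "seatunnel"; // stands in for the upstream DataStream
        List<String> result = new ArrayList<>();
        for (UnaryOperator<String> transform : transforms) {
            input = transform.apply(input); // each plugin consumes the previous output
            result.add(input);              // every intermediate result is kept
        }
        System.out.println(result); // [SEATUNNEL, SEATUNNEL!]
    }
}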
The execute method of SinkExecuteProcessor works the same way: it writes each result stream to the corresponding sink via stream.sinkTo and, if the sink configuration specifies a parallelism, applies it to the sink. Since the sink is the final stage of the pipeline, the method returns null:
@Override
public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) throws TaskExecuteException {
    DataStream<Row> input = upstreamDataStreams.get(0);
    for (int i = 0; i < plugins.size(); i++) {
        Config sinkConfig = pluginConfigs.get(i);
        SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable> seaTunnelSink = plugins.get(i);
        DataStream<Row> stream = fromSourceTable(sinkConfig).orElse(input);
        seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert(stream.getType()));
        DataStreamSink<Row> dataStreamSink = stream.sinkTo(new FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
        if (sinkConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
            int parallelism = sinkConfig.getInt(SourceCommonOptions.PARALLELISM.key());
            dataStreamSink.setParallelism(parallelism);
        }
    }
    // the sink is the last stream
    return null;
}
3. Source Code Supplement
Flink DataSource
In SourceExecuteProcessor's execute method, a BaseSeaTunnelSourceFunction object named sourceFunction is created. That class extends the abstract class RichSourceFunction and implements three interfaces: CheckpointListener, CheckpointedFunction, and ResultTypeQueryable<Row>. RichSourceFunction in turn extends the abstract class AbstractRichFunction and implements the SourceFunction interface, while AbstractRichFunction implements the RichFunction interface, the base interface for user-defined rich functions.
In SourceExecuteProcessor's execute, the addSource method wraps a SourceFunction into a StreamSource operator and ultimately returns a DataStreamSource object. DataStreamSource represents the starting point of a DataStream; it is a subclass of SingleOutputStreamOperator, whose ultimate superclass is DataStream, and every data source is wrapped as a DataStreamSource. addSource attaches a data source with custom type information to the job. A user-defined source implements the SourceFunction interface, which provides two methods (a minimal example appears at the end of this section):
run | performs the actual data-reading work
cancel | cancels the source and is used to close connections
A source defined with SourceFunction is non-parallel; to define a parallel source, implement the ParallelSourceFunction interface or extend RichParallelSourceFunction. In SeaTunnel, SeaTunnelParallelSource implements ParallelSourceFunction to provide a parallel source.
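For reference, here is a minimal custom Flink source written directly against the classic SourceFunction API discussed above (a sketch, not SeaTunnel code): run() emits records until cancel() flips the flag.

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CounterSource implements SourceFunction<Long> {
    private volatile boolean running = true;
    private long counter = 0L;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            // the actual data-reading work happens here
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
            Thread.sleep(1000L);
        }
    }

    @Override
    public void cancel() {
        // invoked by the framework to stop the source and release connections
        running = false;
    }
}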