SparkLaunch提交任务
- 1.提交Spark任务的方式
- 2.SparkLaunch 官方接口
- 3.任务提交流程及实战
1.提交Spark任务的方式
- 通过Spark-submit 提交任务
- 通过Yarn REST Api提交Spark任务
- 通过Spark Client Api 的方式提交任务
- 通过SparkLaunch 自带API提交任务
- 基于Livy的方式提交任务,可参考我的另一篇文章 Apache Livy 安装部署使用示例
上面的几种方式提交任务各自有对应的优缺点,不再进行赘述,下面要介绍的是通过SparkLaunch 的方式提交到集群中,并异步获取任务的执行状态进行更新到运行记录表中,从而实现Saprk任务的提交和状态获取。
2.SparkLaunch 官方接口
通过官方文档可以了解到SaprkLaunch 对应的方法:SparkLaunch
SparkLaunch 主要有两个接口:
- SparkAppHandle 主要负责Spark任务提交到集群上面
- SparkAppHandle.Listerner 主要是用来监控Spark的运行状态
可以查看SparkLaunch 类对应的方法主要用到的方法如下所示:
//设置配置文件地址
public SparkLauncher setPropertiesFile(String path);
//设置App 名称
public SparkLauncher setAppName(String appName);
//设置 Master
public SparkLauncher setMaster(String master);
//设置 部署模式
public SparkLauncher setDeployMode(String mode) ;
//设置 Jar包运行主类
public SparkLauncher setMainClass(String mainClass);
//设置 Spark 相关参数,需要以spark. 开头
public SparkLauncher addSparkArg(String name, String value);
//设置 Main函数的参数
public SparkLauncher addAppArgs(String... args);
// 启动Saprk任务的提交
public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners)
。。。。。。
上面的一些方法主要用于设置Spark所需要的相关参数,比如资源相关参数、Jar包相关参数、部署模式等,调用startApplication 方法会镇长的取创建一个任务提交的实现,通过下面的两个方法就能够使得任务可以正常提交到Yarn 上面
sparkLauncher.startApplication(new SparkAppHandle.Listener() {
// 任务运行状态改变的时候触发的操作
@Override
public void stateChanged(SparkAppHandle handle) {
}
// 日志状态出现改变的时候触发的操作
@Override
public void infoChanged(SparkAppHandle handle) {
}
});
3.任务提交流程及实战
总结上面的方法可以得到任务提交的总体流程:
- 创建SparkLaunch
- 初始化相关参数信息、资源参数、配置参数、Jar包参数
- 调用startApplication 启动任务的提交
- 调用stateChanged 捕获状态信息的改变并作出相应的操作
接下来给出完整的任务提交的相关伪代码:
/**
* 发起任务提交
*
* @param sparkApplicationParam Spark 相关配置参数
* @param otherConfigParams 其他配置参数
* @param mainParams main 方法配置参数
*/
private void launch(DmpRunInfo dmpRunInfo, SparkApplicationParam sparkApplicationParam, Map<String,
String> otherConfigParams, String[] mainParams) {
// 初始化SparkLauncher
SparkLauncher launcher = new SparkLauncher()
.setSparkHome(sparkApplicationParam.getSparkHome())
.setAppResource(sparkApplicationParam.getMainJarPath())
.setMainClass(sparkApplicationParam.getMainClass())
.setMaster(sparkApplicationParam.getMaster())
.setDeployMode(sparkApplicationParam.getDeployMode())
.setAppName(sparkApplicationParam.getAppName())
.setConf("spark.driver.memeory", sparkApplicationParam.getDriverMemory())
.setConf("spark.executor.memory", sparkApplicationParam.getExecutorMemory())
.setConf("spark.executor.cores", sparkApplicationParam.getExecutorCores())
// spark.yarn.archive 配置的HDFS地址
.setConf("spark.yarn.archive", SparkParamConstants.SPARK_YARN_ARCHIVE)
.setConf("spark.yarn.queue", SparkParamConstants.SPARK_PARAM_YARN_QUEUE)
.setVerbose(true);
// 禁用输出到本地日志方式
// .redirectError(new File(otherConfigParams.get("SPARK_ERROR_LOG_DIR")))
// .redirectOutput(new File(otherConfigParams.get("SPARK_OUT_LOG_DIR")))
/**
* 设置其他的参数时候需要使用[spark.] 开头的key ,否则spark 解析不出来
*/
if (otherConfigParams != null && otherConfigParams.size() > 0) {
logger.info("开始设置spark job 运行参数");
for (Map.Entry<String, String> conf : otherConfigParams.entrySet()) {
logger.info("{}:{}", conf.getKey(), conf.getValue());
launcher.setConf(conf.getKey(), conf.getValue());
}
}
if (mainParams.length != 0) {
logger.info("开始设置Spark Job Main 方法的参数 {}", Arrays.toString(mainParams));
launcher.addAppArgs(mainParams);
}
logger.info("参数设置完成,开始提交Spark任务");
// 线程池的方式运行任务
executor.execute(() -> {
try {
// 线程计数
CountDownLatch countDownLatch = new CountDownLatch(1);
SparkAppHandle sparkAppHandle = launcher.startApplication(new SparkAppHandle.Listener() {
@Override
public void stateChanged(SparkAppHandle handle) {
// 修改运行状态
。。。。。。。。。
if (handle.getAppId() != null) {
// 设置运行ID 到运行记录中
logger.info("{} stateChanged :{}", handle.getAppId(), handle.getState().toString());
} else {
logger.info("stateChanged :{}", handle.getState().toString());
}
// 更新状态
。。。。。。。。
// 失败告警发送到群功能
if (SparkAppHandle.State.FAILED.toString().equals(handle.getState().toString())) {
// 失败告警
。。。。。。。。。。。。
}
// Job 状态完成之后退出线程
if (handle.getState().isFinal()) {
countDownLatch.countDown();
}
}
@Override
public void infoChanged(SparkAppHandle handle) {
// do something
}
});
logger.info("The task is executing, current is get application id before,please wait ........");
String applicationId = null;
while (!SparkAppHandle.State.RUNNING.equals(sparkAppHandle.getState())) {
applicationId = sparkAppHandle.getAppId();
if (applicationId != null) {
logger.warn("handle current state is {}, appid is {}",
sparkAppHandle.getState().toString(), applicationId);
break;
}
}
logger.warn("handle current state is {}, appid is {}",
sparkAppHandle.getState().toString(), applicationId);
countDownLatch.await();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
});
}