这篇文章为大家提供一种在多k8s集群中部署一套xxl-job的方案。
问题背景:
- 公司生产环境有多套k8s集群,为保证服务可用,容器需要部署到不同集群中。单集群中容器间可直接通过本地ip访问,跨集群容器间调用需通过宿主ip+映射端口访问。
- 由于k8s调度容器时无法预知其会落在哪台宿主机上,我们将xxl-job执行器注册的ip设为自动获取的容器本地ip,端口固定。而各集群的admin共用同一套数据库(执行器注册信息相同),跨集群时admin会直接请求另一个k8s集群中executor的容器ip,导致调度失败。
如下图:
目标:
- admin与executor能在所有集群中部署
- 不同集群中的admin能够调度所有的executor
- 保证任务调度策略(如轮询,第一个,故障转移,最近最久未使用,admin管理端指定ip调用)不受影响。
解决思路
在admin中添加调度代理接口,利用不同k8s集群Pod ip网段的差异:admin调度时先判断自己与executor是否处于同一网段,一致则直接调度;不一致则将调度请求(通过调度代理接口)转交给与executor同集群的admin,由该admin调用执行器。注意:下文实现只比较ip的第一段,因此要求各集群Pod网段的第一段互不相同(例如一个集群用10.x,另一个用172.x)。
具体开发:
- 在admin中创建调度代理接口
如上面架构图展示的,调度器通过http请求执行器接口完成调度,源码:
com.xxl.job.core.biz.client.ExecutorBizClient
public class ExecutorBizClient implements ExecutorBiz {
.......
// Base URL of the executor; endpoint names such as "beat" and "run" are appended to it directly.
private String addressUrl ;
// Access token passed to XxlJobRemotingUtil.postBody with every request.
private String accessToken;
// Timeout passed to XxlJobRemotingUtil.postBody with every request (unit defined by that utility).
private int timeout = 3;
@Override
public ReturnT<String> beat() {
    // A heartbeat carries no payload, so the request body is an empty string.
    String endpoint = addressUrl + "beat";
    return XxlJobRemotingUtil.postBody(endpoint, accessToken, timeout, "", String.class);
}
@Override
public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam) {
    // Posts the idle-beat parameters to the executor's "idleBeat" endpoint.
    String endpoint = addressUrl + "idleBeat";
    return XxlJobRemotingUtil.postBody(endpoint, accessToken, timeout, idleBeatParam, String.class);
}
@Override
public ReturnT<String> run(TriggerParam triggerParam) {
    // Posts the trigger payload to the executor's "run" endpoint.
    String endpoint = addressUrl + "run";
    return XxlJobRemotingUtil.postBody(endpoint, accessToken, timeout, triggerParam, String.class);
}
...........
所以先在xxl-job-admin中创建接收调度请求的代理接口:
package com.xxl.job.admin.controller;
/**
 * Cross-cluster dispatch proxy.
 *
 * <p>An admin that cannot reach an executor in another k8s cluster posts the executor request
 * here, to an admin running in the executor's cluster. This admin looks up a client for the
 * {@code addressUrl} carried in the request body and returns that client's result unchanged.
 *
 * <p>Security: every endpoint skips the admin login check ({@code @PermissionLimit(limit = false)})
 * and sends a request to whatever {@code addressUrl} the caller supplies. When no access token is
 * configured, anyone who can reach this admin can make it POST to arbitrary URLs. Always configure
 * an access token, and consider also validating {@code addressUrl} against the executor registry.
 *
 * <p>NOTE(review): the lookup goes through {@code XxlJobScheduler.getExecutorBiz}, which may itself
 * choose a proxy. If the segment-to-proxy mapping is misconfigured, a request could bounce between
 * admins. Verify the mapping in each cluster.
 */
@Controller
public class ProxyController {

    /** Forwards a health check to the executor named in {@code healthParam}. */
    @RequestMapping(value = "health", method = RequestMethod.POST)
    @ResponseBody
    @PermissionLimit(limit = false)
    public ReturnT health(@RequestHeader(value = XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN, required = false) String accessTokenReq,
                          @RequestBody HealthParam healthParam) throws Exception {
        ReturnT<String> rejection = rejectInvalid(accessTokenReq, healthParam.getAddressUrl());
        if (rejection != null) {
            return rejection;
        }
        return XxlJobScheduler.getExecutorBiz(healthParam.getAddressUrl()).health();
    }

    /** Forwards a heartbeat to the executor named in {@code beatParam}. */
    @RequestMapping(value = "beat", method = RequestMethod.POST)
    @ResponseBody
    @PermissionLimit(limit = false)
    public ReturnT beat(@RequestHeader(value = XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN, required = false) String accessTokenReq,
                        @RequestBody BeatParam beatParam) throws Exception {
        ReturnT<String> rejection = rejectInvalid(accessTokenReq, beatParam.getAddressUrl());
        if (rejection != null) {
            return rejection;
        }
        return XxlJobScheduler.getExecutorBiz(beatParam.getAddressUrl()).beat();
    }

    /** Forwards an idle-beat request to the executor named in {@code idleBeatParam}. */
    @RequestMapping(value = "idleBeat", method = RequestMethod.POST)
    @ResponseBody
    @PermissionLimit(limit = false)
    public ReturnT idleBeat(@RequestHeader(value = XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN, required = false) String accessTokenReq,
                            @RequestBody IdleBeatParam idleBeatParam) throws Exception {
        ReturnT<String> rejection = rejectInvalid(accessTokenReq, idleBeatParam.getAddressUrl());
        if (rejection != null) {
            return rejection;
        }
        return XxlJobScheduler.getExecutorBiz(idleBeatParam.getAddressUrl()).idleBeat(idleBeatParam);
    }

    /** Forwards a trigger request to the executor named in {@code triggerParam}. */
    @RequestMapping(value = "run", method = RequestMethod.POST)
    @ResponseBody
    @PermissionLimit(limit = false)
    public ReturnT run(@RequestHeader(value = XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN, required = false) String accessTokenReq,
                       @RequestBody TriggerParam triggerParam) throws Exception {
        ReturnT<String> rejection = rejectInvalid(accessTokenReq, triggerParam.getAddressUrl());
        if (rejection != null) {
            return rejection;
        }
        return XxlJobScheduler.getExecutorBiz(triggerParam.getAddressUrl()).run(triggerParam);
    }

    /** Forwards a kill request to the executor named in {@code killParam}. */
    @RequestMapping(value = "kill", method = RequestMethod.POST)
    @ResponseBody
    @PermissionLimit(limit = false)
    public ReturnT kill(@RequestHeader(value = XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN, required = false) String accessTokenReq,
                        @RequestBody KillParam killParam) throws Exception {
        ReturnT<String> rejection = rejectInvalid(accessTokenReq, killParam.getAddressUrl());
        if (rejection != null) {
            return rejection;
        }
        return XxlJobScheduler.getExecutorBiz(killParam.getAddressUrl()).kill(killParam);
    }

    /** Forwards a log query to the executor named in {@code logParam}. */
    @RequestMapping(value = "log", method = RequestMethod.POST)
    @ResponseBody
    @PermissionLimit(limit = false)
    public ReturnT log(@RequestHeader(value = XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN, required = false) String accessTokenReq,
                       @RequestBody LogParam logParam) throws Exception {
        ReturnT<String> rejection = rejectInvalid(accessTokenReq, logParam.getAddressUrl());
        if (rejection != null) {
            return rejection;
        }
        return XxlJobScheduler.getExecutorBiz(logParam.getAddressUrl()).log(logParam);
    }

    /**
     * Runs the checks shared by every proxy endpoint: access token first, then target address.
     *
     * <p>The token header is optional at the Spring level ({@code required = false}) because
     * {@code XxlJobRemotingUtil} omits it when no token is configured. A missing header then
     * reaches this check instead of failing with HTTP 400.
     *
     * @param accessTokenReq token sent by the calling admin, or {@code null} if absent
     * @param addressUrl     executor address the request should be forwarded to
     * @return a failure result to send back, or {@code null} when the request may be forwarded
     */
    private ReturnT<String> rejectInvalid(String accessTokenReq, String addressUrl) {
        // Adapt to how your admin exposes its token; in stock xxl-job-admin this is
        // XxlJobAdminConfig.getAdminConfig().getAccessToken().
        String accessToken = Config().getAccessToken();
        if (accessToken != null
                && accessToken.trim().length() > 0
                && !tokenMatches(accessToken, accessTokenReq)) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
        }
        if (StringUtils.isBlank(addressUrl)) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The addressUrl is wrong.");
        }
        return null;
    }

    /**
     * Compares tokens in constant time, so response timing does not reveal how much of the token
     * matched. This matters because these endpoints are reachable without logging in.
     */
    private static boolean tokenMatches(String expected, String actual) {
        if (actual == null) {
            return false;
        }
        return java.security.MessageDigest.isEqual(
                expected.getBytes(java.nio.charset.StandardCharsets.UTF_8),
                actual.getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
}
其次我添加了一个请求参数的父类BaseParam,用于携带要调度的executor地址(addressUrl,即执行器注册的完整地址,而不仅是ip),并让所有的请求参数(包括代理接口中用到的BeatParam、HealthParam)都继承这个父类。
/**
 * Common parent of the executor request parameters (trigger, idle-beat, kill, log, ...).
 *
 * <p>Carries {@code addressUrl}, the address of the executor a request is meant for. An admin that
 * receives the request through the proxy endpoints uses it to pick the executor to call.
 *
 * <p>Implements {@link java.io.Serializable} because subclasses such as {@code TriggerParam} are
 * serializable. Java serialization skips the fields of a non-serializable superclass, which would
 * silently drop {@code addressUrl}.
 */
public class BaseParam implements java.io.Serializable {

    private static final long serialVersionUID = 1L;

    /** Address of the executor the request is ultimately destined for. */
    private String addressUrl;

    /** @return the target executor address, or {@code null} if not set */
    public String getAddressUrl() {
        return addressUrl;
    }

    /** @param addressUrl the target executor address */
    public void setAddressUrl(String addressUrl) {
        this.addressUrl = addressUrl;
    }
}
public class TriggerParam extends BaseParam implements Serializable{
.....
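其次要注意,要将代理接口的登录校验、鉴权关闭(即上面的@PermissionLimit(limit = false))。关闭后代理接口只靠accessToken保护,并且会向请求体中任意的addressUrl发起请求,因此务必配置accessToken,否则任何能访问admin的人都可以借该接口让admin请求任意地址。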
- 修改获取执行器信息逻辑:
com.xxl.job.admin.core.trigger.XxlJobTrigger#runExecutor 中先通过XxlJobScheduler.getExecutorBiz获取执行器客户端(ExecutorBiz),再调用执行器:
public class XxlJobTrigger {
.....
/**
 * Sends one trigger request to the executor at {@code address} and rewrites the result message
 * into an HTML summary for the trigger log.
 *
 * <p>The client comes from {@code XxlJobScheduler.getExecutorBiz(address)}. With the cross-cluster
 * change, that lookup (not this method) decides whether the request goes straight to the executor
 * or through a proxy admin in the executor's cluster.
 *
 * @param triggerParam trigger payload sent to the executor
 * @param address      executor address chosen by the routing strategy
 * @return the executor's result, or a failure result carrying the exception text; in both cases the
 *         message is replaced by the summary
 */
public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
    ReturnT<String> runResult;
    try {
        // Direct-vs-proxy routing happens inside getExecutorBiz.
        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
        runResult = executorBiz.run(triggerParam);
    } catch (Exception e) {
        logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
        runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
    }

    // Local, single-threaded builder: StringBuilder avoids StringBuffer's needless synchronization.
    StringBuilder runResultSB = new StringBuilder(I18nUtil.getString("jobconf_trigger_run") + ":");
    runResultSB.append("<br>address:").append(address);
    runResultSB.append("<br>code:").append(runResult.getCode());
    runResultSB.append("<br>msg:").append(runResult.getMsg());

    runResult.setMsg(runResultSB.toString());
    return runResult;
}
}
原获取执行器的代码:
/**
 * Returns the client for the executor at {@code address}, creating and caching it on first use.
 *
 * @param address executor address; surrounding whitespace is ignored
 * @return the cached or newly created client, or {@code null} when {@code address} is null or blank
 */
public static ExecutorBiz getExecutorBiz(String address) throws Exception {
    // Nothing to dispatch to without an address.
    if (address == null || address.trim().isEmpty()) {
        return null;
    }

    String key = address.trim();
    ExecutorBiz client = executorBizRepository.get(key);
    if (client == null) {
        // First request for this executor: build a client and remember it under the trimmed key.
        client = new ExecutorBizClient(key, XxlJobAdminConfig.getAdminConfig().getAccessToken());
        executorBizRepository.put(key, client);
    }
    return client;
}
添加代理逻辑:
/**
 * Returns the cached client for the executor at {@code localAddressUrl}, creating it on first use.
 *
 * <p>On first use, {@code getProxyAddress} decides whether requests go straight to the executor or
 * to a proxy admin in the executor's cluster. The client is built with both addresses: where to send
 * requests, and which executor they are meant for.
 *
 * <p>NOTE(review): the proxy choice is cached together with the client. A given executor is
 * therefore always reached through the same proxy admin until this admin restarts, and an
 * unavailable proxy is not replaced automatically.
 *
 * @param localAddressUrl the executor's own registered address; surrounding whitespace is ignored
 * @return the client, or {@code null} when {@code localAddressUrl} is null or blank
 * @throws Exception if proxy resolution fails (e.g. the local host address cannot be determined)
 */
public static ExecutorBiz getExecutorBiz(String localAddressUrl) throws Exception {
    if (localAddressUrl == null || localAddressUrl.trim().length() == 0) {
        return null;
    }

    localAddressUrl = localAddressUrl.trim();
    ExecutorBiz executorBiz = executorBizRepository.get(localAddressUrl);
    if (executorBiz != null) {
        return executorBiz;
    }

    // Either localAddressUrl itself (same segment) or a proxy admin's address.
    String addressUrl = getProxyAddress(localAddressUrl);
    executorBiz = new ExecutorBizClient(addressUrl, localAddressUrl, XxlJobAdminConfig.getAdminConfig().getAccessToken());
    executorBizRepository.put(localAddressUrl, executorBiz);
    return executorBiz;
}
/**
 * Decides where the admin should send requests for the executor at {@code address}.
 *
 * <p>"Network segment" here means the first dot-separated component of an IP address, e.g.
 * {@code 10} for {@code http://10.1.2.3:9999/}. {@code address} is returned unchanged in any of
 * these cases:
 * <ul>
 *   <li>this host's address or the {@code adminProxyAddress} setting is blank;</li>
 *   <li>this admin and the executor share a segment;</li>
 *   <li>no proxy is configured for the executor's segment.</li>
 * </ul>
 * Otherwise one of the proxy admin URLs configured for that segment is picked at random.
 *
 * <p>{@code adminProxyAddress} is a JSON object mapping segment to a comma-separated list of admin
 * URLs reachable from other clusters (k8s host IP + mapped port, including the admin's context
 * path), e.g. {@code {"10":"http://192.168.0.5:30080/xxl-job-admin,http://192.168.0.6:30080/xxl-job-admin"}}.
 *
 * @param address executor address as registered (non-null, already trimmed)
 * @return the executor address itself, or the chosen proxy admin address
 * @throws UnknownHostException if this host's address cannot be determined
 */
private static String getProxyAddress(String address) throws UnknownHostException {
    // NOTE(review): in some containers getLocalHost() resolves to 127.0.0.1. Every executor then
    // looks remote and costs an extra hop through a proxy admin; confirm in your pods.
    String hostAddress = InetAddress.getLocalHost().getHostAddress();
    String segment = firstSegment(address);

    // Adapt to your config source: this reads the per-segment proxy mapping as a JSON string.
    Config config = Config().getConfig();
    String adminProxyAddress = config.getProperty("adminProxyAddress", "");

    if (StringUtils.isAnyBlank(hostAddress, adminProxyAddress)
            || firstSegment(hostAddress).equals(segment)) {
        return address;
    }

    String proxyAddress = JSONObject.parseObject(adminProxyAddress).getString(segment);
    // Splitting on both ',' and ' ' drops empty tokens, so "a, b" and "a,,b" both yield [a, b].
    // Returns null when the segment has no entry.
    String[] candidates = StringUtils.split(proxyAddress, ", ");
    if (candidates == null || candidates.length == 0) {
        return address;
    }

    // Several admin containers may serve the target cluster; pick one at random.
    return candidates[java.util.concurrent.ThreadLocalRandom.current().nextInt(candidates.length)];
}

/** Returns the first dot-separated component of an address, ignoring a leading http:// or https://. */
private static String firstSegment(String address) {
    return address.replaceFirst("^https?://", "").split("\\.")[0];
}
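ExecutorBizClient中添加本地executor地址:localAddressUrl。注意:各请求方法在发送前都要调用param.setAddressUrl(localAddressUrl),把真实的executor地址带给代理admin;beat()原本以空字符串作为请求体,走代理时需改为发送携带addressUrl的BeatParam,否则代理接口无法解析请求。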
public class ExecutorBizClient implements ExecutorBiz {
/** No-arg constructor; leaves the URLs and the access token unset. */
public ExecutorBizClient() {
}

/**
 * Creates a client that calls the executor directly, without a proxy.
 * Kept so callers of the upstream two-argument constructor still compile.
 *
 * @param addressUrl  the executor's address
 * @param accessToken xxl-job access token
 */
public ExecutorBizClient(String addressUrl, String accessToken) {
    this(addressUrl, addressUrl, accessToken);
}

/**
 * Creates a client that may route through a proxy admin.
 *
 * @param addressUrl      where requests are actually sent: the executor itself when it shares this
 *                        admin's network segment, otherwise a proxy admin's address
 * @param localAddressUrl the executor's own direct address (what addressUrl meant before this change)
 * @param accessToken     xxl-job access token
 * @throws NullPointerException if either URL is null
 */
public ExecutorBizClient(String addressUrl, String localAddressUrl, String accessToken) {
    this.addressUrl = withTrailingSlash(addressUrl, "addressUrl");
    this.localAddressUrl = withTrailingSlash(localAddressUrl, "localAddressUrl");
    this.accessToken = accessToken;
}

/** Appends "/" when missing, so endpoint names such as "run" can be concatenated directly. */
private static String withTrailingSlash(String url, String name) {
    if (url == null) {
        throw new NullPointerException(name + " must not be null");
    }
    return url.endsWith("/") ? url : url + "/";
}
//Address requests are sent to: equals localAddressUrl when admin and executor share a network segment; otherwise it is the proxy admin's address
private String addressUrl;
//The executor's own direct address (what addressUrl meant before the proxy change)
private String localAddressUrl;
private String accessToken;
private int timeout = 4;//Raised from the upstream 3: proxying adds one extra forwarding hop, making trigger-request timeouts more likely. Tune as needed; leaving it unchanged is also fine and does not affect job execution
这样就完成了xxl-job的跨集群改造