xxl-job跨集群调度改造

news2025/1/7 10:29:44

这篇文章为大家提供一种在多k8s集群中部署一套xxl-job的方案。

问题背景:

  1. 公司生产环境有多套k8s集群,为保证服务可用,容器需要部署到不同集群中。单集群中容器间可直接通过本地ip访问,跨集群容器间调用需通过宿主ip+映射端口访问。
  2. 由于k8s部署容器时不一定会在哪台宿主机,我们设定xxl-job执行器注册的ip为自动获取的本地ip,端口固定。admin使用的数据库为同一套,导致在跨集群情况下admin调度另一个k8s集群的executor会失败。
    如下图:
    在这里插入图片描述

目标:

  1. admin与executor能在所有集群中部署
  2. 不同集群中的admin能够调度所有的executor
  3. 保证任务调度策略(如轮询,第一个,故障转移,最近最久未使用,admin管理端指定ip调用)不受影响。
    在这里插入图片描述

解决思路

在admin中添加调度代理接口,利用不同k8s集群配置的ip网段差异,在admin调度时判断与executor网段是否一致,如果一致则直接调度,不一致则将调度请求(通过调度代理接口)交给与executor同集群的admin,由同集群admin调用执行器。
在这里插入图片描述

具体开发:

  1. 在admin中创建调度代理接口
    如上面架构图展示的,调度器通过http请求执行器接口完成调度,源码:
    com.xxl.job.core.biz.client.ExecutorBizClient
public class ExecutorBizClient implements ExecutorBiz {
.......
    private String addressUrl ;
    private String accessToken;
    private int timeout = 3;


    @Override
    public ReturnT<String> beat() {
        return XxlJobRemotingUtil.postBody(addressUrl+"beat", accessToken, timeout, "", String.class);
    }

    @Override
    public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam){
        return XxlJobRemotingUtil.postBody(addressUrl+"idleBeat", accessToken, timeout, idleBeatParam, String.class);
    }

    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
    }
...........

所以先在xxl-job-admin中创建接收调度请求的代理接口:

package com.xxl.job.admin.controller;

@Controller
public class ProxyController {

    @RequestMapping(value = "health",method = RequestMethod.POST)
    @ResponseBody
    @PermissionLimit(limit = false)
    public ReturnT health(@RequestHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN) String accessTokenReq,@RequestBody HealthParam healthParam) throws Exception {
        String accessToken = Config().getAccessToken();//这里是获取配置的accesstoken,请根据您自身获取accesstoken的方式进行修改
        if (accessToken != null
                && accessToken.trim().length() > 0
                && !accessToken.equals(accessTokenReq)) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
        }
        if(StringUtils.isBlank(healthParam.getAddressUrl())) return new ReturnT<String>(ReturnT.FAIL_CODE, "The addressUrl is wrong.");

        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(healthParam.getAddressUrl());
        return executorBiz.health();
    }

    @RequestMapping(value = "beat",method = RequestMethod.POST)
    @ResponseBody
    @PermissionLimit(limit = false)
    public ReturnT beat(@RequestHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN) String accessTokenReq,@RequestBody BeatParam beatParam) throws Exception {
        String accessToken = Config().getAccessToken();//这里是获取配置的accesstoken,请根据您自身获取accesstoken的方式进行修改
        if (accessToken != null
                && accessToken.trim().length() > 0
                && !accessToken.equals(accessTokenReq)) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
        }
        if(StringUtils.isBlank(beatParam.getAddressUrl())) return new ReturnT<String>(ReturnT.FAIL_CODE, "The addressUrl is wrong.");

        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(beatParam.getAddressUrl());
        return executorBiz.beat();
    }

    @RequestMapping(value = "idleBeat",method = RequestMethod.POST)
    @ResponseBody
    @PermissionLimit(limit = false)
    public ReturnT idleBeat(@RequestHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN) String accessTokenReq, @RequestBody IdleBeatParam idleBeatParam) throws Exception {
        String accessToken = Config().getAccessToken();//这里是获取配置的accesstoken,请根据您自身获取accesstoken的方式进行修改
        if (accessToken != null
                && accessToken.trim().length() > 0
                && !accessToken.equals(accessTokenReq)) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
        }
        if(StringUtils.isBlank(idleBeatParam.getAddressUrl())) return new ReturnT<String>(ReturnT.FAIL_CODE, "The addressUrl is wrong.");

        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(idleBeatParam.getAddressUrl());
        return executorBiz.idleBeat(idleBeatParam);
    }
    @RequestMapping(value = "run",method = RequestMethod.POST)
    @ResponseBody
    @PermissionLimit(limit = false)
    public ReturnT run(@RequestHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN) String accessTokenReq,@RequestBody TriggerParam triggerParam) throws Exception {
        String accessToken = Config().getAccessToken();//这里是获取配置的accesstoken,请根据您自身获取accesstoken的方式进行修改
        if (accessToken != null
                && accessToken.trim().length() > 0
                && !accessToken.equals(accessTokenReq)) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
        }
        if(StringUtils.isBlank(triggerParam.getAddressUrl())) return new ReturnT<String>(ReturnT.FAIL_CODE, "The addressUrl is wrong.");

        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(triggerParam.getAddressUrl());
        return executorBiz.run(triggerParam);
    }
    @RequestMapping(value = "kill",method = RequestMethod.POST)
    @ResponseBody
    @PermissionLimit(limit = false)
    public ReturnT kill(@RequestHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN) String accessTokenReq,@RequestBody KillParam killParam) throws Exception {
        String accessToken = Config().getAccessToken();//这里是获取配置的accesstoken,请根据您自身获取accesstoken的方式进行修改
        if (accessToken != null
                && accessToken.trim().length() > 0
                && !accessToken.equals(accessTokenReq)) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
        }
        if(StringUtils.isBlank(killParam.getAddressUrl())) return new ReturnT<String>(ReturnT.FAIL_CODE, "The addressUrl is wrong.");

        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(killParam.getAddressUrl());
        return executorBiz.kill(killParam);
    }
    @RequestMapping(value = "log",method = RequestMethod.POST)
    @ResponseBody
    @PermissionLimit(limit = false)
    public ReturnT log(@RequestHeader(XxlJobRemotingUtil.XXL_JOB_ACCESS_TOKEN) String accessTokenReq,@RequestBody LogParam logParam) throws Exception {
        String accessToken = Config().getAccessToken();//这里是获取配置的accesstoken,请根据您自身获取accesstoken的方式进行修改
        if (accessToken != null
                && accessToken.trim().length() > 0
                && !accessToken.equals(accessTokenReq)) {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "The access token is wrong.");
        }
        if(StringUtils.isBlank(logParam.getAddressUrl())) return new ReturnT<String>(ReturnT.FAIL_CODE, "The addressUrl is wrong.");

        ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(logParam.getAddressUrl());
        return executorBiz.log(logParam);
    }
}

其次我添加了一个请求参数的父类BaseParam,用于保存想要调度的executor ip。并让所有的请求参数继承自这个父类。

public class BaseParam {
    private String addressUrl;

    public String getAddressUrl() {
        return addressUrl;
    }

    public void setAddressUrl(String addressUrl) {
        this.addressUrl = addressUrl;
    }
}
public class TriggerParam extends BaseParam implements Serializable{
.....

其次要注意,要将代理接口的登录校验、鉴权关闭。

  1. 修改获取执行器信息逻辑:
    com.xxl.job.admin.core.trigger.XxlJobTrigger#runExecutor中获取调度器并调用执行器:
public class XxlJobTrigger {
.....
    /**
     * run executor
     * @param triggerParam
     * @param address
     * @return
     */
    public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
        ReturnT<String> runResult = null;
        try {
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);//获取执行器信息,我们需要修改这里的逻辑
            runResult = executorBiz.run(triggerParam);//调用执行器
        } catch (Exception e) {
            logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
            runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
        }

        StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
        runResultSB.append("<br>address:").append(address);
        runResultSB.append("<br>code:").append(runResult.getCode());
        runResultSB.append("<br>msg:").append(runResult.getMsg());

        runResult.setMsg(runResultSB.toString());
        return runResult;
    }

}

原获取执行器的代码:

    public static ExecutorBiz getExecutorBiz(String address) throws Exception {
        // valid
        if (address==null || address.trim().length()==0) {
            return null;
        }

        // load-cache
        address = address.trim();
        ExecutorBiz executorBiz = executorBizRepository.get(address);
        if (executorBiz != null) {
            return executorBiz;
        }

        // set-cache
        executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());

        executorBizRepository.put(address, executorBiz);
        return executorBiz;
    }

添加代理逻辑:

public static ExecutorBiz getExecutorBiz(String localAddressUrl) throws Exception {
        // valid
        if (localAddressUrl == null || localAddressUrl.trim().length() == 0) {
            return null;
        }

        // load-cache
        localAddressUrl = localAddressUrl.trim();
        ExecutorBiz executorBiz = executorBizRepository.get(localAddressUrl);
        if (executorBiz != null) {
            return executorBiz;
        }

        // set-cache
        //添加代理:
        String addressUrl = getProxyAddress(localAddressUrl);
        executorBiz = new ExecutorBizClient(addressUrl,localAddressUrl, JobAdminConfig.getAdminConfig().getAccessToken());

        executorBizRepository.put(localAddressUrl, executorBiz);
        return executorBiz;
    }

    /**
     * 判断执行器与当前admin是否同一网段,是则原address返回,不是则返回代理(与目标执行器同网段)admin的k8s映射地址+端口地址
     * @param address
     * @return
     * @throws UnknownHostException
     */
    private static String getProxyAddress(String address) throws UnknownHostException {
        InetAddress localhost = InetAddress.getLocalHost();
        String hostAddress = localhost.getHostAddress();
        String segment = address.split("\\.")[0].replace("http://", "");
        Config config = Config().getConfig();//这里是获取不同集群网段内admin的映射地址配置,json格式字符串
        String adminProxyAddress = config.getProperty("adminProxyAddress", "");
        if(!(StringUtils.isAnyBlank(hostAddress,adminProxyAddress) || hostAddress.split("\\.")[0].equals(segment))){
            JSONObject jsonObject = JSONObject.parseObject(adminProxyAddress);
            String proxyAddress = jsonObject.getString(segment);
            if(StringUtils.isNotBlank(proxyAddress)){
                String[] split = proxyAddress.split(",");
                Random random = new Random();
                int randomNumber = random.nextInt(split.length);//集群内有多个容器,这里地址随机取
                return split[randomNumber];
            }
        }
        return address;
    }

ExecutorBizClient中添加本地executor地址:localAddressUrl

public class ExecutorBizClient implements ExecutorBiz {

    public ExecutorBizClient() {
    }

    public ExecutorBizClient(String addressUrl,String localAddressUrl,String accessToken) {
        this.addressUrl = addressUrl;
        this.accessToken = accessToken;
        this.localAddressUrl = localAddressUrl;
        // valid
        if (!this.addressUrl.endsWith("/")) {
            this.addressUrl = this.addressUrl + "/";
        }
        if (!this.localAddressUrl.endsWith("/")) {
            this.localAddressUrl = this.localAddressUrl + "/";
        }
    }
    //执行器调用地址,同一网段下addressUrl=localAddressUrl,不同网段下addressUrl为代理地址
    private String addressUrl;
    //同网段执行器调用地址localAddressUrl,是原addressUrl的含义
    private String localAddressUrl;
    private String accessToken;
    private int timeout = 4;//这里因为走代理多了一次转发,所以容易出现调度接口超时,超时时间需要看情况增加,不改也行,不影响执行

这样就完成了xxl-job的跨集群改造

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1652882.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

脸上长斑怎么办?教你一招——如何用新型揿针治疗黄褐斑?

点击文末领取揿针的视频教程跟直播讲解 你有没有发现&#xff0c;女性一到了30岁&#xff0c;脸上总是很容易长出斑点&#xff0c;特别是黄褐斑。 ​ 俗话说&#xff0c;一白遮百丑&#xff0c;一斑毁所有&#xff0c;长斑真的让人伤不起&#xff01;很多人因为黄褐斑的出现…

microsoft的azure语音,开发环境运行正常,发布到centos7线上服务器之后,无法运行

最近在做AI语音对话的功能&#xff0c;用到了azure的语音语音服务&#xff0c;开发的时候还算顺利&#xff0c;部署到线上后&#xff0c;发现在正式服上无法完成语音转文本的操作&#xff0c;提示&#xff1a; org.springframework.web.util.NestedServletException: Handler d…

Github的使用教程(下载和上传项目)

根据『教程』一看就懂&#xff01;Github基础教程_哔哩哔哩_bilibili 整理。 1.项目下载 1&#xff09;直接登录到源码链接页或者通过如下图的搜索 通过编程语言对搜索结果进一步筛选。 2&#xff09;红框区为项目的源代码&#xff0c;README.md &#xff08;markdown格式&…

实战 | 实时手部关键点检测跟踪(附完整源码+代码详解)

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

排名第一的电脑监控软件,电脑监控软件就选这款了

广受好评的电脑监控软件确实有很多选择&#xff0c;排名第一的只有一个&#xff0c;以下推荐几款备受认可的电脑监控软件&#xff0c;它们各自具有独特的特点和优势&#xff1a; 第一名&#xff0c;安企神 特点与优势&#xff1a;安企神是一款功能全面的IT资产管理和电脑桌面监…

Web前端三大主流框架是什么?

Web前端开发领域的三大主流框架分别是Angular、React和Vue.js。它们在Web开发领域中占据着重要的地位&#xff0c;各自拥有独特的特点和优势。 Angular Angular是一个由Google开发的前端框架&#xff0c;最初版本称为AngularJS&#xff0c;后来升级为Angular。它是一个完整的…

ChIP-seq or CUTTag,谁能hold住蛋白质与DNA互作主战场?

DNA与蛋白质的相互作用作为表观遗传学中的一个重要领域&#xff0c;对理解基因表达调控、DNA复制与修复、表观遗传修饰&#xff08;组蛋白修饰&#xff09;及染色质结构等基本生命过程至关重要。 自1983年James Broach首次公布染色质免疫共沉淀&#xff08;ChIP&#xff09;技…

备战人工智能大赛!卓翼飞思实验室启动机器人挑战赛赛事培训

一.大赛培训通知 本月起&#xff0c;卓翼飞思实验室将针对机器人任务挑战赛&#xff08;无人协同系统&#xff09;赛项内容开启赛事培训计划&#xff0c;采用“线上线下”相结合的培训模式&#xff0c;围绕赛事关键技术&#xff0c;让您轻松应对比赛。 5月8日进行第一期培训&am…

LLM——大语言模型完整微调策略指南

1、 概述 GPT-4、LaMDA、PaLM等大型语言模型&#xff08;LLMs&#xff09;以其在广泛主题上的深入理解和生成高度类人文本的能力而闻名遐迩&#xff0c;它们在全球范围内引起了广泛关注。这些模型的预训练过程涉及对来自互联网、书籍和其他来源的数十亿词汇的海量数据集进行学…

技术分享 | 京东商品API接口|京东零售数据可视化平台产品实践与思考

导读 本次分享题目为京东零售数据可视化平台产品实践与思考。 主要包括以下四个部分&#xff1a; 1.京东API接口介绍 2. 平台产品能力介绍 3. 业务赋能案例分享 01 京东API接口介绍 02 平台产品能力介绍 1. 产品矩阵 数据可视化产品是一种利用数据分析和可视化技术&…

Tuxera NTFS for Mac Mac用户无缝地读写NTFS格式的硬盘和U盘

在数字化时代&#xff0c;数据交换和共享变得日益重要。然而&#xff0c;对于Mac用户来说&#xff0c;与Windows系统之间的文件交换可能会遇到一些挑战。这是因为Mac OS默认不支持Windows常用的NTFS文件系统。幸运的是&#xff0c;Tuxera NTFS for Mac为我们提供了一个优雅的解…

APP广告变现:自刷的秘密与规则

在移动互联网时代&#xff0c;广告已成为众多APP盈利的主要方式之一。对于开发者和运营者而言&#xff0c;如何通过广告变现提高收益是他们必须关注的问题。然而&#xff0c;在众多的变现方法中&#xff0c;“自刷广告”这一概念可能让一些人感到迷惑。实际上&#xff0c;只要在…

详细讲解lua中string.gsub的使用

string.gsub 是 Lua 标准库中的一个函数&#xff0c;用于全局替换字符串中的某些部分。string.gsub 是 Lua 中非常实用的一个函数&#xff0c;它可以用来进行字符串的处理和替换操作。 它的基本语法如下&#xff1a; string.gsub(s, pattern, replacement [, n])s 是要处理的…

c++11 标准模板(STL)本地化库 - 平面类别(std::numpunct) - 定义数值标点规则

本地化库 本地环境设施包含字符分类和字符串校对、数值、货币及日期/时间格式化和分析&#xff0c;以及消息取得的国际化支持。本地环境设置控制流 I/O 、正则表达式库和 C 标准库的其他组件的行为。 平面类别 定义数值标点规则 std::numpunct template< class CharT >…

【Python】一道字典题目

题目&#xff1a;输入一段文本&#xff0c;统计每个字符的个数 in_inputinput(“输入&#xff1a;”) dic{} for char in in_input: if char in dic: dic[char]1 # 字典添加键值对的方法&#xff0c;给字典给键和值的方法 else: dic[char]1 print(dic) 输出台&#xff1a;

PY计算生态是什么?

Python 的计算生态指的是与 Python 相关的广泛的软件、库、框架和工具集合. 它们为各种计算任务提供了丰富的解决方案和支持。Python 作为一种简洁、易学、功能强大的编程语言&#xff0c;在科学计算、数据分析、人工智能、机器学习等领域都有着强大的影响力。以下是 Python 计…

有哪些有效的复习方法可以帮助备考软考?

软考目前仍然是一个以记忆为主、理解为辅的考试。学过软考的朋友可能会感到困惑&#xff0c;因为软考的知识在日常工作中有许多应用场景&#xff0c;需要理解的地方也很多。但为什么我说它是理解为辅呢&#xff1f;因为这些知识点只要记住了&#xff0c;都不难理解&#xff0c;…

程序员离不开的8款开发软件(必备)

在数字化时代&#xff0c;程序员扮演着关键的角色&#xff0c;他们的工作不仅仅是编写代码&#xff0c;更是创造了无限可能的世界。而要让这个创造过程更加高效和愉悦&#xff0c;选择适合自己的开发工具和软件是至关重要的。 今天&#xff0c;我将为大家介绍一些程序员爱不释…

数字孪生涉及到的前沿技术:虚拟现实 人工智能 区块链 边缘计算。

数字孪生是各类技术的综合应用&#xff0c;除了咱们常见的传感器、数据采集、清洗、传输、建模、可视化技术外&#xff0c;还有还有一些前沿技术&#xff0c;会让数字孪生更加强大和智能&#xff0c;本文介绍几个。 虚拟现实&#xff08;Virtual Reality&#xff0c;VR&#x…

浅谈C++ overload(重载) override(覆盖) overwrite(重写)

目录 1. 名词辨析2 含义解析1 overload重载2 override覆盖3 overwrite重写 3 区别4 代码示例 1. 名词辨析 关于这3个名词的中文翻译&#xff1a; overload翻译为重载&#xff0c;基本是没有歧义的&#xff1b;override和overwrite的翻译&#xff0c;我在参考了cppreference中…