集成xxljob项目如何迁移到K8S

news2024/11/17 8:49:57

10.png

前言

大家好,今天我们将基于XXL-Job,探讨任务调度迁移到云端的相关话题。

XXL-Job是一款功能强大、易用可靠的国产分布式任务调度平台,是目前国内使用比较广泛的分布式任务调度平台之一。它的主要特点包括:

  • 支持分布式、多线程任务调度;
  • 具有完整的管理后台,可以实现任务调度的创建、修改、启动和监控;
  • 提供了丰富的调度方式,包括cron表达式、API调用、消息队列等;
  • 支持任务执行过程的日志记录和错误处理,可以帮助用户快速定位问题。

随着云计算的全面普及和发展,越来越多企业开始认识到公共云平台的无限潜力。许多企业开始将自己的应用程序和业务迁移到云环境中,以获取更高的灵活性、弹性和可扩展性。然而,任务调度作为企业中的一个重要业务组件,对于软件开发和运营的质量都有着极大的影响。在云环境下部署和运行任务调度组件,需要考虑诸多因素,如安全性、可靠性、性能等。因此,企业需要认真思考如何在云平台上部署和运行任务调度组件,以保证运营效率、降低成本、提高应用程序的质量和性能。

云端迁移过程

由于历史原因,我们的 xxl-job-admin 端是部署在 k8s 集群外部的。在我们的项目中,我们是使用XML文件来集成xxl-job的,相关的集成配置如下所示:

<bean id="xxlJobExecutor" class="com.xxl.job.core.executor.impl.XxlJobSpringExecutor">
    <property name="adminAddresses" value="${xxl.job.admin.addresses}"/>
    <property name="appname" value="${xxl.job.executor.appname}"/>
    <property name="ip" value="${xxl.job.executor.ip}"/>
    <property name="port" value="${xxl.job.executor.port}"/>
    <property name="accessToken" value="${xxl.job.accessToken}" />
    <property name="logPath" value="${xxl.job.executor.logpath}"/>
    <property name="logRetentionDays" value="${xxl.job.executor.logretentiondays}"/>
</bean>

其中,相关配置值如下:

xxl.job.admin.addresses = http://127.0.0.1/xxl-job-admin
xxl.job.executor.appname = xxl-job-executor-sample
xxl.job.executor.ip = 
xxl.job.executor.port = 30065
xxl.job.accessToken = mytoken
xxl.job.executor.logpath = /etc/logs
xxl.job.executor.logretentiondays = -1

解决注册IP错误问题

当我们使用了与其他普通 Spring 项目的 JAR 包相同的部署方式将任务调度组件部署到了 k8s 上后,虽然我们通过管理页面看到已经成功将服务注册到了 xxl-job-admin,但我们发现该服务的 IP 地址为 k8s 中 Pod 的私有 IP 地址。因为k8s 集群内部通信的私有 IP 地址在集群外不可访问,这导致了任务无法正常执行,系统提示 IP 地址无效。

15.png

那么该如果解决这个问题呢?

阅读XXL-Job源码可以深入了解XXL-Job框架的实现细节和内部机制。在XXL-Job源码中,可以找到一些关键方法,帮助我们了解IP和port的获取规则。

具体来说,这些方法位于com.xxl.job.core.executor.XxlJobExecuto类中的initEmbedServer方法。当执行器启动时,会优先使用配置文件中的IP和端口,如果配置文件未指定,则通过NetUtils获取本地主机地址和默认端口。在注册成功后,执行器就可以通过该IP和端口与注册中心进行正常通信。部分源码如下:

port = port > 0 ? port : NetUtil.findAvailablePort(9999);
ip = ip != null && ip.trim().length() > 0 ? ip : IpUtil.getIp();

由此可见,为了解决这个问题,我们有两种方法可以尝试。

  • 我们可以直接将配置文件中的 xxl.job.executor.ip 指定为正确的IP地址,这样XXL-Job就可以正确地找到执行器并与之通信了。
  • 在XXL-Job的管理页面上将执行器的注册方式改为手动录入,并直接填写正确的IP地址。

16.png

无论使用哪种方法,唯一的要求就是确保与执行器实际运行的IP地址匹配。这样就可以使XXL-Job正常工作了。

实现动态注册IP

无论采用前面提到的两种方式中的哪一种,均存在一个xxl-job配置写死IP地址的问题,而无法实现IP的动态获取,这对于后期的维护和动态扩缩容都是不利的。那么如何在保证获取到的IP正确的前提下实现自动获取呢?

为了实现xxl-job自动获取注册IP的目的,在获取IP的过程中,我们可以结合Dubbo框架的获取IP逻辑,改造获取IP的顺序。按照以下顺序获取IP:

  • 首先根据环境变量获取IP,如果环境变量中存在,则获取环境变量中的IP地址。

  • 如果环境变量中不存在,则根据配置文件获取IP,如果配置文件中存在,则获取配置文件中的IP地址。

  • 如果配置文件中不存在,则获取本地IP地址。

这样的优先级顺序可以确保我们始终能够获得一个可用的注册IP。通过这种方式会让获取IP更加智能化和可靠。以下是具体改造步骤:

  • deploy.yaml 文件中添加环境变量。
spec:
  template:
    spec:
      containers:
        - env:
            - name: XXLJOB_IP_TO_REGISTRY
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.hostIP
  • 使用Java代码中的注释@Configuration和@Bean注释来替代使用XML文件进行Bean的注册和配置。
@Slf4j
@Configuration
public class XxlJobConfiguration {

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init start...");

        // 获取ip规则优先级, 环境变量(此值为deploy.yaml中配置)>配置文件>默认(本地)
        String ip = System.getenv("XXLJOB_IP_TO_REGISTRY");
        ip = StringUtils.isBlank(ip) ? PropertiesCacheUtil.getConfigValue("xxl.job.executor.ip") : ip;

        String port = PropertiesCacheUtil.getConfigValue("xxl.job.executor.port");
        String logRetentionDays = PropertiesCacheUtil.getConfigValue("xxl.job.executor.logretentiondays");

        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.admin.addresses")));
        xxlJobSpringExecutor.setAppname(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.executor.appname")));
        xxlJobSpringExecutor.setIp(ip);

        if (StringUtils.isNotBlank(port)) {
            xxlJobSpringExecutor.setPort(Integer.parseInt(port));
        }
        xxlJobSpringExecutor.setAccessToken(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.accessToken")));
        xxlJobSpringExecutor.setLogPath(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.executor.logpath")));
        if (StringUtils.isNotBlank(logRetentionDays)) {
            xxlJobSpringExecutor.setLogRetentionDays(Integer.parseInt(logRetentionDays));
        }
        
        log.info(">>>>>>>>>>> xxl-job config init end...");
        return xxlJobSpringExecutor;
    }
}

通过这样的改造,我们可以更加智能可靠地获取注册IP,实现了xxl-job自动获取IP地址的目的。

解决分片问题

无论使用上面提到的写死配置方式还是实现动态注册IP,都是仅适用于单机的情况,如果需要部署多台任务调度组件,那么又该如何配置才能保证每个服务都可以被调度,以达到实现分片处理的目的呢?

方法1:

我们可以通过在deploy.yaml文件中配置Pod的反亲和性,使得单台宿主机上仅能部署一个服务,并且配置在service.yaml中配置代理策略为Local的方式来达到上述目的。具体配置如下:

deploy.yaml改造如下:

spec:
  template:
    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - topologyKey: kubernetes.io/hostname
              labelSelector:
                matchExpressions:
                  - key: app
                    operator: In
                    values:
                      - 你的APP名称

service.yaml 改造如下:

spec:
  ## 代理策略:默认Cluster。Cluster表示:流量可以转发到其他节点上的Pod。Local表示:流量只发给本机的Pod
  externalTrafficPolicy: Local

经过上面的改造,我们成功的解决了分片问题,但是又带来了新的问题,如下图所示:
在这里插入图片描述

上面的方法都是使用Deployment方式部署的,那么,我们是否可以换下思路使用StatefulSet方式部署呢?这就衍生出了下面的方法。

方法2:
  • 改造配置:
## 注册到xxljob的端口,多个使用英文逗号分隔
xxl.job.executor.port = 30065,30066,30067
  • 改造代码
@Slf4j
@Configuration
public class XxlJobConfiguration {

    @Bean
    public XxlJobSpringExecutor xxlJobExecutor() {
        log.info(">>>>>>>>>>> xxl-job config init.");

        // 获取ip规则优先级, 配置中心>环境变量(此值为deploy.yml中配置)
        String ip = PropertiesCacheUtil.getConfigValue("xxl.job.executor.ip");
        ip = StringUtils.isBlank(ip) ? System.getenv("XXLJOB_IP_TO_REGISTRY") : ip;
        log.info("==>ip:{}", ip);

        String podName = StringUtils.trimToEmpty(System.getenv("POD_NAME"));
        log.info("==>POD_NAME:{}", podName);

        String[] split = StringUtils.split(podName, "-");
        String index = split[split.length - 1];
        log.info("==>index:{}", index);

        String allPort = PropertiesCacheUtil.getConfigValue("xxl.job.executor.port");
        String[] portSplit = StringUtils.split(allPort, ",");
        String port = portSplit[Integer.parseInt(index)];
        log.info("==>port:{}", port);

        String logRetentionDays = PropertiesCacheUtil.getConfigValue("xxl.job.executor.logretentiondays");

        XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
        xxlJobSpringExecutor.setAdminAddresses(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.admin.addresses")));
        xxlJobSpringExecutor.setAppname(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.executor.appname")));
        xxlJobSpringExecutor.setIp(ip);

        if (StringUtils.isNotBlank(port)) {
            xxlJobSpringExecutor.setPort(Integer.parseInt(port));
        }
        xxlJobSpringExecutor.setAccessToken(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.accessToken")));
        xxlJobSpringExecutor.setLogPath(StringUtils.trimToEmpty(PropertiesCacheUtil.getConfigValue("xxl.job.executor.logpath")));
        if (StringUtils.isNotBlank(logRetentionDays)) {
            xxlJobSpringExecutor.setLogRetentionDays(Integer.parseInt(logRetentionDays));
        }
        return xxlJobSpringExecutor;
    }
}
@Slf4j
@Component
public class InitNotifyDataFromDBHandler {

    @XxlJob("initNotifyDataFromDBHandler")
    public void initNotifyDataFromDBHandler(String params) {
            // XxlJobHelper.getShardIndex():当前分片序号(从0开始),执行器集群列表中当前执行器的序号;
            // XxlJobHelper.getShardTotal():总分片数,执行器集群的总机器数量;

            String podName = StringUtils.trimToEmpty(System.getenv("POD_NAME"));
            log.info("==>POD_NAME:{}", podName);
            XxlJobHelper.log("==>POD_NAME:{}", podName);

            String[] split = StringUtils.split(podName, "-");
            String index = split[split.length - 1];
            log.info("==>index:{}", index);
            XxlJobHelper.log("==>index:{}", index);

            // 下标0:机器总数目,下标1:当前机器在总机器中的位置下标
            String[] args = {XxlJobHelper.getShardTotal() + "", index};
            
            // 其他业务逻辑
            }
     }
  • 重写K8S中yaml部署文件
## 创建StatefulSet
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: your-app
  namespace: your-namespace
spec:
  serviceName: your-app
  replicas: 3
  selector:
    matchLabels:
      app: your-app
  template:
    metadata:
      annotations:
        statefulset.kubernetes.io/pod-name: $(POD_NAME)
      labels:
        app: your-app
    spec:
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
              - matchExpressions:
                  - key: project.node
                    operator: In
                    values:
                      - your-project-node
      volumes:
        - name: timezone
          hostPath:
            path: /usr/share/zoneinfo/Asia/Shanghai
      containers:
        - env:
            - name: POD_NAME
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
            - name: DUBBO_IP_TO_REGISTRY
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.hostIP
            - name: XXLJOB_IP_TO_REGISTRY
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.hostIP
          image: your-image
          imagePullPolicy: Always
          name: your-app
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
## 创建service
---
apiVersion: v1
kind: Service
metadata:
  name: service-your-app-0
  namespace: your-namespace
spec:
  selector:
    statefulset.kubernetes.io/pod-name: your-app-0
  type: NodePort
  sessionAffinity: None
  ports:
    - name: xxljob-your-app
      port: 30065
      targetPort: 30065
      nodePort: 30065
---
apiVersion: v1
kind: Service
metadata:
  name: service-your-app-1
  namespace: your-namespace
spec:
  selector:
    statefulset.kubernetes.io/pod-name: your-app-1
  type: NodePort
  sessionAffinity: None
  ports:
    - name: xxljob-your-app
      port: 30066
      targetPort: 30066
      nodePort: 30066
---
apiVersion: v1
kind: Service
metadata:
  name: service-your-app-2
  namespace: your-namespace
spec:
  selector:
    statefulset.kubernetes.io/pod-name: your-app-2
  type: NodePort
  sessionAffinity: None
  ports:
    - name: xxljob-your-app
      port: 30067
      targetPort: 30067
      nodePort: 30067

经过上面的改造,我们成功的解决了使用第一种方法带来的问题。但是这个方法同样以下缺点,但是这种缺点相对来说是可以忽略的,因为生产环境不会随便增减副本数量。

  • 在K8S的dashboard页面直接新增副本数量无效,需要先新增配置文件中的端口,再新增部署yaml中对应的Service,才能真正实现副本数量的增加。

小结

以上就是今天分享的任务调度上云的相关内容,我们的目标不仅仅是将任务调度程序迁移到云端,更是要通过实现自动注册功能,使任务调度程序能自动加入云端调度集群,从而更方便地进行任务调度,提升运行效率和可扩展性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1390537.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

29 旋转工具箱

效果演示 实现了一个菜单按钮的动画效果&#xff0c;当鼠标悬停在菜单按钮上时&#xff0c;菜单按钮会旋转315度&#xff0c;菜单按钮旋转的同时&#xff0c;菜单按钮旋转的8个小圆圈也会依次旋转360度&#xff0c;并且每个小圆圈的旋转方向和菜单按钮的旋转方向相反&#xff0…

查看服务器的yum 源

1、cd /etc/yum.repos.d 2、编辑 CentOS-Stream-Sources.repo 3、 查看里面的yum源地址 4、更新yum源&#xff0c;执行下面指令 yum clean all # 清除系统所有的yum缓存 yum makeacache # 生成新的yum缓存 yum repolist

Spring Boot - Application Events 同步 VS 异步 发布订阅事件实战

文章目录 PreCode基础工程启动类切入口事件 发布事件同步 Listener异步Listener增加EnableAsync增加 Async 测试 Pre Spring Boot - Application Events 的发布顺序_ApplicationStartingEvent Spring Boot - Application Events 的发布顺序_ApplicationEnvironmentPreparedEv…

低代码自动化测试的实践

何为低代码测试 传统上&#xff0c;功能、 UI、端到端等测试自动化的实现都涉及编写测试脚本&#xff0c;代替测试人员执行重复的手动测试任务。自动化脚本的开发工作通常由 QA 工程师或开发人员完成&#xff0c;这需要编写大量代码。 而低代码甚至无代码的理念也是在自动化测…

SpringBoot+SSM项目实战 苍穹外卖(12) Apache POI

继续上一节的内容&#xff0c;本节是苍穹外卖后端开发的最后一节&#xff0c;本节学习Apache POI&#xff0c;完成工作台、数据导出功能。 目录 工作台Apache POI入门案例 导出运营数据Excel报表 工作台 工作台是系统运营的数据看板&#xff0c;并提供快捷操作入口&#xff0c…

SQLServer 为角色开视图SELECT权限,报错提示需要开基础表权限

问题&#xff1a; 创建了个视图V&#xff0c;里面包含V库的a表&#xff0c;和T库的b表 为角色开启视图V的SELECT权限&#xff0c;提示T库的b表无SELECT权限&#xff0c;报错如下 解决方案&#xff1a; ①在T库建个视图TV&#xff0c;里面包含b表&#xff08;注意是在b表的对…

《2023年度程序员收入报告》 :旧金山位居第一,北京程序员中位数超60万元

2024年刚刚拉开序幕&#xff0c;备受瞩目的程序员薪资调研报告再度登场。由知名数据采集平台levels.fyi 搜集并整理了《2023年全球程序员收入报告》&#xff0c;为我们揭示了程序员最新的收入情况&#xff0c;其中有哪些值得关注的亮点呢&#xff1f; 行情向好&#xff0c;大多…

MS8257N超低噪声、宽带、可选反馈电阻跨阻放大器

产品简述 MS8257N 是一颗宽带、快速过载恢复时间、快速建立时 间、跨阻增益可调、超低噪声的跨阻放大器&#xff0c;主要用于光电 监测和各种高性能的光电系统。快速过载恢复特性和内部输 入保护电路可以让信号从过载传输中快速恢复正常。两档可 选跨阻增益保证了极高的动…

Matlab深度学习进行波形分割(二)

&#x1f517; 运行环境&#xff1a;Matlab &#x1f6a9; 撰写作者&#xff1a;左手の明天 &#x1f947; 精选专栏&#xff1a;《python》 &#x1f525; 推荐专栏&#xff1a;《算法研究》 &#x1f510;#### 防伪水印——左手の明天 ####&#x1f510; &#x1f497; 大家…

Ubuntu 22.04 安装prometheus

服务器监控和报警软件有很多&#xff0c;为什么我们会选择Prometheus而不是其他软件呢&#xff1f; 因为它有以下优点&#xff1a; 自带简易web监控页面&#xff0c;用户可以很方便地查看监控数据和使用仪表盘。能实时收集数据并根据自定义警报规则推送告警&#xff1b;具有丰…

企业如何找到合适的内容策略?媒介盒子分享

企业如果想要抢先占领用户心智的话&#xff0c;媒介盒子认为首先需要找到合适的内容策略&#xff0c;好的内容能够与消费者建立双向信任的关系&#xff0c;一种让消费者对品牌的好感度提升&#xff0c;进而成为品牌的忠实用户&#xff0c;接下来媒介盒子就来和大家聊聊&#xf…

Selenium Grid - 多台计算机上并行运行

当你希望在多台计算机上并行运行测试&#xff1f;Selenium Grid可以帮你实现。 官方文档原文&#xff1a; https://www.selenium.dev/documentation/grid/getting_started/ Selenium Grid允许通过将客户端发送的命令路由到远程浏览器实例&#xff0c;在远程机器上执行WebDriv…

mac上搭建 hadoop 伪集群

1. hadoop介绍 Hadoop是Apache基金会开发的一个开源的分布式计算平台&#xff0c;主要用于处理和分析大数据。Hadoop的核心设计理念是将计算任务分布到多个节点上&#xff0c;以实现高度可扩展性和容错性。它主要由以下几个部分组成&#xff1a; HDFS (Hadoop Distributed Fi…

Kafka生产消费流程

Kafka生产消费流程 1.Kafka一条消息发送和消费的流程图(非集群) 2.三种发送方式 准备工作 创建maven工程&#xff0c;引入依赖 <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.3.1…

【MATLAB】 HANTS滤波算法

有意向获取代码&#xff0c;请转文末观看代码获取方式~ 1 基本定义 HANTS滤波算法是一种时间序列谐波分析方法&#xff0c;它综合了平滑和滤波两种方法&#xff0c;能够充分利用遥感图像存在时间性和空间性的特点&#xff0c;将其空间上的分布规律和时间上的变化规律联系起来…

【MySQL性能优化】- MySQL结构与SQL执行过程

MySQL结构与SQL执行过程 &#x1f604;生命不息&#xff0c;写作不止 &#x1f525; 继续踏上学习之路&#xff0c;学之分享笔记 &#x1f44a; 总有一天我也能像各位大佬一样 &#x1f3c6; 博客首页 怒放吧德德 To记录领地 &#x1f31d;分享学习心得&#xff0c;欢迎指正…

BPMN 2.0 相关定义概要描述

官方文档&#xff1a;https://www.omg.org/spec/BPMN/2.0/ BPMN 2.0 &#xff08;BPMN&#xff0c;业务流程模型和标记&#xff0c;Business Process Model And Notation&#xff09; 是一种业务流程建模和执行的标准&#xff0c;它使用 XML 格式来描述业务流程。 以下是 BPM…

Eureka 本机集群实现

距离上次发布博客已经一年多了&#xff0c;主要就是因为考研&#xff0c;没时间学习技术的内容&#xff0c;现在有时间继续完成关于代码方面的心得&#xff0c;希望跟大家分享。 今天在做一个 Eureka 的集群实现&#xff0c;我是在本电脑上跑的&#xff0c;感觉这个挺有意思&a…

alibaba.item_get API:电商行业中的数据驱动决策支持

alibaba.item_get API 是阿里巴巴提供的一个用于获取商品详情的接口。在电商行业中&#xff0c;数据驱动的决策支持是非常重要的&#xff0c;而这个 API 可以帮助你获取到商品的各种详细信息&#xff0c;从而为你的决策提供支持。 具体来说&#xff0c;通过使用 alibaba.item_…

可以部署到Vercel的一些有趣项目

博客地址 可以部署到Vercel的一些有趣项目-雪饼分享几款可以部署在Vercel上的项目&#xff0c;更新中~ 免费的域名要不要&#xff1f; 如果你还不会将项目部署到Vercel&#xff0c;或是绑定域名建议阅读 将项目部署到Vercel&#xff0c;并绑定域名 Excalidraw 白板 一个开源的…