XXL-JOB分布式任务调度框架(二)-策略详解

news2024/11/16 11:44:52

在这里插入图片描述

文章目录

    • 1.引言
    • 2.任务详解
      • 2.1.执行器
      • 2.2.基础配置
    • 3.路由策略(第一个)-案例
    • 4.路由策略(最后一个)-案例
    • 5.轮询策略-案例
    • 6.随机选取
    • 7.轮询选取
    • 8.一致性hash
    • 9.最不经常使用 (LFU)
    • 10.最近最久未使用(LRU)
    • 11.故障转移
    • 12.忙碌转移
    • 7.分片广播任务

1.引言

本篇文章承接上文《XXL-JOB分布式任务调度框架(一)-基础入门》,上一次和大家简单介绍了下 xxl-job 的由来以及使用方法,本篇文章将会详细介绍一些高级使用方法及特性。

上文中我们在新建一个任务的时候发现有很多的选项,现在我们来详细聊一聊他们的作用。
)

2.任务详解

2.1.执行器

执行器:任务的绑定的执行器,任务触发调度时将会自动发现注册成功的执行器, 实现任务自动发现功能;

另一方面也可以方便的进行任务分组。每个任务必须绑定一个执行器
在这里插入图片描述

2.2.基础配置

  • 执行器:每个任务必须绑定一个执行器, 方便给任务进行分组。任务触发调度时将会自动发现注册成功的执行器, 实现任务自动发现功能; 另一方面也可以方便的进行任务分组。每个任务必须绑定一个执行器, 可在 “执行器管理” 进行设置

  • 任务描述:任务的描述信息,便于任务管理

  • 报警邮件:任务调度失败时邮件通知的邮箱地址,支持配置多邮箱地址,配置多个邮箱地址时用逗号分隔;

  • 负责人:任务的负责人;
    在这里插入图片描述

  • 调度类型

    • 无:该类型不会主动触发调度;
    • CRON:该类型将会通过CRON,触发任务调度;
    • 固定速度:该类型将会以固定速度,触发任务调度;按照固定的间隔时间,周期性触发;

在这里插入图片描述

  • 运行模式
    在这里插入图片描述

  • BEAN模式:以JobHandler方式维护在执行器端;需要结合 “JobHandler” 属性匹配执行器中任务;

  • GLUE模式(Java):任务以源码方式维护在调度中心;该模式的任务实际上是一段继承自IJobHandler的Java类代码并 “groovy” 源码方式维护,它在执行器项目中运行,可使用@Resource/@Autowire注入执行器里中的其他服务;

  • GLUE模式(Shell):任务以源码方式维护在调度中心;该模式的任务实际上是一段 “shell” 脚本;

  • GLUE模式(Python):任务以源码方式维护在调度中心;该模式的任务实际上是一段 “python” 脚本;

  • GLUE模式(PHP):任务以源码方式维护在调度中心;该模式的任务实际上是一段 “php” 脚本;

  • GLUE模式(NodeJS):任务以源码方式维护在调度中心;该模式的任务实际上是一段 “nodejs” 脚本;

  • GLUE模式(PowerShell):以源码方式维护在调度中心;该模式的任务实际上是一段 “PowerShell” 脚本;

  • JobHandler:运行模式为 “BEAN模式” 时生效,对应执行器中新开发的JobHandler类“@JobHandler”注解自定义的value值;

  • 执行参数:任务执行所需的参数;

  • 阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;

    • 单机串行(默认):调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
    • 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
    • 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
      在这里插入图片描述
      路由策略:当执行器集群部署时,提供丰富的路由策略

路由策略是指一个任务可以由多个执行器完成,那具体由哪一个完成呢,这就要看我们指定的路由策略了,这个参数当执行器做集群部署的时候才有意义
在这里插入图片描述
那么这里的第一个,最后一个是按什么顺序来的呢,就是点击查看-注册节点中的1,2,3,4,第一个指的就是1,最后一个指的就是4。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

  • 子任务:每个任务都拥有一个唯一的任务ID(任务ID可以从任务列表获取),当本任务执行结束并且执行成功时,将会触发子任务ID所对应的任务的一次主动调度。

  • 任务超时时间:支持自定义任务超时时间,任务运行超时将会主动中断任务;

  • 失败重试次数;支持自定义任务失败重试次数,当任务失败时将会按照预设的失败重试次数主动进行重试;

3.路由策略(第一个)-案例

源码:

package com.xxl.job.admin.core.route.strategy;
 
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
 
import java.util.List;
 
/**
 * Created by xuxueli on 17/3/10.
 */
public class ExecutorRouteFirst extends ExecutorRouter {
 
    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList){
        return new ReturnT<String>(addressList.get(0));
    }
 
}

看代码就很容易理解,获取当前传入的执行器的注册地址集合的第一个。

执行器部署集群
在这里插入图片描述
在这里插入图片描述

配置路由规则
在这里插入图片描述
任务执行效果:第一个执行器执行任务
在这里插入图片描述

4.路由策略(最后一个)-案例

源码:

package com.xxl.job.admin.core.route.strategy;
 
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
 
import java.util.List;
 
/**
 * Created by xuxueli on 17/3/10.
 */
public class ExecutorRouteLast extends ExecutorRouter {
 
    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        return new ReturnT<String>(addressList.get(addressList.size()-1));
    }
 
}

这个也很容易理解,选取当前传入得执行器的注册地址集合的最后一个,下标从0开始 最后一个为addressList.size()-1

执行器部署集群

在这里插入图片描述
在这里插入图片描述配置路由规则
在这里插入图片描述
任务执行效果:最后一个执行器执行任务
在这里插入图片描述

5.轮询策略-案例

执行器部署集群

在这里插入图片描述
在这里插入图片描述配置路由规则
在这里插入图片描述
任务执行效果:轮询执行任务(一共执行4次,各执行2次)
在这里插入图片描述

6.随机选取

源码:

package com.xxl.job.admin.core.route.strategy;
 
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
 
import java.util.List;
import java.util.Random;
 
/**
 * Created by xuxueli on 17/3/10.
 */
public class ExecutorRouteRandom extends ExecutorRouter {
 
    private static Random localRandom = new Random();
 
    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = addressList.get(localRandom.nextInt(addressList.size()));
        return new ReturnT<String>(address);
    }
 
}

整个算法核心部分就是通过一个Random对象的nextInt方法在求出[0,addressList.size())区间内的任意一个地址

7.轮询选取

源码:

package com.xxl.job.admin.core.route.strategy;
 
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
 
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
 
/**
 * Created by xuxueli on 17/3/10.
 */
public class ExecutorRouteRound extends ExecutorRouter {
 
    private static ConcurrentMap<Integer, AtomicInteger> routeCountEachJob = new ConcurrentHashMap<>();
    private static long CACHE_VALID_TIME = 0;
 
    private static int count(int jobId) {
        // cache clear
        if (System.currentTimeMillis() > CACHE_VALID_TIME) {
            routeCountEachJob.clear();
            CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
        }
 
        AtomicInteger count = routeCountEachJob.get(jobId);
        if (count == null || count.get() > 1000000) {
            // 初始化时主动Random一次,缓解首次压力
            count = new AtomicInteger(new Random().nextInt(100));
        } else {
            // count++
            count.addAndGet(1);
        }
        routeCountEachJob.put(jobId, count);
        return count.get();
    }
 
    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = addressList.get(count(triggerParam.getJobId())%addressList.size());
        return new ReturnT<String>(address);
    }
}

这里注意到创建了一个静态的ConcurrentMap对象,这个routeCountEachJob就是用来存放路由任务的,而且还设置了缓存时间,有效期为24小时,当超过24小时的时候,自动的清空当前的缓存。

其中ConcurrentMap的key为jobIdvalue为当前jobId所对应的计数器,每访问一次就自增一,最大增到100000,然后又从[0,100)的随机数开始重新自增。

这个算法的思想就是取余数,每次先计算出当前jobId所对应的计数器的值,然后 计数器的值 % addressList.size() 求得这一次轮询的地址。

8.一致性hash

package com.xxl.job.admin.core.route.strategy;
 
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
 
import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
 
/**
 * 分组下机器地址相同,不同JOB均匀散列在不同机器上,保证分组下机器分配JOB平均;且每个JOB固定调度其中一台机器;
 *      a、virtual node:解决不均衡问题
 *      b、hash method replace hashCode:String的hashCode可能重复,需要进一步扩大hashCode的取值范围
 * Created by xuxueli on 17/3/10.
 */
public class ExecutorRouteConsistentHash extends ExecutorRouter {
 
    private static int VIRTUAL_NODE_NUM = 100;
 
    /**
     * get hash code on 2^32 ring (md5散列的方式计算hash值)
     * @param key
     * @return
     */
    private static long hash(String key) {
 
        // md5 byte
        MessageDigest md5;
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("MD5 not supported", e);
        }
        md5.reset();
        byte[] keyBytes = null;
        try {
            keyBytes = key.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException("Unknown string :" + key, e);
        }
 
        md5.update(keyBytes);
        byte[] digest = md5.digest();
 
        // hash code, Truncate to 32-bits
        long hashCode = ((long) (digest[3] & 0xFF) << 24)
                | ((long) (digest[2] & 0xFF) << 16)
                | ((long) (digest[1] & 0xFF) << 8)
                | (digest[0] & 0xFF);
 
        //通过md5算出的hashcode % 2^32 余数,将hash值散列在一致性hash环上  这个环分了2^32个位置
        long truncateHashCode = hashCode & 0xffffffffL;
        return truncateHashCode;
    }
 
    public String hashJob(int jobId, List<String> addressList) {
 
        // ------A1------A2-------A3------
        // -----------J1------------------
        TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
        for (String address: addressList) {
            for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
                //为每一个注册的节点分配100个虚拟节点,并算出这些节点的一致性hash值,存放到TreeMap中
                long addressHash = hash("SHARD-" + address + "-NODE-" + i);
                addressRing.put(addressHash, address);
            }
        }
        //第二步求出job的hash值 通过jobId计算
        long jobHash = hash(String.valueOf(jobId));
        //通过treeMap性质,所有的key都按照从小到大的排序,即按照hash值从小到大排序,通过tailMap 求出>=hash(jobId)的剩余一部分map,
        SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
        if (!lastRing.isEmpty()) {
            //若找到则取第一个key,为带路由的地址
            return lastRing.get(lastRing.firstKey());
        }
        //若本身hash(jobId)为treeMap的最后一个key,则找当前treeMap的第一个key
        return addressRing.firstEntry().getValue();
    }
 
    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = hashJob(triggerParam.getJobId(), addressList);
        return new ReturnT<String>(address);
    }
 
}

一致哈希 是一种特殊的哈希算法。在使用一致哈希算法后,哈希表槽位数(大小)的改变平均只需要对 K/n 个关键字重新映射,其中K是关键字的数量, n是槽位数量。然而在传统的哈希表中,添加或删除一个槽位的几乎需要对所有关键字进行重新映射。

为什么要引入这个算法那,这个算法就是为了解决目前分布式所存在的问题,举个例子:

现在我们有三台Redis服务器,假设编号为0,1,2,每台服务器都缓存了当前最热门的商品详情信息,我们的映射规则是按照 hash(商品的id)%(redis服务器数量)的结果来映射到某一台编号的redis服务器中,

在这里插入图片描述

但是突然由于有一天公司商品越来越多,客户流量也越来越大,三台服务器扛不住怎么办啊,那我们就加一台服务器,那么服务器数量就发生了变动,那肯定我们的取余数这个算法重新计算映射的编号就发生了变动,很容易造成大面积缓存失效,造成缓存雪崩,

把所有请求都请求到后端数据库,造成压力过大。为了解决这个问题,就引入了一致性hash算法,即服务节点的变更不会造成大量的哈希重定位。一致性哈希算法由此而生~。

   这个一致性hash引入之后,若服务器节点数量过少,有几率出现数据倾斜的情况,既大量的数据映射到某一区间,其它区间没有数据映射,造成了资源分配不均匀,为了解决这个问题,xxl-job源码引入了虚拟节点,既将每台服务器的节点都生成所对应的100个虚拟节点,这应少量的服务器节点通过引入虚拟节点,就会加大节点的数量,这样大量的节点分配到hash环上是比较均匀的,从而很容易的解决数据分配不均匀问题。

9.最不经常使用 (LFU)

package com.xxl.job.admin.core.route.strategy;
 
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
 
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
 
/**
 * 单个JOB对应的每个执行器,使用频率最低的优先被选举
 *      a(*)、LFU(Least Frequently Used):最不经常使用,频率/次数
 *      b、LRU(Least Recently Used):最近最久未使用,时间
 *
 *      算法思想:
 *      构建一个作业和地址map    jobid -> addressList
 *      第一次随机的将任务所对应的执行器的注册地址编一个序列号
 *      然后将执行器的注册地址按照从小到大进行排序
 *      筛选过程找第一个序列号最小的作为下一次的路由地址
 *      随后将当前选中的地址编号值+1
 *      这样最终我们都会挑选编号最小的注册器地址作为下一个路由地址,既最不常使用的
 *
 * Created by xuxueli on 17/3/10.
 */
public class ExecutorRouteLFU extends ExecutorRouter {
 
    private static ConcurrentMap<Integer, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<Integer, HashMap<String, Integer>>();
    private static long CACHE_VALID_TIME = 0;
 
    public String route(int jobId, List<String> addressList) {
 
        // cache clear
        if (System.currentTimeMillis() > CACHE_VALID_TIME) {
            jobLfuMap.clear();
            //有效缓存时间为一天
            CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
        }
 
        // lfu item init
        HashMap<String, Integer> lfuItemMap = jobLfuMap.get(jobId);     // Key排序可以用TreeMap+构造入参Compare;Value排序暂时只能通过ArrayList;
        if (lfuItemMap == null) {
            lfuItemMap = new HashMap<String, Integer>();
            jobLfuMap.putIfAbsent(jobId, lfuItemMap);   // 避免重复覆盖
        }
 
        // put new
        for (String address: addressList) {
            if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) {
                lfuItemMap.put(address, new Random().nextInt(addressList.size()));  // 初始化时主动Random一次,缓解首次压力
            }
        }
        // remove old
        List<String> delKeys = new ArrayList<>();
        for (String existKey: lfuItemMap.keySet()) {
            if (!addressList.contains(existKey)) {
                delKeys.add(existKey);
            }
        }
        if (delKeys.size() > 0) {
            for (String delKey: delKeys) {
                lfuItemMap.remove(delKey);
            }
        }
 
        // load least userd count address
        List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<Map.Entry<String, Integer>>(lfuItemMap.entrySet());
        Collections.sort(lfuItemList, new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
                return o1.getValue().compareTo(o2.getValue());
            }
        });
 
        Map.Entry<String, Integer> addressItem = lfuItemList.get(0);
        String minAddress = addressItem.getKey();
        addressItem.setValue(addressItem.getValue() + 1);
 
        return addressItem.getKey();
    }
 
    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = route(triggerParam.getJobId(), addressList);
        return new ReturnT<String>(address);
    }
 
}

10.最近最久未使用(LRU)

package com.xxl.job.admin.core.route.strategy;
 
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
 
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
 
/**
 * 单个JOB对应的每个执行器,最久为使用的优先被选举
 *      a、LFU(Least Frequently Used):最不经常使用,频率/次数
 *      b(*)、LRU(Least Recently Used):最近最久未使用,时间
 *
 * Created by xuxueli on 17/3/10.
 */
public class ExecutorRouteLRU extends ExecutorRouter {
 
    private static ConcurrentMap<Integer, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<Integer, LinkedHashMap<String, String>>();
    private static long CACHE_VALID_TIME = 0;
 
    public String route(int jobId, List<String> addressList) {
 
        // cache clear
        if (System.currentTimeMillis() > CACHE_VALID_TIME) {
            jobLRUMap.clear();
            CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
        }
 
        // init lru
        LinkedHashMap<String, String> lruItem = jobLRUMap.get(jobId);
        if (lruItem == null) {
            /**
             * LinkedHashMap
             *      a、accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期;
             *      b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法;
             */
            lruItem = new LinkedHashMap<String, String>(16, 0.75f, true);
            jobLRUMap.putIfAbsent(jobId, lruItem);
        }
 
        // put new
        for (String address: addressList) {
            if (!lruItem.containsKey(address)) {
                lruItem.put(address, address);
            }
        }
        // remove old
        List<String> delKeys = new ArrayList<>();
        for (String existKey: lruItem.keySet()) {
            if (!addressList.contains(existKey)) {
                delKeys.add(existKey);
            }
        }
        if (delKeys.size() > 0) {
            for (String delKey: delKeys) {
                lruItem.remove(delKey);
            }
        }
 
        // load
        String eldestKey = lruItem.entrySet().iterator().next().getKey();
        String eldestValue = lruItem.get(eldestKey);
        return eldestValue;
    }
 
    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = route(triggerParam.getJobId(), addressList);
        return new ReturnT<String>(address);
    }
 
}

11.故障转移

package com.xxl.job.admin.core.route.strategy;
 
import com.xxl.job.admin.core.scheduler.XxlJobScheduler;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
 
import java.util.List;
 
/**
 * Created by xuxueli on 17/3/10.
 * 故障转移路由策略
 * 思想:遍历所有的该组下的所有注册节点地址集合,然后分别进行心跳处理,直到找到一个发送心跳成功的节点作为下一次路由的节点
 */
public class ExecutorRouteFailover extends ExecutorRouter {
 
    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
 
        StringBuffer beatResultSB = new StringBuffer();
        for (String address : addressList) {
            // beat
            ReturnT<String> beatResult = null;
            try {
                ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
                beatResult = executorBiz.beat();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
            }
            beatResultSB.append( (beatResultSB.length()>0)?"<br><br>":"")
                    .append(I18nUtil.getString("jobconf_beat") + ":")
                    .append("<br>address:").append(address)
                    .append("<br>code:").append(beatResult.getCode())
                    .append("<br>msg:").append(beatResult.getMsg());
 
            // beat success
            if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
 
                beatResult.setMsg(beatResultSB.toString());
                beatResult.setContent(address);
                return beatResult;
            }
        }
        return new ReturnT<String>(ReturnT.FAIL_CODE, beatResultSB.toString());
 
    }
}

这个算法很好理解,就是过滤所有故障的节点,找到一个健康节点运行任务,算法很简单,就是拿到节点的地址集合,然后一个个发心跳,若收到正常的心跳响应,则选择此节点作为执行任务的节点

12.忙碌转移

package com.xxl.job.admin.core.route.strategy;
 
import com.xxl.job.admin.core.scheduler.XxlJobScheduler;
import com.xxl.job.admin.core.route.ExecutorRouter;
import com.xxl.job.admin.core.util.I18nUtil;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.IdleBeatParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
 
import java.util.List;
 
/**
 * Created by xuxueli on 17/3/10.
 * 忙碌转移
 * 原理遍历所有的执行器,对所有执行器发送空闲心跳数据包
 * 收集所有的返回信息,若当前机器繁忙则响应getCode==500 否则空闲则getCode==200
 * 找到空闲的机器则返回该空闲机器的地址
 */
public class ExecutorRouteBusyover extends ExecutorRouter {
 
    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        StringBuffer idleBeatResultSB = new StringBuffer();
        for (String address : addressList) {
            // beat
            ReturnT<String> idleBeatResult = null;
            try {
                ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
                idleBeatResult = executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId()));
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
            }
            idleBeatResultSB.append( (idleBeatResultSB.length()>0)?"<br><br>":"")
                    .append(I18nUtil.getString("jobconf_idleBeat") + ":")
                    .append("<br>address:").append(address)
                    .append("<br>code:").append(idleBeatResult.getCode())
                    .append("<br>msg:").append(idleBeatResult.getMsg());
 
            // beat success
            if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
                idleBeatResult.setMsg(idleBeatResultSB.toString());
                idleBeatResult.setContent(address);
                return idleBeatResult;
            }
        }
 
        return new ReturnT<String>(ReturnT.FAIL_CODE, idleBeatResultSB.toString());
    }
 
}

忙碌转移也很容易理解,就是发送idleBeat(空闲心跳包),检测当前机器是否空闲,怎么判断当前机器是否空闲那,
在这里插入图片描述
就是EmbedServer来处理这个请求,判断当前执行器节点是否执行当前任务或者当前执行器节点的任务队列是否为空,若既不是执行当前任务的节点或者任务队列为空则返回SUCCESS,以下代码就是上述所说。
在这里插入图片描述

直到筛选出一个空闲节点为止,就选择当前空闲节点为下一个需要执行任务的节点

7.分片广播任务

执行器集群部署时,任务路由策略选择”分片广播”情况下,一次任务调度将会广播触发对应集群中所有执行器执行一次任务。

在这里插入图片描述
内部实现方式:如果有n个实例,那用任务项ID%n,由此来选择实例
在这里插入图片描述
需求:让两个节点同时执行10000个任务,每个节点分别执行5000个任务

创建分片执行器:xxl-job-sharding-sample
在这里插入图片描述
创建任务,路由策略指定为分片广播
在这里插入图片描述
分片广播代码

  • 分片参数
    • index:当前分片序号(从0开始),执行器集群列表中当前执行器的序号;
    • total:总分片数,执行器集群的总机器数量;

案例实现
配置执行器的名称
在这里插入图片描述
创建任务

package com.xxl.job.config;

import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

@Component
public class HelloJob1 {

    @XxlJob("shardingJobHandler")
    public void shardingJobHandler() {
        //分片的参数
        //1.当前分片序号(从0开始),执行器集群列表中当前执行器的序号;
        int shardIndex = XxlJobHelper.getShardIndex();
        //2.总分片数,执行器集群的总机器数量;
        int shardTotal = XxlJobHelper.getShardTotal();

        //业务逻辑
        List<Integer> list = getList();
        for (Integer integer : list) {
            if (integer % shardTotal == shardIndex) {
                System.out.println("当前第" + shardIndex + "分片执行了,任务项为:" + integer);
            }
        }
    }

    public List<Integer> getList() {
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            list.add(i);
        }
        return list;
    }
}


在这里插入图片描述
启动多个实例,进行测试
在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/354980.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

中外互免签证协定一览表(普通护照与公务普通护照)

普通护照&#xff1a;由公安部出入境管理机构或者公安部委托的县级以上地方人民政府公安机关出入境管理机构以及中华人民共和国驻外使馆、领馆和外交部委托的其他驻外机构签发&#xff0c;主要颁发给出国定居、探亲、访友、继承财产、留学、就业、旅游等因私事出国的中国公民。…

[REDIS]redis的一些配置文件

修改配置文件 vim /etc/redis/redis.conf目录 protected-mode tcp-backlog timeout tcp-keepalive daemonize pidfile loglevel databases 设置密码 maxclients maxmemory maxmemory-policy maxmemory-samples 默认情况下 bind127.0.0.1 只能接受本机的访问请求。在不写的情况…

算法导论【字符串匹配】—朴素算法、Rabin-Karp、有限自动机、KMP

算法导论【字符串匹配】—朴素算法、Rabin Karp、有限自动机、KMP朴素字符串匹配算法Rabin-Karp算法有限自动机KMP算法朴素字符串匹配算法 预处理时间&#xff1a;0匹配时间&#xff1a;O((n-m1)m) Rabin-Karp算法 预处理时间&#xff1a;Θ(m)&#xff0c;需要预先算出匹…

Lua脚本执行redis指令报错【java.lang.IllegalStateException】

Lua脚本执行redis指令报错【java.lang.IllegalStateException】 问题出现背景 今天在学习redis时&#xff0c;为了让redis的多条指令&#xff08;取锁、比锁、释放锁&#xff09;保障原子性&#xff0c;我通过使用一个lua脚本统一去执行redis的的多条指令。在执行lua脚本时报错…

Python3 File(文件) 方法讲解

open() 方法 Python open() 方法用于打开1个文件&#xff0c;并返回文件对象。 在对文件进行处理过程都需要使用到这个函数&#xff0c;如果这个文件无法被打开&#xff0c;会抛出 OSError。 注意&#xff1a;使用 open() 方法一定要保证关闭文件对象&#xff0c;即调用 clo…

vrrp+mstp+osfp经典部署案例

LSW1和LSW2和LSW3和LSW4上面启用vrrpmstp组网&#xff1a; vlan 10 全走LSW1出再走AR2到外网&#xff0c;vlan 20 全走LSW2出再走AR3到外网 配置注意&#xff1a;mstp实例的根桥在哪&#xff0c;vrrp的主设备就是谁 ar2和ar3上开nat ar2和ar3可以考虑换成两台防火墙来做&…

Java基础 -- List集合

Java基础 -- List集合1. Introduction1.1 好处1.2 常用泛型2. 交集&#xff0c;差集等2.1 自身的方法2.2 1.8jdk stream 新特性2.3 Apache的CollectionUtils工具类&#xff08;推荐&#xff09;3. 限定泛型范围4. Awakening1. Introduction 1.1 好处 代码复用&#xff0c;多种…

分布式任务处理:XXL-JOB分布式任务调度框架

文章目录1.业务场景与任务调度2.任务调度的基本实现2.1 多线程方式实现2.2 Timer方式实现2.3 ScheduledExecutor方式实现2.4 第三方Quartz方式实现3.分布式任务调度4.XXL-JOB介绍5.搭建XXL-JOB —— 调度中心5.1 下载与查看XXL-JOB5.2 创建数据库表5.3 修改默认的配置信息5.4 启…

斐波那契数列(递归+迭代)

目录什么是斐波那契数列递归写法使用递归写法的缺点迭代写法(效率高)什么是斐波那契数列 斐波那契数列&#xff08;Fibonacci sequence&#xff09;&#xff0c;又称黄金分割数列&#xff0c;因数学家莱昂纳多斐波那契&#xff08;Leonardo Fibonacci&#xff09;以兔子繁殖为例…

Redis:数据结构

简单动态字符串SDS Redis没有直接使用C语言传统的字符串表示(以空字符结尾的字符数组&#xff0c;以下简称C字符串)&#xff0c;而是自己构 建了一种名为简单动态字符串(simple dynamic string, SDS)的抽象类型&#xff0c;并将SDS用作Redis的默认字符 串表示。 SDS 的实现…

aws lambda rust的sdk和自定义运行时

rust的aws sdk 参考资料 https://docs.aws.amazon.com/sdk-for-rust/latest/dg/getting-started.htmlhttps://awslabs.github.io/aws-sdk-rust/https://github.com/awslabs/aws-sdk-rusthttps://github.com/awsdocs/aws-doc-sdk-examples/tree/main/rust_dev_preview rus sd…

XlsReadWriteII EXCEL Cell Font 单元字体设置

XlsReadWriteII EXCEL Cell Font 单元字体设置 通过XLSReadWriteII5在写EXCEL时&#xff0c;由于XLSReadWriteII5中使用的是个性化的TFont&#xff0c;因而通过参数带入TFont&#xff0c;不能完整地将TFont带入Cell的Font&#xff0c;问题解决如下&#xff1a; 一、问题说明 1、…

Python学习-----模块2.0(常用模块之时间模块-->time)

目录 前言&#xff1a; time简介 导入模块 1.时间戳 2.时间元组 &#xff08;1&#xff09;把时间戳转换为元组形式 &#xff08;2&#xff09;元组转换为时间戳输出 &#xff08;3&#xff09;把元组转换为格式化时间 &#xff08;4&#xff09;把时间戳转换为格式化时间…

【深度学习】模型评估

上一章——多分类问题和多标签分类问题 文章目录算法诊断模型评估交叉验证测试算法诊断 如果你为问题拟合了一个假设函数&#xff0c;我们应当如何判断假设函数是否适当拟合了&#xff1f;我们可以通过观察代价函数的图像&#xff0c;当代价函数达到最低点的时候&#xff0c;此…

手机/移动端的UI框架-Vant和NutUI

下面推荐2款手机/移动端的UI框架。 其实还有很多的框架&#xff0c;各个大厂都有UI框架。目前&#xff0c;找来找去&#xff0c;只有腾讯的移动端是setup语法写的TDesign&#xff0c;其他大厂&#xff0c;虽然都是VUE3写的&#xff0c;但是都还未改成setup的语法&#xff0c;而…

张晨光-JAVA零基础保姆式JDBC技术教程

JDBC文档 JDBC概述 JDBC概述 Java DataBase Connectivity Java 数据库连接技术 JDBC的作用 通过Java语言操作数据库&#xff0c;操作表中的数据 SUN公司为**了简化、**统一对数据库的操作&#xff0c;定义了一套Java操作数据库的规范&#xff0c;称之为JDBC JDBC的本质 是官方…

【JavaSE专栏10】Java的顺序结构、选择结构和循环结构

作者主页&#xff1a;Designer 小郑 作者简介&#xff1a;Java全栈软件工程师一枚&#xff0c;来自浙江宁波&#xff0c;负责开发管理公司OA项目&#xff0c;专注软件前后端开发&#xff08;Vue、SpringBoot和微信小程序&#xff09;、系统定制、远程技术指导。CSDN学院、蓝桥云…

金融错配程度/信贷错配程度/资本错配程度/资本资源错配程度(1998-2021年)

数据来源&#xff1a;根据沪深A股上市公司数据进行测算 时间跨度&#xff1a;1998-2021年 区域范围&#xff1a;沪深A股上市公司 指标说明&#xff1a; 参考邵挺(2010)、周煜皓和张胜勇(2014)的研究&#xff0c;运用金融错配负担水平来衡量信贷错配&#xff08;Fd&#xff…

Redis-哨兵模式以及集群

在开始这部分内容之前需要先说一下复制功能&#xff0c;因为这是Redis实现主从数据同步的实现方式。复制功能如果存在两台服务器的话&#xff0c;我们可以使用redis的复制功能&#xff0c;让一台服务器去同步另一台服务器的数据。现在我启动了两台redis服务器&#xff0c;一个端…

【大数据】HADOOP-Yarn集群界面UI指标项详解(建议收藏哦)

目录首页&#xff08;Cluster&#xff09;节点信息Scheduler Metrics&#xff1a;集群调度信息节点信息详解&#xff08;Nodes&#xff09;应用列表信息&#xff08;applications&#xff09;队列详情页&#xff08;Scheduler&#xff09;指标详细说明&#xff08;非常重要&…