分布式定时任务系列10:XXL-job源码分析之路由策略

news2025/1/18 11:50:16

 传送门

分布式定时任务系列1:XXL-job安装

分布式定时任务系列2:XXL-job使用

分布式定时任务系列3:任务执行引擎设计

分布式定时任务系列4:任务执行引擎设计续

分布式定时任务系列5:XXL-job中blockingQueue的应用

分布式定时任务系列6:XXL-job触发日志过大引发的CPU告警

分布式定时任务系列7:XXL-job源码分析之任务触发

分布式定时任务系列8:XXL-job源码分析之远程调用

 分布式定时任务系列9:XXL-job路由策略

 Java并发编程实战1:java中的阻塞队列

不忘初心

好几个月前就打算分析一下XXL-job路由策略的源码,所以有了XXL-job路由策略。不过当时偷懒,只从官网上把介绍贴出来了:

路由策略:当执行器集群部署时,提供丰富的路由策略,包括;

  1. FIRST(第一个):固定选择第一个机器;
  2. LAST(最后一个):固定选择最后一个机器;
  3. ROUND(轮询):;
  4. RANDOM(随机):随机选择在线的机器;
  5. CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
  6. LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
  7. LEAST_RECENTLY_USED(最近最久未使用):最久未使用的机器优先被选举;
  8. FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
  9. BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
  10. SHARDING_BROADCAST(分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务;

再谈前置条件

一般提到路由,可能更多的是理解为对请求做转发时的路由匹配:比如nginx的Location路由,或者SpringCloud组件的Gateway网关上请求Predicate的URL路由匹配。

不过这里说的路由,其实指的在集群条件下对执行器进行的路由选择,是一种负载均衡策略。所以这里假设了一个场景就是,在分布式环境下,有多个执行器组成的集群。这里回顾一下xxl-rpc部署示意图

  •  这里指的路由策略是执行器集群部署
  • 关于调度器集群部署不在此范围暂不讨论,
  • 后面会单开一节具体讨论如何集群部署,达到高性能、高可用目的!

路由策略

这里继续引用XXL-job源码分析之任务触发里面关于代码执行的流程

可以看到路由策略的执行代码类路径在:com.xxl.job.admin.core.trigger.XxlJobTrigger ,方法路径在:com.xxl.job.admin.core.trigger.XxlJobTrigger#processTrigger:

executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());

策略定义

XXL-job定义了策略枚举:

public enum ExecutorRouteStrategyEnum {

    /** FIRST(第一个):固定选择第一个机器; */
    FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
    /** (最后一个):固定选择最后一个机器; */
    LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
    /** (轮询):; */
    ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
    /** (随机):随机选择在线的机器; */
    RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
    /** (一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。 */
    CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
    /** (最不经常使用):使用频率最低的机器优先被选举; */
    LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
    /** (最近最久未使用):最久未使用的机器优先被选举; */
    LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
    /** (故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度; */
    FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
    /** (忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度; */
    BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
    /** (分片广播):广播触发对应集群中所有机器执行一次任务,同时系统自动传递分片参数;可根据分片参数开发分片任务; */
    SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);

    ExecutorRouteStrategyEnum(String title, ExecutorRouter router) {
        this.title = title;
        this.router = router;
    }

    private String title;
    private ExecutorRouter router;

    public String getTitle() {
        return title;
    }
    public ExecutorRouter getRouter() {
        return router;
    }

    public static ExecutorRouteStrategyEnum match(String name, ExecutorRouteStrategyEnum defaultItem){
        if (name != null) {
            for (ExecutorRouteStrategyEnum item: ExecutorRouteStrategyEnum.values()) {
                if (item.name().equals(name)) {
                    return item;
                }
            }
        }
        return defaultItem;
    }

}

其中枚举里面有一个属性router,真正的路由策略实现都在这个接口:com.xxl.job.admin.core.route.ExecutorRouter。

路由接口

看一看路由接口定义代码:

public abstract class ExecutorRouter {
    protected static Logger logger = LoggerFactory.getLogger(ExecutorRouter.class);

    /**
     * route address
     *
     * @param addressList
     * @return  ReturnT.content=address
     */
    public abstract ReturnT<String> route(TriggerParam triggerParam, List<String> addressList);

}

而上面的各种策略都实现了这个接口:

这种是典型的策略模式应用,这里也可以看出好的代码通过设计模式可以很方便的做到扩展! 

路由策略详解

对于这些路由策略实现,从简单到复杂一个个的来解析。

FIRST(第一个)

此策略的定义是:固定选择第一个机器!意思就是不论执行器有多少个,始终选择执行器列表的第一个进行任务执行。

这个策略的实现也相当简单:

public class ExecutorRouteFirst extends ExecutorRouter {

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList){
        return new ReturnT<String>(addressList.get(0));
    }

}

这个代码里面就是固定从addressList里面get第一个执行器

LAST(最后一个)

此策略的定义是:固定选择最后一个机器!意思就是不论执行器有多少个,始终选择执行器列表的最后一个进行任务执行。

这个策略的实现也相当简单:

public class ExecutorRouteLast extends ExecutorRouter {

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        return new ReturnT<String>(addressList.get(addressList.size()-1));
    }

}

 这个代码里面就是固定从addressList里面get最后一个个执行器

ROUND(轮询)

此策略的定义是:意思就是不论执行器有多少个,从执行器列表逐个选择进行任务执行。

这个策略的实现如下:

public class ExecutorRouteRound extends ExecutorRouter {

    private static ConcurrentMap<Integer, AtomicInteger> routeCountEachJob = new ConcurrentHashMap<>();
    private static long CACHE_VALID_TIME = 0;

    private static int count(int jobId) {
        // cache clear
        if (System.currentTimeMillis() > CACHE_VALID_TIME) {
            routeCountEachJob.clear();
            CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
        }

        AtomicInteger count = routeCountEachJob.get(jobId);
        if (count == null || count.get() > 1000000) {
            // 初始化时主动Random一次,缓解首次压力
            count = new AtomicInteger(new Random().nextInt(100));
        } else {
            // count++
            count.addAndGet(1);
        }
        routeCountEachJob.put(jobId, count);
        return count.get();
    }

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = addressList.get(count(triggerParam.getJobId())%addressList.size());
        return new ReturnT<String>(address);
    }

}

这个代码稍微复杂一点,为了实现轮询的效果在内存中声明了一个Map来来进行计数,其中的key为任务jobId,value为每个任务的调用次数:

  • 声明一个Map类型字段routeCountEachJob来进行每个任务的调用次数计数,其中为ConcurrentMap、AtomicInteger类型的原因是防止并发
  • count(int jobId)方法的作用对当前触发的任务进行计数,这里AtomicInteger原子类对每次执行后就+1
  • 每24小时后重新开始计数
  • 然后根据当前任务(jobId)的调用次数从addressList里面选择执行器:即count % 执行器个数

这样文字可能理解起来还是不太直接,其实就是类似如下的示例:

RANDOM(随机)

此策略的定义是:意思就是不论执行器有多少个,从执行器列表随机选择在线的机器。

这个策略的实现也比较直观:

public class ExecutorRouteRandom extends ExecutorRouter {

    private static Random localRandom = new Random();

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = addressList.get(localRandom.nextInt(addressList.size()));
        return new ReturnT<String>(address);
    }

}

这个代码里面就是随机从addressList里面get一个执行器:

localRandom.nextInt(addressList.size())

LEAST_FREQUENTLY_USED(最不经常使用) 

此策略的定义是:意思就是不论执行器有多少个,使用频率最低的机器优先被选择出来进行任务执行。

这个策略的实现如下:

public class ExecutorRouteLFU extends ExecutorRouter {

    // 任务调用计算器,其中key为jobId-任务ID,value为HashMap:记录每个实例的调用次数
    private static ConcurrentMap<Integer, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<Integer, HashMap<String, Integer>>();
    private static long CACHE_VALID_TIME = 0;

    public String route(int jobId, List<String> addressList) {

        // 缓存1天(24小时),然后重新计数
        // cache clear
        if (System.currentTimeMillis() > CACHE_VALID_TIME) {
            jobLfuMap.clear();
            CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
        }

        // 初始化
        // lfu item init
        HashMap<String, Integer> lfuItemMap = jobLfuMap.get(jobId);     // Key排序可以用TreeMap+构造入参Compare;Value排序暂时只能通过ArrayList;
        if (lfuItemMap == null) {
            lfuItemMap = new HashMap<String, Integer>();
            jobLfuMap.putIfAbsent(jobId, lfuItemMap);   // 避免重复覆盖
        }

        // put new
        for (String address: addressList) {
            if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) >1000000 ) {
                lfuItemMap.put(address, new Random().nextInt(addressList.size()));  // 初始化时主动Random一次,缓解首次压力
            }
        }
        // 这里有一个删除动作,其实是因为实例可能动态上下线,对于下线的节点需要排除
        // remove old
        List<String> delKeys = new ArrayList<>();
        for (String existKey: lfuItemMap.keySet()) {
            if (!addressList.contains(existKey)) {
                delKeys.add(existKey);
            }
        }
        // 移除下线节点,尽量防止调度到下线的节点上导致失败
        if (delKeys.size() > 0) {
            for (String delKey: delKeys) {
                lfuItemMap.remove(delKey);
            }
        }
       
        // 进行调用次数排序
        // load least userd count address
        List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<Map.Entry<String, Integer>>(lfuItemMap.entrySet());
        Collections.sort(lfuItemList, new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
                return o1.getValue().compareTo(o2.getValue());
            }
        });

        // 调用次数+1
        Map.Entry<String, Integer> addressItem = lfuItemList.get(0);
        String minAddress = addressItem.getKey();
        addressItem.setValue(addressItem.getValue() + 1);

        return addressItem.getKey();
    }

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = route(triggerParam.getJobId(), addressList);
        return new ReturnT<String>(address);
    }

}

这个代码比较复杂一点, 不过如果对比缓存的淘汰策略的话,这个其实就是所谓的"LFU":

LFU(The Least Frequently Used)最近不多使用算法,与LRU的区别在于LRU是以时间衡量,LFU是以时间段内的次数

  • 算法:若是一个数据在必定时间内被访问的次数很低,那么被认为在将来被访问的几率也是最低的,当规定空间用尽且需要放入新数据的时候,会优先淘汰时间段内访问次数最低的数据。

  • 优势:LFU也能够有效的保护缓存,相对场景来说,比LRU有更好的缓存命中率。由于是以次数为基准,因此更加准确,天然能有效的保证和提升命中率。

  • 缺点:由于LFU须要记录数据的访问频率,所以需要额外的空间;当访问模式改变的时候,算法命中率会急剧降低,这也是他最大弊端

所以这个策略里面为了实现LFU的效果在内存中声明了一个Map来来进行计数,其中的key为任务jobId,value为每个任务的调用次数:

  • 声明一个Map类型字段jobLfuMap来进行每个任务的调用次数计数,其中为ConcurrentMap原因是防止并发
  • jobLfuMap的value是一个HashMap<String, String>:其中key为任务的实例地址address,value为调用次数。这样设计的目的是为了通过这样的数据结构来达到,记录每一个任务jobId在每一个实例上的调用次数!与ROUND的区别是:ROUND记录的每个任务jobId的所有调用数次,LFU多了一个维度
  • 通过上面的这种数据结构,最终可以对每个实例的调用次数进行排序:所以要依赖一个ArrayList来排序,最终选取调用次数最少的实例来作为任务执行目标机器!

LEAST_RECENTLY_USED(最近最久未使用)

此策略的定义是:意思就是不论执行器有多少个,最久未使用的机器优先被选举。

这个策略的实现如下:


public class ExecutorRouteLRU extends ExecutorRouter {

    private static ConcurrentMap<Integer, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<Integer, LinkedHashMap<String, String>>();
    private static long CACHE_VALID_TIME = 0;

    public String route(int jobId, List<String> addressList) {

        // 缓存1天(24小时),然后重新计数
        // cache clear
        if (System.currentTimeMillis() > CACHE_VALID_TIME) {
            jobLRUMap.clear();
            CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
        }

        // init lru
        LinkedHashMap<String, String> lruItem = jobLRUMap.get(jobId);
        if (lruItem == null) {
            /**
             * LinkedHashMap
             *      a、accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期;
             *      b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法;
             */
            lruItem = new LinkedHashMap<String, String>(16, 0.75f, true);
            jobLRUMap.putIfAbsent(jobId, lruItem);
        }
        // 新加入的节点处理:添加到lru中进行统计
        // put new
        for (String address: addressList) {
            if (!lruItem.containsKey(address)) {
                lruItem.put(address, address);
            }
        }
         // 这里有一个删除动作,其实是因为实例可能动态上下线,对于下线的节点需要排除
        // remove old
        List<String> delKeys = new ArrayList<>();
        for (String existKey: lruItem.keySet()) {
            if (!addressList.contains(existKey)) {
                delKeys.add(existKey);
            }
        }
        if (delKeys.size() > 0) {
            for (String delKey: delKeys) {
                lruItem.remove(delKey);
            }
        }
        // 排序最后一个节点
        // load
        String eldestKey = lruItem.entrySet().iterator().next().getKey();
        String eldestValue = lruItem.get(eldestKey);
        return eldestValue;
    }

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = route(triggerParam.getJobId(), addressList);
        return new ReturnT<String>(address);
    }

}

 这个代码比较复杂一点, 不过如果对比缓存的淘汰策略的话,这个其实就是所谓的"LRU":

LRU(The Least Recently Used)最近最久未使用算法。相比于FIFO算法智能些。

  • 算法:若是一个数据最近不多被访问到,那么被认为在将来被访问的几率也是最低的,当规定空间用尽且需要放入新数据的时候,会优先淘汰最久未被访问的数据。

  • 优势:LRU能够有效的对访问比较频繁的数据进行保护,也就是针对热点数据的命中率提升有明显的效果。

缺点:对于周期性、偶发性的访问数据,有大几率可能形成缓存污染,也就是置换出去了热点数据,把这些偶发性数据留下了,从而致使LRU的数据命中率急剧降低。

所以这个策略里面为了实现LFU的效果在内存中声明了一个Map来来进行计数,其中的key为任务jobId,value为每个任务的调用次数: 

  • 声明一个Map类型字段jobLRUMap来进行每个任务的调用次数计数,其中为ConcurrentMap原因是防止并发
  • jobLRUMap的value是一个LinkedHashMap<String, String>:其中key为任务的实例地址address,value为address。这里比较巧妙的是直接利用了LinkedHashMap的排序能力
   /**
     * Constructs an empty <tt>LinkedHashMap</tt> instance with the
     * specified initial capacity, load factor and ordering mode.
     *
     * @param  initialCapacity the initial capacity
     * @param  loadFactor      the load factor
     * @param  accessOrder     the ordering mode - <tt>true</tt> for
     *         access-order, <tt>false</tt> for insertion-order
     * @throws IllegalArgumentException if the initial capacity is negative
     *         or the load factor is nonpositive
     */
    public LinkedHashMap(int initialCapacity,
                         float loadFactor,
                         boolean accessOrder) {
        super(initialCapacity, loadFactor);
        this.accessOrder = accessOrder;
    }
LinkedHashMap的排序模式

a、accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排期;

LinkedHashMap中'accessOrder‘字段的用途是什么?

LinkedHashMap是Java中的一个类,它是HashMap的一个子类,具有HashMap的所有特性,并且还保持了插入顺序或访问顺序的特性。

'accessOrder'字段是LinkedHashMap类中的一个布尔类型的属性,用于指定迭代顺序是否基于访问顺序。当accessOrder为true时,表示迭代顺序将基于最近访问顺序,即最近访问的元素将排在迭代顺序的末尾;当accessOrder为false时,表示迭代顺序将基于插入顺序,即元素将按照插入的顺序进行迭代。

使用accessOrder字段可以方便地实现LRU(Least Recently Used,最近最少使用)缓存淘汰算法。通过将accessOrder设置为true,当访问某个元素时,该元素会被移到链表的末尾,这样在需要淘汰元素时,只需要移除链表头部的元素即可。

LinkedHashMap的应用场景包括但不限于:

  1. 缓存系统:通过设置accessOrder为true,可以实现基于访问顺序的缓存淘汰策略。
  2. LRU缓存:通过继承LinkedHashMap并重写removeEldestEntry方法,可以实现固定大小的LRU缓存。
  3. 记录访问顺序:当需要按照访问顺序记录某些数据时,可以使用LinkedHashMap。

FAILOVER(故障转移)

此策略的定义是:按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度。

这个策略的实现如下:

public class ExecutorRouteFailover extends ExecutorRouter {

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {

        StringBuffer beatResultSB = new StringBuffer();
        for (String address : addressList) {
            // beat
            ReturnT<String> beatResult = null;
            try {
                ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
                beatResult = executorBiz.beat();
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
            }
            beatResultSB.append( (beatResultSB.length()>0)?"<br><br>":"")
                    .append(I18nUtil.getString("jobconf_beat") + ":")
                    .append("<br>address:").append(address)
                    .append("<br>code:").append(beatResult.getCode())
                    .append("<br>msg:").append(beatResult.getMsg());

            // beat success
            if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {

                beatResult.setMsg(beatResultSB.toString());
                beatResult.setContent(address);
                return beatResult;
            }
        }
        return new ReturnT<String>(ReturnT.FAIL_CODE, beatResultSB.toString());

    }
}

这个策略的实现也比较直观,根据注册的实例列表依次发起心跳检测,如果成功,则选取为执行节点!代码并不复杂,不过可能对FAILOVER(故障转移)这个术语所震撼,觉得很高大上。

对于FAILOVER这种故障处理策略来说,不同的框架或者场景实现不同,难易程度也不同,而且也不是所有的系统/接口适合故障转移。比如对一个有超时机制的微服务架构来说:

如果链路比较多,一个业务请求需要经过A->B->C3个服务,每个服务有2个节点。假设在A服务调用B的时候,存在网络问题,A的实例A1失败,这里转移到A2成功;A->B的时候,B1失败,B2成功;同理B->C,C1失败,C2成功,虽然最终请求成功,但是整体耗时会增大一倍,早已经进过了网关(整体)响应时长导致timeout了,所以有些专题的FAILOVER并不一定是有益甚至有害的(重试也增加了服务调用次数,服务的压力)。关于这一块会后面单独开一节服务故障模式的讨论

BUSYOVER(忙碌转移)

此策略的定义是:按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度。

这个策略的实现如下:

public class ExecutorRouteBusyover extends ExecutorRouter {

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        StringBuffer idleBeatResultSB = new StringBuffer();
        for (String address : addressList) {
            // beat
            ReturnT<String> idleBeatResult = null;
            try {
                ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
                idleBeatResult = executorBiz.idleBeat(new IdleBeatParam(triggerParam.getJobId()));
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                idleBeatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
            }
            idleBeatResultSB.append( (idleBeatResultSB.length()>0)?"<br><br>":"")
                    .append(I18nUtil.getString("jobconf_idleBeat") + ":")
                    .append("<br>address:").append(address)
                    .append("<br>code:").append(idleBeatResult.getCode())
                    .append("<br>msg:").append(idleBeatResult.getMsg());

            // beat success
            if (idleBeatResult.getCode() == ReturnT.SUCCESS_CODE) {
                idleBeatResult.setMsg(idleBeatResultSB.toString());
                idleBeatResult.setContent(address);
                return idleBeatResult;
            }
        }

        return new ReturnT<String>(ReturnT.FAIL_CODE, idleBeatResultSB.toString());
    }

}

 这个策略的实现也比较直观,根据注册的实例列表依次发起空闲检测,如果成功,则选取为执行节点!代码并不复杂,不过可能需要联合起前面XXL-rpc的章节来配合理解

CONSISTENT_HASH(一致性HASH)

此策略的定义是:每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。

这个策略的实现如下:

/**
 * 分组下机器地址相同,不同JOB均匀散列在不同机器上,保证分组下机器分配JOB平均;且每个JOB固定调度其中一台机器;
 *      a、virtual node:解决不均衡问题
 *      b、hash method replace hashCode:String的hashCode可能重复,需要进一步扩大hashCode的取值范围
 * Created by xuxueli on 17/3/10.
 */
public class ExecutorRouteConsistentHash extends ExecutorRouter {

    private static int VIRTUAL_NODE_NUM = 100;

    /**
     * get hash code on 2^32 ring (md5散列的方式计算hash值)
     * @param key
     * @return
     */
    private static long hash(String key) {

        // md5 byte
        MessageDigest md5;
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("MD5 not supported", e);
        }
        md5.reset();
        byte[] keyBytes = null;
        try {
            keyBytes = key.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException("Unknown string :" + key, e);
        }

        md5.update(keyBytes);
        byte[] digest = md5.digest();

        // hash code, Truncate to 32-bits
        long hashCode = ((long) (digest[3] & 0xFF) << 24)
                | ((long) (digest[2] & 0xFF) << 16)
                | ((long) (digest[1] & 0xFF) << 8)
                | (digest[0] & 0xFF);

        long truncateHashCode = hashCode & 0xffffffffL;
        return truncateHashCode;
    }

    public String hashJob(int jobId, List<String> addressList) {

        // ------A1------A2-------A3------
        // -----------J1------------------
        TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
        for (String address: addressList) {
            for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
                long addressHash = hash("SHARD-" + address + "-NODE-" + i);
                addressRing.put(addressHash, address);
            }
        }

        long jobHash = hash(String.valueOf(jobId));
        SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
        if (!lastRing.isEmpty()) {
            return lastRing.get(lastRing.firstKey());
        }
        return addressRing.firstEntry().getValue();
    }

    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = hashJob(triggerParam.getJobId(), addressList);
        return new ReturnT<String>(address);
    }

}

 这个策略的实现也比较直观,这里就不展开讨论了,有兴趣的可以在评论区晒出理解分享给大家!

路由策略的选择

上面介绍了各种路由策略的实现,关于这里面的路由策略的选择就不过多讨论了,建议默认情况采用下轮询!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1853234.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SLAM ORB-SLAM2(27)词袋模型

SLAM ORB-SLAM2(27)词袋模型 1. 词袋模型1.1. 词汇树1.2. 逆向索引表1.3. 逆向索引表2. 词袋向量3. 匹配候选帧3.1. 找出和当前帧具有公共单词的所有关键帧3.2. 找出和当前帧最多公共单词的关键帧3.3. 剔除共享单词数较少的关键帧3.4. 计算关键帧的共视关键帧组的总得分3.5. …

CentOS系统查看版本的各个命令

cat /etc/centos-release 查看CentOS版本 uname -a 命令的结果分别代表&#xff1a;当前系统的内核名称、主机名、内核发型版本、节点名、系统时间、硬件名称、硬件平台、处理器类型以及操作系统名称 cat /proc/version 命令用于查看Linux内核的版本信息。执行该命令后&#xf…

Ubuntu/Linux系统安装JDK1.8(带jdk1.8资源和操作教程)

文章目录 前言一、JDK1.8下载二、上传三、安装四、配置环境变量五、查看总结 前言 &#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;Ubuntu/Linux jdk1.8安装包&#xff…

JetBrains PyCharm 2024 mac/win版编程艺术,智慧新篇

JetBrains PyCharm 2024是一款功能强大的Python集成开发环境(IDE)&#xff0c;专为提升开发者的编程效率和体验而设计。这款IDE不仅继承了前代版本的优秀特性&#xff0c;还在多个方面进行了创新和改进&#xff0c;为Python开发者带来了全新的工作体验。 JetBrains PyCharm 20…

初阶《操作符详解》 3. 移位操作符

3. 移位操作符 <<  左移操作符 >>  右移操作符 注&#xff1a;移动的是二进制位&#xff0c;移位操作符的操作数只能是整数。 一个存储的二进制码分原码、反码、补码 1.十进制数据的二进制表现形式就是原码&#xff0c;原码最左边的一个数字就是符号位&#xff…

leetcode 二分查找·系统掌握

题目&#xff1a; 题解&#xff1a; 在阶梯数达到某一值后已有的硬币数量就小于了阶梯可以装的硬币数量&#xff0c;根据题意可以使用~10~泛型查找出最后一个可以被填满的阶梯。对于这类型可以二分答案的题目关键在于二分答案的上下界&#xff0c;本题的下界就是1上界就是硬币…

Redis-数据类型-Set(不允许重复)

文章目录 1、查看redis是否启动2、通过客户端连接redis3、切换到2数据库4、给key指定的set集合中存入数据&#xff0c;set会自动去重5、返回可以指定的set集合中所有的元素6、返回集合中元素的数量(set cardinality)7、检查当前指定member是否是集合中的元素8、从集合中删除元素…

[JS]数据类型

介绍 在计算中一切事物都是数据, 为了提高数据的存储和使用效率, 要对数据进行类型的分类 栈(操作系统): 由操作系统自动分配释放函数的参数值, 局部变量的值等, 其操作方式类似于数据结构中的栈; 基本数据类型存放在栈里面string, number, boolean, undefined, null 堆(操作…

C++STL 初阶(5)vector的简易实现(上)

不同于string只实现一个最简单的版本&#xff0c;vector在此处我们要实现的是模版类&#xff0c;类模版的声明和定义分离非常不方便&#xff08;会在链接时报错&#xff09;&#xff0c;所以我们都只在一个vector.h下去实现声明和定义。后续我们提及到的库里面实现的vector也是…

UEC++ 虚幻5第三人称射击游戏(一)

UEC 虚幻5第三人称射击游戏&#xff08;一&#xff09; 创建一个空白的C工程 人物角色基本移动 创建一个Character类添加一些虚幻商城中的基础动画 给角色类添加Camera与SPringArm组件 UPROPERTY(VisibleAnywhere, BlueprintReadOnly, Category "SpringArm")clas…

正点原子rk3588烧录linux和安卓镜像

1、烧录 Linux buildroot 系统镜像 1.1 进入 Loader 模式&#xff1a; 按住开发板上的 V&#xff08;音量&#xff09;按键不松&#xff0c;给开发板 上电或复位&#xff0c;此时烧录工具会提示&#xff1a;发现一个 LOADER 设备&#xff0c;表示开发板此时已经处于 Loader 模…

什么是深度神经网络?与深度学习、机器学习、人工智能的关系是什么?

什么是深度神经网络&#xff1f;与深度学习、机器学习、人工智能的关系是什么&#xff1f; &#x1f916;什么是深度神经网络&#xff1f;与深度学习、机器学习、人工智能的关系是什么&#xff1f;摘要引言正文内容1. 什么是深度神经网络&#xff1f;&#x1f9e0;1.1 深度神经…

IDEA Plugins中搜索不到插件解决办法

IDEA中搜不到插件有三种解决方案&#xff1a; 设置HTTP选项&#xff0c;可以通过File->Settings->Plugins->⚙->HTTP Proxy Settings进行设置 具体可参考这篇博文&#xff1a;IDEA Plugins中搜索不到插件解决办法本地安装&#xff0c;ile->Settings->Plugin…

Linux下Cmake安装或版本更新

下载Cmake源码 https://cmake.org/download/ 找到对应的版本和类型 放进linux环境解压 编译 安装 tar -vxvf cmake-3.13.0.tar.gz cd cmake-3.13.0 ./bootstrap make make install设置环境变量 vi ~/.bashrc在文件尾加入 export PATH/your_path/cmake-3.13.0/bin:$PAT…

App推广告别邀请码,Xinstall助您一键触达海量用户!

在移动互联网高速发展的今天&#xff0c;App的推广与运营已成为每个开发者都必须面对的问题。然而&#xff0c;随着互联网流量的日益分散和用户需求的不断变化&#xff0c;传统的App推广方式已经难以满足现代市场的需求。尤其是在获取用户时&#xff0c;很多开发者还在采用传统…

面向对象修炼手册(二)(消息与继承)(Java宝典)

&#x1f308; 个人主页&#xff1a;十二月的猫-CSDN博客 &#x1f525; 系列专栏&#xff1a; &#x1f3c0;面向对象修炼手册 &#x1f4aa;&#x1f3fb; 十二月的寒冬阻挡不了春天的脚步&#xff0c;十二点的黑夜遮蔽不住黎明的曙光 目录 前言 消息传递 1 基本概念 1.…

关于read,write,open时出现的文本文件和二进制文件读写的问题(怎么写入怎么读)

1、发现问题 使用read读取文本文件&#xff0c;一般采用字符空间作为缓存&#xff0c;最后输出&#xff1b; 使用read读取二进制文件&#xff0c;这里采用整数读取的展示&#xff1a; 首先创建文本文件&#xff0c;用write写入i的值到文件中&#xff1b; 再通过lseek改变读写一…

1.Triangle

一、你好&#xff0c;三角形 在OpenGL中&#xff0c;任何事物都在3D空间中&#xff0c;而屏幕和窗口却是2D像素数组&#xff0c;这导致OpenGL的大部分工作都是关于把3D坐标转变为适应你屏幕的2D像素。 3D坐标转为2D坐标的处理过程是由OpenGL的图形渲染管线&#xff08;Graphi…

松下课堂 | 什么是EPS?通过马达来辅助转向操作的系统

EPS , 松下 EPS是一种通过马达来减轻和辅助驾驶员在转向操作时所需力量的设备。此外&#xff0c;通过采用EPS&#xff0c;可望提高燃效&#xff0c;降低车辆重量。。。 背景 EPS是一种通过马达来减轻和辅助驾驶员在转向操作时所需力量的设备。此外&#xff0c;通过采用EPS&…

深度学习笔记: 最详尽解释欠拟合(高偏差)和过拟合(高方差)

欢迎收藏Star我的Machine Learning Blog:https://github.com/purepisces/Wenqing-Machine_Learning_Blog。如果收藏star, 有问题可以随时与我交流, 谢谢大家&#xff01; 欠拟合&#xff08;高偏差&#xff09;和过拟合&#xff08;高方差&#xff09; 在机器学习和统计建模中…