Dubbo 源码解读:负载均衡策略

news2024/11/18 23:45:52

概览

org.apache.dubbo包下META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.LoadBalance中内部spi实现类有以下几种:

random=org.apache.dubbo.rpc.cluster.loadbalance.RandomLoadBalance
roundrobin=org.apache.dubbo.rpc.cluster.loadbalance.RoundRobinLoadBalance
leastactive=org.apache.dubbo.rpc.cluster.loadbalance.LeastActiveLoadBalance
consistenthash=org.apache.dubbo.rpc.cluster.loadbalance.ConsistentHashLoadBalance
shortestresponse=org.apache.dubbo.rpc.cluster.loadbalance.ShortestResponseLoadBalance

在这里插入图片描述
通过ConsumerConfig可设置负载均衡策略:

		...
        //<editor-fold desc="consumer 配置">
        // 负载均衡策略
        // * random - 随机;
        // * roundrobin - 轮询;
        // * leastactive - 最少活跃调用;
        // * consistenthash - 哈希一致 (2.1.0以上版本);
        // * shortestresponse - 最短响应 (2.7.7以上版本);
        ConsumerConfig consumerConfig = new ConsumerConfig();
        consumerConfig.setLoadbalance("roundrobin");
        //</editor-fold>
        ...

另外,对于加权随机,加权轮询等策略都集成自以上的策略中,consumer会检查注册中心中provider是否提供weight参数自动开启加权负载均衡。除了哈希一致策略,其他的均有加权版本(不提供weight参数即权重一样)。

在provider端,我们可以设置ServiceConfig中设置weight,注意:在RegistryConfig中也可以设置weight,不过这是在多注册中心的环境下对该注册中心负载均衡的权重,不是某个服务调用的负载均衡权重。

		//<editor-fold desc="服务配置">
        ServiceConfig<GreetingsService> hiConfig = new ServiceConfig<>();
        hiConfig.setInterface(GreetingsService.class);
        hiConfig.setRef(new GreetingsServiceImpl());
        // 权重
        hiConfig.setWeight(2);
		//</editor-fold>
		
		//<editor-fold desc="registry配置">
        RegistryConfig registryConfig = new RegistryConfig();
        registryConfig.setAddress("zookeeper://127.0.0.1:2181");
        // 多个registry时,该registry的权重
        registryConfig.setWeight(2);
        //</editor-fold>

此时,zk为例注册的服务如下:
在这里插入图片描述
该节点名称最后会显示weight参数:

dubbo%3A%2F%2F192.168.247.1%3A20880%2Forg.example.protocol.dubbo.GreetingsService%3Fanyhost%3Dtrue%26application%3Dfirst-dubbo-provider%26application.version%3D1.0.0%26background%3Dfalse%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dorg.example.protocol.dubbo.GreetingsService%26methods%3DsayHi%26owner%3Dbty%26pid%3D2716%26release%3D3.1.6%26service-name-mapping%3Dtrue%26side%3Dprovider%26timestamp%3D1677208111268%26weight%3D2

AbstractLoadBalance

该abstract类只有一个作用:获取provider的权重,提升代码复用率。
注:如果provider没有提供weight参数,则默认为100.
其中,randomroundrobin加权时每次都起作用;而leastactiveshortestresponse是在存在符合选取条件的provider有多个时使用加权随机在其中选一个;consistenthash没用到。

RandomLoadBalance


public class RandomLoadBalance extends AbstractLoadBalance {
	
	// invokers代表需要负载均衡的provider列表
	// URL代表该consumer的metadata,和zk中/dubbo/metadata/com.xxx.service/consumer/xxx-consumer节点内容相同
	@Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // Number of invokers
        int length = invokers.size();
        
		// 根据从registry拿到的参数中寻找是否有weight参数或timestamp参数来判断
		// 是否需要加权操作
        if (!needWeightLoadBalance(invokers, invocation)) {
        	// 没有直接random选取
            return invokers.get(ThreadLocalRandom.current().nextInt(length));
        }

        // 标志位,权重是否都一样
        boolean sameWeight = true;
        // weights序列,不过 weights[i]是0到i的provider节点实际weight的和
        // 如:
        // 节点索引i    : 0    1    2
        // 节点i的weight: 2    3    5
        //  weights[i] : 2  2+3=5 2+3+5=10
        // 应该叫weightSum更合适
        // 这是为了后面随机选取节点用,随机数大小落入哪个区间,就选哪个节点
        int[] weights = new int[length];
        // The sum of weights
        int totalWeight = 0;
        for (int i = 0; i < length; i++) {
            int weight = getWeight(invokers.get(i), invocation);
            // Sum
            totalWeight += weight;
            // save for later use
            weights[i] = totalWeight;
            // 如果当前weight不是之前weight的和的索引倍数,则清空same标志
            // 索引:   0    1    2
            //   w:    3    3    2
            // total:  3    6    8
            // w*(i+1) 3    6    6
            //  same?  √    √    ×
            if (sameWeight && totalWeight != weight * (i + 1)) {
                sameWeight = false;
            }
        }
        // 如果权重不等 且 至少有一个invoker的权重大于0
        if (totalWeight > 0 && !sameWeight) {
            // 根据权重和计算随机offset
            int offset = ThreadLocalRandom.current().nextInt(totalWeight);
            // Return a invoker based on the random value.
            for (int i = 0; i < length; i++) {
            	// 选取第一个小于offset的节点i做调用
                if (offset < weights[i]) {
                    return invokers.get(i);
                }
            }
        }
        // 权重相等
        return invokers.get(ThreadLocalRandom.current().nextInt(length));
    }


}


ThreadLocalRandom随机数生成

RandomLoadBalance中随机数生成使用ThreadLocalRandom,该类始于JDK1.7,由Doug Lea操刀编写。

优点:When applicable, use of ThreadLocalRandom rather than shared Random objects in concurrent programs will typically encounter much less overhead and contention.
用法:ThreadLocalRandom.current().nextX(…) (where X is Int, Long, etc).
缺点:Instances of ThreadLocalRandom are not cryptographically secure. Consider instead using java.security.SecureRandom in security-sensitive applications. Additionally, default-constructed instances do not use a cryptographically random seed unless the system property java.util.secureRandomSeed is set to true.

RoundRobinLoadBalance

public class RoundRobinLoadBalance extends AbstractLoadBalance {
	
	 protected static class WeightedRoundRobin {
	 	private int weight;
        private AtomicLong current = new AtomicLong(0);
        private long lastUpdate;
		...
		// 每次加一个权值
		public long increaseCurrent() {
            return current.addAndGet(weight);
        }
	}
	...
	// 接口名称:<providerId:WeightedRoundRobin>
	// 如:
	// org.example.protocol.dubbo.GreetingsService.sayHi : < dubbo://192.168.247.1:20882/org.example.protocol.dubbo.GreetingsService , WeightedRoundRobin >
	 private ConcurrentMap<String, ConcurrentMap<String, WeightedRoundRobin>> methodWeightMap = new ConcurrentHashMap<String, ConcurrentMap<String, WeightedRoundRobin>>();

	
	...
	
	@Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    	// <接口.方法>,如org.example.protocol.dubbo.GreetingsService.sayHi
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        // 初始化
        ConcurrentMap<String, WeightedRoundRobin> map = methodWeightMap.computeIfAbsent(key, k -> new ConcurrentHashMap<>());
        // 权重和
        int totalWeight = 0;
        // 记录当前最大值
        long maxCurrent = Long.MIN_VALUE;
        // 记录更新时间,一个接口的所有provider的时间都一样。
        long now = System.currentTimeMillis();
        // 选中的
        Invoker<T> selectedInvoker = null;
        // 选中的
        WeightedRoundRobin selectedWRR = null;
        // 每次都会循环一遍,不会中途退出
        for (Invoker<T> invoker : invokers) {
        	// dubbo://192.168.247.1:20882/org.example.protocol.dubbo.GreetingsService
            String identifyString = invoker.getUrl().toIdentityString();
            // 获取权重,如果没有权重,则默认权重都一样,均为100
            int weight = getWeight(invoker, invocation);
            // 没有缓存,则为GreetingsService的这个provider设置weight,生成WeightedRoundRobin对象
            // 缓存了直接获取
            WeightedRoundRobin weightedRoundRobin = map.computeIfAbsent(identifyString, k -> {			
                WeightedRoundRobin wrr = new WeightedRoundRobin();
                wrr.setWeight(weight);
                return wrr;
            });
			// 缓存的WeightedRoundRobin可能会过时,这里判定以下,保持最新的weight
            if (weight != weightedRoundRobin.getWeight()) {
                //weight changed
                weightedRoundRobin.setWeight(weight);
            }
            // 更新
            // 每次每个provider都会先增加自己权重的值
            long cur = weightedRoundRobin.increaseCurrent();
            // 标记更新时间
            weightedRoundRobin.setLastUpdate(now);
            // 如果当前值大于最大值,则选取
            if (cur > maxCurrent) {
                maxCurrent = cur;
                selectedInvoker = invoker;
                selectedWRR = weightedRoundRobin;
            }
            // 
            totalWeight += weight;
        }
        if (invokers.size() != map.size()) {
            map.entrySet().removeIf(item -> now - item.getValue().getLastUpdate() > RECYCLE_PERIOD);
        }
        if (selectedInvoker != null) {
        	// 注意,选中的weightedRoundRobin的current会减去totalWeight;
            selectedWRR.sel(totalWeight);
            return selectedInvoker;
        }
        // should not happen here
        return invokers.get(0);
    }
}

轮询算法流程

在这里插入图片描述

轮次结果
0在这里插入图片描述
1在这里插入图片描述
2在这里插入图片描述
3在这里插入图片描述
4在这里插入图片描述
5在这里插入图片描述
6在这里插入图片描述

可以看到调用顺序为:A → \rightarrow B → \rightarrow A → \rightarrow C → \rightarrow B → \rightarrow A。

LeastActiveLoadBalance

按照最小调用次数优先的方式选provider,如果存在多个则加权随机选取。
Active等参数由RpcStatus提供,记录RpcStatus的工作由ActiveLimitFilter完成

public class LeastActiveLoadBalance extends AbstractLoadBalance {

	 @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        // Number of invokers
        int length = invokers.size();
        // The least active value of all invokers
        int leastActive = -1;
        // The number of invokers having the same least active value (leastActive)
        int leastCount = 0;
        // 存储所有具有相同least active value的invoker的index,数组长度==leastCount
        int[] leastIndexes = new int[length];
        // the weight of every invokers
        int[] weights = new int[length];
        // The sum of the warmup weights of all the least active invokers
        int totalWeight = 0;
        // The weight of the first least active invoker
        int firstWeight = 0;
        // Every least active invoker has the same weight value?
        boolean sameWeight = true;


        // 过滤找到调用次数最少的provider,可能存在多个provider
        for (int i = 0; i < length; i++) {
            Invoker<T> invoker = invokers.get(i);
            // Get the active number of the invoker
            int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive();
            // 获取权重参数,provider没有提供则默认100
            int afterWarmup = getWeight(invoker, invocation);
            weights[i] = afterWarmup;
            // 如果是第一次调用,或当前active小于最小的,设置新的leastActive
            if (leastActive == -1 || active < leastActive) {
              ...
            } else 
            // 多个leastActive
            if (active == leastActive) {
              ...
            }
        }
        // 如果符合最少调用次数的provider只有一个,直接返回
        if (leastCount == 1) {
            return invokers.get(leastIndexes[0]);
        }
        // 
        if (!sameWeight && totalWeight > 0) {
            // 权重不等且totalWeight>0则利用权重随机选取
            int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight);
            // Return a invoker based on the random value.
            for (int i = 0; i < leastCount; i++) {
           		// 找到相同权重的provider第一个比随机选取的offsetWeight大的provider
                int leastIndex = leastIndexes[i];
                offsetWeight -= weights[leastIndex];
                if (offsetWeight < 0) {
                    return invokers.get(leastIndex);
                }
            }
        }
        // 如果所有provider的weight相同或totalWeight==0则随机选取.
        return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]);
    }


}

ShortestResponseLoadBalance

同上,利用RpcStatus中获取的参数选取响应最低的provider(滑动时间窗口内),存在多个则加权随机选取。

ConsistentHashLoadBalance

public class ConsistentHashLoadBalance extends AbstractLoadBalance {

	// 对每一个接口方法缓存一个ConsistentHashSelector
	// key为interface.methodName
    private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        String methodName = RpcUtils.getMethodName(invocation);
        String key = invokers.get(0).getUrl().getServiceKey() + "." + methodName;
        // 用来代表List<Invoker<T>>,如果内容变化,则该hashcode也变化,为了保持缓存一致性
        int invokersHashCode = invokers.hashCode();
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        // 缓存的ConsistentHashSelector为空,或已经过期
        if (selector == null || selector.identityHashCode != invokersHashCode) {
            selectors.put(key, new ConsistentHashSelector<T>(invokers, methodName, invokersHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }
        return selector.select(invocation);
    }

	 private static final class ConsistentHashSelector<T> {
		
		// 虚拟节点和Invoker对应关系
        private final TreeMap<Long, Invoker<T>> virtualInvokers;
		
		// 一致性哈希,哈希环虚拟节点个数
        private final int replicaNumber;
		
        private final int identityHashCode;
		// select时候以方法的那个参数为key进行hash映射到hash环,默认第一个参数
        private final int[] argumentIndex;

		ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = identityHashCode;
            URL url = invokers.get(0).getUrl();
            // 如果hash.nodes没有指定,默认每个provider160个节点
            this.replicaNumber = url.getMethodParameter(methodName, HASH_NODES, 160);
            
            String[] index = COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, HASH_ARGUMENTS, "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            // 为每个invoker分配哈希环160个虚拟节点
            for (Invoker<T> invoker : invokers) {
                String address = invoker.getUrl().getAddress();
                for (int i = 0; i < replicaNumber / 4; i++) {
                    byte[] digest = Bytes.getMD5(address + i);
                    for (int h = 0; h < 4; h++) {
                        long m = hash(digest, h);
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }
		 public Invoker<T> select(Invocation invocation) {
            boolean isGeneric = invocation.getMethodName().equals($INVOKE);
            // 获取此次invocation的hash的key
            String key = toKey(invocation.getArguments(),isGeneric); 
            byte[] digest = Bytes.getMD5(key);
            // 映射
            return selectForKey(hash(digest, 0));
        }
	}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/370281.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

这次说说腾讯的一场 35K—55K 的 Android 高工面试

一、面试的由来 事情是这样的&#xff0c;因为跟公司发展一些想法的不同&#xff0c;早在十月份的时候就有了跳槽的想法&#xff0c;但是碍于老大的面子就一直就没有跟人事说出口&#xff0c;打算着等到年后金三银四在试试跳槽。 但是发生一件事终于让我忍不住了&#xff0c;…

阿里限量出产Elasticsearch学习手册,确定不心动?

前言只有光头才能变强。不知道大家的公司用Elasticsearch多不多&#xff0c;反正我公司的是有在用的。平时听同事们聊天肯定避免不了不认识的技术栈&#xff0c;例如说&#xff1a;把数据放在引擎&#xff0c;从引擎取出数据等等。如果对引擎不了解的同学&#xff0c;就压根听不…

一,下载iPerf3最新源代码

本文目录普通下载方式&#xff1a;git下载方式&#xff1a;普通下载方式&#xff1a; 如果你只是要阅读源代码&#xff0c;不涉及到编译安装修改源代码&#xff0c;那么可以简单的通过此方式下载代码。如果你希望编译安装修改源代码&#xff0c;那么建议通过git来进行源代码的…

软件测试之测试模型

软件测试的发展 1960年代是调试时期&#xff08;测试即调试&#xff09; 1960年 - 1978年 论证时期&#xff08;软件测试是验证软件是正确的&#xff09;和 1979年 - 1982年 破坏性测试时期&#xff08;为了发现错误而执行程序的过程&#xff09; 1983年起&#xff0c;软件测…

计算机网络笔记、面试八股(三)—— HTTPS协议

本章目录3. HTTPS协议3.1 HTTPS协议简介3.2 SSL/TLS协议3.2.1 SSL/TLS功能的实现3.3 HTTP和HTTPS的区别3.4 HTTPS协议的优点3.5 HTTPS协议的缺点3.6 HTTPS协议的工作流程3.7 HTTPS是如何解决HTTP的缺点的3.7.1 解决内容可能被窃听的问题——加密3.7.1.1 方法1.对称加密3.7.1.2 …

go cobra初试

cobra开源地址 https://github.com/spf13/cobra cobra是什么 Cobra is a library for creating powerful modern CLI applications. Cobra is used in many Go projects such as Kubernetes, Hugo, and GitHub CLI to name a few. This list contains a more extensive lis…

web中git漏洞的形成的原理及使用

目录 1.Git漏洞的成因 1.不正确的权限设置&#xff1a; 2.代码注入漏洞&#xff1a; 3.未经身份验证的访问&#xff1a; 4.非安全传输&#xff1a; 5.跨站脚本攻击&#xff08;XSS&#xff09;&#xff1a; 2.git泄露环境的搭建 git init&#xff1a; git add&#xff1…

JWT(token) 认证

JSON Web Token&#xff08;缩写 JWT&#xff09;是目前最流行的跨域认证解决方案&#xff0c;本文介绍它的原理和用法。一、跨域认证的问题互联网服务离不开用户认证。一般流程是下面这样。1、用户向服务器发送用户名和密码。2、服务器验证通过后&#xff0c;在当前对话&#…

计算机网络笔记、面试八股(二)——HTTP协议

本章目录2. HTTP协议2.1 HTTP协议简介2.2 HTTP协议的优点2.3 HTTP协议的缺点2.4 HTTP协议属于哪一层2.5 HTTP通信过程2.6 常见请求方法2.7 GET和POST的区别2.8 请求报文与响应报文2.8.1 HTTP请求报文2.8.2 HTTP响应报文2.9 响应状态码2.10 HTTP 1.0和1.1的区别2.10.1 长连接2.1…

Lecture6 逻辑斯蒂回归(Logistic Regression)

目录 1 常用数据集 1.1 MNIST数据集 1.2 CIFAR-10数据集 2 课堂内容 2.1 回归任务和分类任务的区别 2.2 为什么使用逻辑斯蒂回归 2.3 什么是逻辑斯蒂回归 2.4 Sigmoid函数和饱和函数的概念 2.5 逻辑斯蒂回归模型 2.6 逻辑斯蒂回归损失函数 2.6.1 二分类损失函数 2.…

3-1 图文并茂说明raid0,raid1, raid10, raid01, raid5等原理

文章目录简介RAID类型RAID0RAID1RAID5RAID6RAID10RAID01RAID对比图简介 一、RAID 是什么&#xff1f; RAID &#xff08; Redundant Array of Independent Disks &#xff09;即独立磁盘冗余阵列&#xff0c;简称为「磁盘阵列」&#xff0c;其实就是用多个独立的磁盘组成在一起…

Jenkins第一讲

目录 一、Jenkins 1.1 敏捷开发与持续集成 1.1.1 敏捷开发 1.1.2 持续集成 1.2 持续集成工具 1.2.1 jenkins和hudson 1.2.2 技术组合 1.2.3 部署方式对比 1.3 安装Jenkins 1.3.1 下载Jenkins的war包 1.3.2 开启Jenkins 1.4 Jenkins全局安全配置 1.5 使用Jenkins部…

InfluxDB docker安装与界面的使用

influxdb github主页&#xff1a;https://github.com/influxdata/influxdb chronograf github主页&#xff1a;https://github.com/influxdata/chronograf Docker安装InfluxDB docker run -p 8086:8086 --name influxdb-dev influxdb:latest这里博主安装的是2.2.1版本 然后…

Python学习-----排序问题2.0(sort()函数和sorted()函数)

目录 前言&#xff1a; 1.sort() 函数 示例1&#xff1a;阿斯克码比较 示例2&#xff1a;&#xff08;设置reverse&#xff0c;由大到小排序&#xff09; 示例3&#xff1a;基于key排序&#xff08;传入一个参数&#xff09; 示例4&#xff1a;key的其他应用 2.sorted() …

平时技术积累很少,面试时又会问很多这个难题怎么破?别慌,没事看看这份Java面试指南,解决你的小烦恼!

前言技术面试是每个程序员都需要去经历的事情&#xff0c;随着行业的发展&#xff0c;新技术的不断迭代&#xff0c;技术面试的难度也越来越高&#xff0c;但是对于大多数程序员来说&#xff0c;工作的主要内容只是去实现各种业务逻辑&#xff0c;涉及的技术难度并不高&#xf…

Allegro如何画Photoplot_Outline操作指导

Allegro如何画Photoplot_Outline操作指导 在用Allegro进行PCB设计的时候,最后进行光绘输出前,Photoplot_Outline是必备一个图形,所有在Photoplot_Outline中的图形将被输出,Photoplot_Outline以外的图形都将不被输出。 如何绘制Photoplot_Outline,具体操作如下 点击Shape点…

视觉人培训团队把它称之为,工业领域人类最伟大的软件创造,它的名字叫Halcon

目前为止&#xff0c;世界上综合能力强大的机器视觉软件&#xff0c;&#xff0c;它的名字叫Halcon。 视觉人培训团队把它称之为&#xff0c;工业领域人类最伟大的软件创造&#xff0c;它的名字叫Halcon。 持续不断更新最新的图像技术&#xff0c;软件综合能力持续提升。 综…

常量和变量——“Python”

各位CSDN的uu们你们好呀&#xff0c;今天&#xff0c;小雅兰的内容是Python的一些基础语法噢&#xff0c;会讲解一些常量和变量的知识点&#xff0c;那么&#xff0c;现在就让我们进入Python的世界吧 常量和表达式 变量和类型 变量是什么 变量的语法 变量的类型 常量和表达式 …

go面向对象思想封装继承多态

go貌似都没有听说过继承&#xff0c;当然这个继承不像c中通过class类的方式去继承&#xff0c;还是通过struct的方式&#xff0c;所以go严格来说不是面向对象编程的语言&#xff0c;c和java才是&#xff0c;不过还是可以基于自身的一些的特性实现面向对象的功能&#xff0c;面向…

TCP 的演化史-byte stream 和 packet

不想写太多代码&#xff0c;我想直接抄一个 TCP sack 实现&#xff0c;参考了 lwIP TCP&#xff0c;很遗憾&#xff1a;TCP: Implement handling received SACKs 无奈不得不自己实现 sack option 的处理。由于 tso/gso/lro/gro&#xff0c;在软件层面难免遇到下面的情况&#…