源码地址:https://github.com/lhj502819/IRpc/tree/v3
系列文章:
- 注册中心模块实现
- 路由模块实现
- 序列化模块实现
- 过滤器模块实现
为什么需要路由模块?
在当今互联网日益发展的情况下,我们一个服务一般都会部署多个,一方面可以均摊压力,另一方面也可以增加容错性,提高我们系统的稳定性。
但这种情况无疑会提升系统的复杂性,这里我们只讨论在进行RPC远程调用的时候我们需要考虑的事情。如果只有一个服务提供者Provider的情况下,直接根据ip + port请求即可,如果有多个Provider的话,那么就需要一套合适的负载均衡算法去选择一个合适的Provider。
如果没有路由模块的话,我们也可以很简单的实现,比如在上一版本中我们通过jdk自带的Random
函数进行的随机选择。
但这样做有以下几个弊端:
- 假设目标机器的性能不一致,如何对机器进行权重分配?
- 每次都要执行
Random
函数,在高并发情况下对CPU的消耗较高; - 如何基于路由策略做ABTest?
因此我们单独抽象出一个模块来做这些工作,也就是路由模块。
jdk Random随机函数的缺点
通过查看Random
函数的源码我们就能知道,由于Random
函数底层会调用System.nanTome()
,此函数会发起一次系统调用,而系统调用就涉及到CPU的状态切换,对性能的消耗是极大的。因此我们如果需要用到随机算法的话,最好自己实现一套。
路由抽象
public interface IRouter {
/**
* 刷新路由数组
* @param selector
*/
void refreshRouterArr(Selector selector);
/**
* 获取对应provider的连接通道
* @param selector
* @return
*/
ChannelFutureWrapper select(Selector selector);
/**
* 更新权重值
*/
void updateWeight(URL url);
}
负载均衡算法
随机算法
对应源代码中的cn.onenine.irpc.framework.core.router.RandomRouterImpl
实现思想:提前将所有的连接打乱顺序,随机放到数组中,也能达到随机访问的效果,但访问的顺序是不变的。当Client连接完成后,则调用此方法打乱顺序。
public void refreshRouterArr(Selector selector) {
List<ChannelFutureWrapper> channelFutureWrappers = CONNECT_MAP.get(selector.getProviderServiceName());
ChannelFutureWrapper[] arr = new ChannelFutureWrapper[channelFutureWrappers.size()];
//提权生成调用先后顺序的随机数组
int[] result = createRandomIndex(arr.length);
//按照随机数组中的数字顺序,将所有的provider channel放入新的Channel数组中
for (int i = 0; i < result.length; i++) {
arr[i] = channelFutureWrappers.get(result[i]);
}
SERVICE_ROUTER_MAP.put(selector.getProviderServiceName(), arr);
}
/**
* 创建随机乱序数组
*/
public static Integer[] createRandomArr(Integer[] arr) {
int total = arr.length;
Random ra = new Random();
for (int i = 0; i < total; i++) {
int j = ra.nextInt(total);
if (i == j) {
continue;
}
int temp = arr[i];
arr[i] = arr[j];
arr[j] = temp;
}
return arr;
}
权重算法
每个Provider在向注册中心注册的时候,都会设置自身的权重值为100,Client会在与Provider建立连接之后开启一个NodeData Watcher,当监听到Provider节点数据发生变化时,则会发起一个自定义的事件IRpcNodeChangeEvent
,通知我们的路由策略进行权重刷新(updateWeight
)。
如下为核心实现逻辑:
@Override
public void updateWeight(URL url) {
List<ChannelFutureWrapper> channelFutureWrappers = CONNECT_MAP.get(url.getServiceName());
//创建根据权重值创建对应的数组,权重大的其index在数组中占比大
//比如channelFutureWrappers的第3个weight占比为50%,其他的4个总共占比50%
//那么weightArr中则大概长这样:3,3,3,3,0,1,2,4
Integer[] weightArr = createWeightArr(channelFutureWrappers);
Integer[] randomArr = createRandomArr(weightArr);
ChannelFutureWrapper[] finalChannelFutureWrappers = new ChannelFutureWrapper[randomArr.length];
for (int i = 0; i < randomArr.length; i++) {
finalChannelFutureWrappers[i] = channelFutureWrappers.get(randomArr[i]);
}
SERVICE_ROUTER_MAP.put(url.getServiceName(),finalChannelFutureWrappers);
}
public static Integer[] createWeightArr(List<ChannelFutureWrapper> channelFutureWrappers) {
List<Integer> weightArr = new ArrayList<>();
for (int k = 0; k < channelFutureWrappers.size(); k++) {
Integer weight = channelFutureWrappers.get(k).getWeight();
int c = weight / 100;
for (int i = 0; i < c; i++) {
weightArr.add(k);
}
}
Integer[] arr = new Integer[weightArr.size()];
return weightArr.toArray(arr);
}
/**
* 创建随机乱序数组
*/
public static Integer[] createRandomArr(Integer[] arr) {
int total = arr.length;
Random ra = new Random();
for (int i = 0; i < total; i++) {
int j = ra.nextInt(total);
if (i == j) {
continue;
}
int temp = arr[i];
arr[i] = arr[j];
arr[j] = temp;
}
return arr;
}
轮询算法
通过自增计数,对数组长度取余的方式进行轮询访问。
public class ChannelFuturePollingRef {
private AtomicLong referenceTimes = new AtomicLong(0);
/**
* 对Providers实现轮询访问
*/
public ChannelFutureWrapper getChannelFutureWrapper(String serviceName) {
ChannelFutureWrapper[] wrappers = SERVICE_ROUTER_MAP.get(serviceName);
//自增取余,顺序访问
//0 % 10 = 0; 1 % 10 = 1; 2 % 10 = 2 ;....;11 % 10 = 1
long i = referenceTimes.getAndIncrement();
int index = (int) (i % wrappers.length);
return wrappers[index];
}
}
其他路由算法
- 最小连接数
需要记录每个应用服务器正在处理的连接数,然后将新来的请求转发到最少的那台上。
- 分布式哈希一致性算法
分布式哈希一致性算法在实际使用时可能会出现“哈希倾斜”问题,为了解决这类问题,通常在算法的内部会设计一些虚拟节点,从而平衡请求的均匀性。
- ip的hash算法
通过将源地址通过hash计算,定位到具体的一台机器上,但是如果一旦某台机器崩溃的话,该IP的请求就会直接失败,容错性不强。
路由策略配置化
将具体的路由策略通过配置的方式,使用起来更加灵活。在Client初始化的时候,会根据不同的配置选择对应的路由策略实现。
private void initConfig() {
//初始化路由策略
String routeStrategy = clientConfig.getRouteStrategy();
if (RANDOM_ROUTER_TYPE.equals(routeStrategy)) {
IROUTER = new RandomRouterImpl();
} else if (ROTATE_ROUTER_TYPE.equals(routeStrategy)) {
IROUTER = new RotateRouterImpl();
}
}
总结
本次我们完成了RPC框架中路由层的设计与实现,并实现了随机路由算法、根据权重进行访问和轮询算法。