看到今年Pulsar 峰会上挺多人分享负载均衡的内容,这里也整理分享一下相关的内容。
社区现有策略的分析
LoadSheddingStrategy
pulsar进行shedding的时候,使用的是ThresholdShedder类,ThresholdShedder类是LoadSheddingStrategy接口的其中一个实现,默认使用它。
不同实现的关键不同点在于:
·什么样的broker判定为超载
·判定为超载后,对哪些bundles进行unload
ThresholdShedder 是
·计算出所有broker的资源平均使用率avg,如果某个broker资源使用率>avg+y,则判定为超载
·判定为超载后,计算出需要卸载多少流量(卸载到avg+y-0.05),然后优先选大的bundle来卸载
而OverloadShedder是
·配置loadBalancerBrokerOverloadedThresholdPercentage资源使用率阈值,当某个broker的最大资源使用率超过该值,则判断为超载
·判定为超载后,计算出需要卸载多少流量(卸载到 资源使用率阈值-0.05),然后优先选大的bundle来卸载
深入看看ThresholdShedder算法逻辑:
ThresholdShedder 首先使用如下公式计算出所有 Broker 的平均资源使用率。
对每个 Broker:
usage =
max (
%cpu * cpuWeight
%memory * memoryWeight,
%bandwidthIn * bandwidthInWeight,
%bandwidthOut * bandwidthOutWeight) / 100;
usage = x * prevUsage + (1 - x) * usage
avgUsage = sum(usage) / numBrokers
如果 Broker 的资源使用率大于 avgUsage + y,则被认为过载。
• 资源使用率的权重(Weight)默认为 1.0,可通过 loadBalancerResourceWeight 进行配置。
• 历史使用率乘子 x 可通过 loadBalancerHistoryResourcePercentage 进行配置。其默认值是 0.9,历史使用率比最近使用率的权重更大。
• avgUsage 缓冲值 y 可通过 loadBalancerBrokerThresholdShedderPercentage 进行配置,默认值是 10%。
后面的分析我们可以看到,这是使用打分算法是很有问题的!为了避免流量毛刺导致没必要负载均衡,不应该采用这种方法。
负载均衡能力 | bundles与broker绑定关系的稳定性 | |
ThresholdShedder | 良好,只要某个broker的负载超过集群平均水平一定阈值就会卸载流量 | 一般,当集群整体负载都不高,但是某些broker机器相对较高时也会卸载流量。 |
OverloadShedder | 良好,最大资源使用率达到阈值的broker会卸载流量给低负载broker; 但是只要没达到阈值就不会卸载流量。 | 良好,除非最大资源使用率达到阈值,否则都不改变。 而且如果加入新broker,会导致长时间没有流量均衡到该 broker 上,因为其他 broker 节点均没有达到 bundle unload 阈值。 |
ThresholdShedder 是社区默认的实现,也是大多数公司使用的,一般都是基于它进行优化,但是后面可以分析发现,它是有些瑕疵的。OverloadShedder几乎没啥人用,缺点明显。
UniformLoadShedder
UniformLoadShedder是在2.10.x版本提供的新的Shedder。
[pulsar-broker] add uniform load shedder strategy to distribute traffic uniformly across brokers by rdhabalia · Pull Request #12902 · apache/pulsar (github.com)
Optimize ThresholdShedder strategy: the low-load Broker cannot be fully utilized by lordcheng10 · Pull Request #12471 · apache/pulsar (github.com)
该Shedder解决ThresholdShedder无法平衡流量的一个场景。如
broke1 brokerAvgResourceUsage 80
......
broker100 brokerAvgResourceUsage 80
broker101 brokerAvgResourceUsage 10
则avgUsage=(80*100+10)/101=79,threshold使用默认的10时,此时就不会触发sheddling,因此无法把负载切到低负载的机器上。
但是,测试过程中这个shedder也效果不咋地。
除了shedding策略,还有放置策略。
ModularLoadManagerStrategy
从已过滤的可用 Broker 列表中选定一个 Broker来放置bundle,使用 ModularLoadManagerStrategy接口(默认LeastLongTermMessageRate)来挑选。
LeastLongTermMessageRate 策略计算 Broker 的负载分数scores,并从分数最小的 Broker 中随机选择一个,计分规则如下:
• 如果 CPU、内存和网络的最大本地使用率大于
LoadBalancerBrokerOverloadedThresholdPercentage(默认 85%),则设置 score=INF。
• 否则设置score= longTermMsgIn rate + longTermMsgOut rate。
还有新的放置策略。
LeastResourceUsageWithWeight
[feature][broker]Provide new load balance placement strategy implementation based on the least resource usage with weight by HQebupt · Pull Request #16281 · apache/pulsar · GitHub
PIP-182: Provide new load balance placement strategy implementation for ModularLoadManagerStrategy · Issue #16274 · apache/pulsar · GitHub
这个issue说,ThresholdShedder是根据负载来决定哪些bundles进行unload的,但是LeastLongTermMessageRate是根据长期速率来卸载的,这种不一致的标准会导致负载均衡的效果不太行,于是提出了LeastResourceUsageWithWeight。
The bundle placement strategy is LeastLongTermMessageRate, which selects a broker based on which one has the least long term message rate instead of load metric. The LeastLongTermMessageRate does not get along with ThresholdShedder well. Therefore, a load-based bundle placement strategy is necessary to cooperate with ThresholdShedder.
这个放置策略还有一个优化点:不是根据最低分来选择owner broker,而是把低于avgUsage一定的阈值的哪些broker当成候选broker,然后从owner broker列表随机选取。
若是使用最低分的broker作为候选broker,那么会有small randomization pool的问题,因为不同broker的打分大概率不相同,那么候选列表大概率只有一两个broker,那么一次shedding卸载出来的bundles可能全部加载到它身上了,导致这个broker又超载了。(因为doShedding的时候是先根据当前负载信息来决定新owner broker的,决定好之后只是把该bundle放入preallocatedBundleToBroker,而不是立马将它的负载加到broker上去了,而且,pulsar搜集主机的负载、资源使用率的信息默认是每一分钟执行一次的,这也会造成延迟。这些延迟就可能导致卸载出来的bundles全部加载到某个broker上,导致它也超载)
解决办法是,扩大随机池,池子大了,那么流量就会均摊到这几个broker上。
但是现在这个算法也会有small randomization pool的问题。如下讨论。
如果b1:4, b2:20, b3:20,则随机池里面只有b1,那么所有流量都会往b1上去,最终可能导致b1:90%, b2:20, b3:20。
这种情况可以把diffThreshold调成负数,则all of brokers can be candidates。
还有一些边缘情况。
·threshold =-15, {b1=25, b2=100, b3=100, b4=100, b5=100} => b2, b3, b4, b5 will be selected.
·threshold =10, {b1=24, b2=25, b3=26} => none will be selected.(代码里使用的兜底逻辑是,当没有候选brokers时,所有brokers都作为候选brokers)
需要遵循下面的规则来设置阈值,当集群低负载的brokers较多时,则使用负数阈值可以扩大随机池,当高负载的brokers较多时,则使用正数阈值可以避免高负载brokers成为候选brokers。
·negative threshold applies when a large number of brokers have less load.
·positive threshold applies when a large number of brokers have heavy load.
但其实还是不好确定阈值该如何选择,因为集群流量是变化的!
small randomization pool问题
这个问题的根本原因是,bundle卸载与加载是没有关联的,社区把这两个操作拆分成两个接口,目前的社区实现两者毫无关联,bundle卸载之后,要加载到哪里是不确定的,导致了small randomization pool问题,因此,要解决这个问题,最好的办法不是扩大随机池,而是在卸载时把信息传递给加载策略。
历史权值算法对shedding的影响
如当前集群有1一台broker1,负载为90%,由于其稳定运行了,因此打分为90,现加入一台broker2,其当前负载为10%,由于是新加入的,打分为null。
则
第一次执行shedding:broker1打分90,broker2打分10,如果不考虑历史数据,算法会移动部分bundles以使得两者的score相同,则shedding完成后broker1、broker2的负载均为50。
第二次执行shedding:broker1打分90*0.9+50*0.1=86,broker2打分10*0.9+50*0.1=14,两者打分仍然悬殊,虽然两者的负载已经相同,但是还会继续卸载流量。算法把打分当成了负载,因此,会从broker1中卸载36分对应的流量给broker2,导致broker1的负载变成14,broker2的负载变成86,负载倒过来了。
第三次执行shedding:broker1打分86*0.9+14*0.1=78.8,broker2打分14*0.9+86*0.1=21.2,这个时候还是判断broker2负载比broker1高,竟然还会从broker1卸载流量给broker2,那么会把broker1身上剩下的所有负载都鞋了,broker1变成跟broker2刚加入一样,不承担任何bundles,所有负载加到broker2上了。
经过多轮的迭代,broker1打分小于broker2打分一定阈值了,这个时候会从broker2身上卸载bundle给broker1,这样经过许多轮的迭代,broker1与broker2之间的负载才会控制在阈值25以内,而打分也趋近于真实负载值。
因此,这种历史权重高的打分算法,容易导致shedding次数大大增加。而该算法的设计初衷是:避免性能抖动导致无意义的shedding。
负载数据搜集时间间隔 与 执行负载均衡任务的时间间隔
默认执行shedding任务的时间间隔为1分钟,默认搜集资源使用率的时间间隔也是1分钟,那么可能有如下情况:
0分0秒时,执行shedding,0分5秒时搜集资源使用率,这个时候接收bundle的broker负载还没上去,因此搜集到的资源使用率还是shedding生效前的,那么1分0秒时执行第二次shedding任务,使用的资源使用率是0分5秒时搜集的数据,而这是不对的,错误的数据会导致错误的决策!
解决办法
AvgShedder为了解决上面两个问题,使用如下方法:统计次数,当连续多次执行shedding任务(如三次),都判定某个broker超载,则认为该broker确实要卸载流量。这样既避免了性能抖动,又避免了搜集到错误的数据来执行shedding。
如何实现也是个值得讨论的问题:
维护一个Map<Broker,Integer>,执行一次shedding时,当负载大小第一和倒数第一的差值超过阈值,则判断负载大小第一的broker为超载,记录进Map里;当负载大小第二和倒数第二的差值超过阈值,则也判断负载大小第二的broker为超载,记录进Map里,...
当第二次shedding发现某个broker没超载了,就剔除掉,连续多次都超载,就执行shedding。
但是会有下面特例:
有三台broker,broker1负载80,broker2负载80,broker3负载20。
则由于broker1和broker2的负载接近,可能第一次shedding broker1排第一位,与broker构成一个Pair<broker1,broker3>,记录Entry<broker1,1>;
然后第二次shedding broker2排第一位,与broker构成一个Pair<broker2,broker3>,记录Entry<broker2,1>,没判定broker1超载,因此去除了Entry<broker1,1>。
那么第三次shedding检查时,肯定就无法执行shedding。
为了避免这种情况,把Pair<broker1,broker3>中的broker3也记录进Map里,只要第三次shedding发现Pair<brokerX,brokerY>,brokerX或者brokerY中有一个在Map中统计次数达到3就执行shedding。
即上面流程:
第一次shedding:记录Entry<broker1,1>,Entry<broker3,1>
第二次shedding:记录Entry<broker2,1>,Entry<broker3,2>,剔除Entry<broker1,1>
第三次shedding:则无论第一名是broker1还是broker2,都会有Entry<broker3,3>,从而触发shedding。
对于更一般都情况,也是可以证明这个算法是正确的。
先抽象出来问题,设集合A={A1,...Am},B={B1,...,Bn},且min{A}-max{B}>=Threshold,且A集合内部各元素会交换值,B集合也是,每次shedding时,把选出来的Pair(Ai,Bj)记录进Entry<Ai,x+1>,Entry<Bj,y+1>,x、y分别为原来的entry value,并把这次没更新到的entry给删除掉。如何证明:第三次shedding时,必存在Entry<Ai,3>或Entry<Bi,3>。
证明:
不妨设m>=n,则每次shedding必选出如下n对pair。
则第三次shedding时,必有
证毕。
可见,前面这些策略都有各种各样的问题。对于一个流量周期稳定变化的集群,不同broker之间不应该频繁发生bundle切换,峰会上看到vivo在实践时每天发生bundle切换上百次,他们根据经验优化了参数后降低到了10来次,但是目前而言,pulsar发生bundle切换的成本是很高的,unload一个bundle就要断开所有连接,在现在pulsar事务功能还不稳定(存在较多bug,社区使用案例不多),这就容易造成数据重复等问题。
而实际上,几乎所有集群的流量特征都满足,只需要在集群启动时或者是集群broker发生上下线的时候需要做负载均衡,其他时间进行负载均衡几乎没有收益。
因为,我们重新设计了一个新的Shedder -- AvgShedder。
AvgShedder
它是一个新的shedder和新的放置策略,即把两个策略绑定了!
跟ThresholdShedder使用一样的打分算法,首先使用如下公式计算出所有 Broker 的平均资源使用率。
对每个 Broker:
usage =
max (
%cpu * cpuWeight
%memory * memoryWeight,
%bandwidthIn * bandwidthInWeight,
%bandwidthOut * bandwidthOutWeight) / 100;
usage = x * prevUsage + (1 - x) * usage
avgUsage = sum(usage) / numBrokers
然后比较最高和最低的broker之间的使用率差距,如果超过某个阈值(如30%),则执行shedding,卸载打分最高的broker的流量,尽量让两个broker的负载变成他们的平均值。
selectBroker:选取打分最低的broker作为bundle的owner。
这种策略的想法就是:每次负载均衡,只把最低和最高两个broker的流量平均,在没有超大bundles的情况下,基本能实现把负载从压力最大的broker卸载到最低的broker身上,且让两者负载基本相同。
优点:
·这种方式不会遇到重启broker流量不会切到新broker的问题,即没有超低负载的broker
·也不会让接收bundles的broker超载,small randomization pool问题。
·当集群整体负载都很高,也不会像OverLoadShedder那样做无意义的负载均衡。
·只要阈值设置好,shedding也不会很频繁,实践中,集群稳定运行时几乎不会发生负载均衡集群内broker的最高负载与最低负载(指cpu使用率)稳定在15%以内,对于小集群稳定在10%以内。
·而且阈值的意义很明确,不像前面的算法,阈值的设定相当麻烦,只能根据经验,而且经验值很有可能对当前集群使用效果不好。
实践效果很好,加入新broker或者下线broker时,触发负载均衡,能保证在5次以内就达到稳定状态(基本在3次以内),之后不会再触发负载均衡。
缺点:
·对于大集群而言,负载均衡总耗时会很长,因为每次只平均两个broker的负载,极端情况下,N个broker需要shedding N/2次,而默认每一分钟执行一次shedding检查,因此最长耗时N/2分钟,120个broker的大集群,极端情况可能要1个小时来完成均衡。但是我们现在的集群规模较小,10来个brokers,一般执行3、4次就能完成了。
·集群刚启动时,如果使用该方法来挑选owner broker,那么可能会导致所有bundles都绑定到同一个broker上,接下来可能要执行多次shedding才能均衡下来,发生大量的bundles位置切换,这个是不能容忍的。
因此,集群刚启动的时候,也就是bundle在跟broker第一次绑定的时候,我们不能根据这个放置策略来挑选owner broker,要采用其他的放置策略,如使用HashAssign。
其实第一个缺点也可以改进的,在Shedder的实现里,维护一个Map<bundle,broker>,doShedding的时候就把要unload的bundles记录进该Map,value是要放置的新broker。在selectBroker的时候就直接根据该Map来选择新broker。那么一次shedding就可以完成所有工作了,比如说,负载第一的和倒数第一的平均,负载第二的和倒数第二的平均,...,只要超过阈值,就执行平均,这样就可以一次完成所有的broker平均工作了。
集群刚启动时,还未执行过Shedding,bundles也还没绑定任何broker,则当该Map里没有以该bundle为key的映射时,就使用HashAssign,或者随机分配。这样就处理好前面两个缺点了。
AvgShedder的实现已经提交PR到社区,对应还有一个PIP,详细分析了当前负载均衡策略。
[improve] [broker] AvgShedder by thetumbled · Pull Request #18186 · apache/pulsar (github.com)
PIP-217: LoadShedding Strategy Improment · Issue #18173 · apache/pulsar (github.com)
但是,社区方面计划是做出更大的改动,想要重构整个负载均衡器,比如说bundle切换时,先在内部把bundle切换好,然后再把客户端的连接断开,这样就可以降低不可用时间了;还有把ZK的负载数据迁移到内部的系统topic里,降低ZK的负载;还有把中心化决策的负载均衡器改成分布式决策的,即把部分工作分摊给非leader broker,等等。这些feature的工作量不小,就交给社区来完成了,而我们设计实现AvgShedder,就能很好地满足我们的需要了。
至于我PR、issue里提出的一些想法与分析,也采纳到了新负载均衡器里的新shedder。
PIP-220: TransferShedder (Only for PIP-192 New Broker Load Manager ) · Issue #18215 · apache/pulsar (github.com)
PIP-192: New Pulsar Broker Load Balancer · Issue #16691 · apache/pulsar (github.com)