目录
一、前言
二、实现Producer的负载均衡
1、负载均衡选取一条消息队列并且高可用
1.1、模拟随机递增取模消息队列数为5
1.2、模拟随机递增取模消息队列数为6
1.3、判断Broker代理是否可用
2、更新故障项维护startTimestamp字段
2.1、退避运算
2.2、更新故障项维护startTimestamp字段
3、总结
一、前言
Producer端在发送消息的时候,会先根据主题Topic找到指定的TopicPublishInfo,在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下selectOneMessageQueue()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。具体的容错策略均在MQFaultStrategy这个类中定义。这里有一个sendLatencyFaultEnable开关变量,如果开启,在随机递增取模的基础上,再过滤掉not available的Broker代理。所谓的"latencyFaultTolerance",是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550L ms,就退避30000L ms;超过1000L,就退避60000L;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在。
二、实现Producer的负载均衡
上一节我们讲解到了生产者消息发送与重试的那块逻辑,也提到了容错策略,本篇文章我们先回顾一下以便接下来的内容分析。更新故障项updateFaultItem()方法中接收的数据为Broker代理的服务名、第二个入参数据就是发送消息远程调用到响应成功或失败所花的时间、第3个是boolean类型(成功或者中断异常情况下为false、其它情况下为true)
1、负载均衡选取一条消息队列并且高可用
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
// 随机递增计算索引
int index = tpInfo.getSendWhichQueue().incrementAndGet();
// 轮询,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,可以保证每次选择的不一样
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
// 取模
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 判断broker代理是否可用,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在
if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
return mq;
}
// 由于更新故障项,可能导致上面判断broker代理都不可用的情况,但不是绝对的,这里选取一个broker
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
// 同样是采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
该方法的作用就是负载均衡选取一条高可用的消息队列。
- sendLatencyFaultEnable开关打开,采用轮询的方式;采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,可以保证每次选择的不一样,即负载均衡;判断broker代理是否可用,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在
- 由于更新故障项,可能导致上面判断broker代理都不可用的情况,但不是绝对的,这里选取一个broker;同样是采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息
1.1、模拟随机递增取模消息队列数为5
public static void main(String[] args) {
ThreadLocalIndex threadLocalIndex = new ThreadLocalIndex();
for (int i = 0; i < 10; i++) {
int incrementAndGet = threadLocalIndex.incrementAndGet();
// 模拟消息队列数为5
int pos1 = Math.abs(incrementAndGet++)%5;
System.out.println("第 " + i + " 为:" + incrementAndGet + " 5选取消息队列为:" + pos1);
}
}
实验结果:
实验结论:
由实验结果可以看出随机递增取模的方式选取队列,每次选取都是不同的消息队列,这就是负载均衡,也是轮询。
1.2、模拟随机递增取模消息队列数为6
public static void main(String[] args) {
ThreadLocalIndex threadLocalIndex = new ThreadLocalIndex();
for (int i = 0; i < 10; i++) {
int incrementAndGet = threadLocalIndex.incrementAndGet();
// 模拟消息队列数为6
int pos2 = Math.abs(incrementAndGet++)%6;
System.out.println("第 " + i + " 为:" + incrementAndGet + " 6选取消息队列为:" + pos2);
}
}
实验结果:
实验结论:
同样由实验结果可以看出随机递增取模的方式选取队列,每次选取都是不同的消息队列,这就是负载均衡,也采用了轮询。虽然说这样可以保证在上次发送成功或失败的情况下不再选取同一条消息队列,即多负载了或认为其不可用,但是你仅通过轮询以及随机递增取模是不可以保证下一条消息队列就是可用的。故判断broker代理是否可用,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在。
1.3、判断Broker代理是否可用
@Override
public boolean isAvailable(final String name) {
final FaultItem faultItem = this.faultItemTable.get(name);
if (faultItem != null) {
return faultItem.isAvailable();
}
return true;
}
/**
* 故障项
*/
class FaultItem implements Comparable<FaultItem> {
/**
* brokerName
*/
private final String name;
/**
* 这次发送消息到出现异常的时间
*/
private volatile long currentLatency;
/**
* 在这个时间点以前,这个brokerName都会标记为可能存在故障
*/
private volatile long startTimestamp;
public boolean isAvailable() {
return (System.currentTimeMillis() - startTimestamp) >= 0;
}
}
- 判断brokerName对应下Broker代理是否可用,不是绝对的。
- startTimestamp是LatencyFaultToleranceImpl的内部类故障项FaultItem中维护的字段,作用是在这个时间点以前,这个brokerName对应下Broker代理都会标记为可能存在故障,即不可用。
- 为了避免选择故障项,startTimestamp需要通过setter更新维护,而故障项如果恢复的话,你也要更新维护。那么维护它的时机是在哪里呢?前面有提及了更新故障项,那么是否也在该时机执行呢?
2、更新故障项维护startTimestamp字段
public class MQFaultStrategy {
private final static InternalLogger log = ClientLogger.getLog();
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
private boolean sendLatencyFaultEnable = false;
/**
* 潜伏期最大值
*/
private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
/**
* 不可用持续时间
*/
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
/**
* 该方法在发送消息成功或失败后执行
*
* @param brokerName 代理服务器服务名
* @param currentLatency 当前等待时间 = 执行远程调用完毕时间 - 开始时间
* @param isolation isolation
*/
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
if (this.sendLatencyFaultEnable) {
// 计算不可用时间
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
// 延迟错误误差
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
latencyMax与notAvailableDuration是一一对应关系:latencyMax中数值越小即整个请求处理成功或失败所花的时间越少,越少那么往往都是成功的,故对应notAvailableDuration的值越小甚至0,以便isAvailable()判断Broker代理是否可用不过滤;相反,latencyMax中数值越大,那么故障的可能性越大,这时就尽量不再选择它,退避处理。
2.1、退避运算
private long computeNotAvailableDuration(final long currentLatency) {
// 倒序,一旦发送失败,那么尽最大可能避免再次选择发送失败的服务器
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
这里逻辑就是退避运算:这里数组索引是倒序的,一旦发送失败,那么尽最大可能避免再次选择发送失败的服务器。感兴趣的读者可以自己去模拟下实验,拿几组数据自己去探究探究。
2.2、更新故障项维护startTimestamp字段
@Override
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
// 根据代理服务器服务名brokerName,从本地缓存获取故障项
FaultItem old = this.faultItemTable.get(name);
// 没有新建再缓存
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
//
old = this.faultItemTable.putIfAbsent(name, faultItem);
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
// 存在则直接更新
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}
根据代理服务器服务名brokerName,从本地缓存获取故障项,没有新建再缓存;有则直接setter关系维护startTimestamp字段
3、总结
本篇文章涉及到的重点就是负载均衡的算法实现以及退避运算中涉及的设计思想等:采用轮询的方式;采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,可以保证每次选择的不一样,即负载均衡;判断broker代理是否可用,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在,内部两个二维数组字段参与到退避运算中,计算出的结果维护到startTimestamp字段,以便判断broker代理是否可用、达到高可用目的。