1.设计
1.1 背景
系统启动后,所有任务都在被执行,如果这时某个节点宕机,那它负责的任务就不能执行了,这对有稳定性要求的任务是不能接受的,所以系统要实现rebalance的功能。
1.2 设计
下面是Job分配与执行的业务点,重分配就是在 follower下线、controller下线、节点新上线进行重分配。理清楚接下来实现就是水到渠成了
2. 实现
2.1 RebalanceJobType
定义了重平衡job的类型
public enum RebalanceJobType {
FOLLOWER_OFFLINE(0), CONTROLLER_OFFLINE(1), NODE_ONLINE(2);
private int code;
RebalanceJobType(int code) {
this.code = code;
}
public boolean isFollowerOffline() {
return this.code == FOLLOWER_OFFLINE.code;
}
public boolean isControllerOffline() {
return this.code == CONTROLLER_OFFLINE.code;
}
public boolean isNodeOnline() {
return this.code == NODE_ONLINE.code;
}
}
2.2 AverageJobAllotStrategy
添加了 rebalanceJob的方法,只有Controller才能调用,对不同的重平衡情况进行分别处理
private Map<Long, List<DttaskJob>> getDttaskJobMap() {
List<DttaskJob> allDttaskJob = getAllDttaskJob();
return average(allDttaskJob);
}
@Override
public void rebalanceJob(RebalanceJobContext rebalanceJobContext) {
if (rebalanceJobContext.getType().isFollowerOffline()
|| rebalanceJobContext.getType().isControllerOffline()) {
long offlineServerId = rebalanceJobContext.getServerId();
log.info("{}节点={}下线->重平衡job={}",
rebalanceJobContext.getType().isFollowerOffline() ? "follower" : "controller",
offlineServerId,
rebalanceJobContext);
List<DttaskJob> dttaskJobs = getByDttaskId(offlineServerId);
List<NodeInfo> nodeInfoList = ServerInfo.getNodeInfoList();
Map<Long, List<DttaskJob>> allotMap = new HashMap<>();
int i = 0;
int nodeCount = nodeInfoList.size();
while (i < dttaskJobs.size()) {
DttaskJob dttaskJob = dttaskJobs.get(i);
NodeInfo nodeInfo = nodeInfoList.get(i % nodeCount);
i++;
List<DttaskJob> dttaskJobList = allotMap.getOrDefault(nodeInfo.getServerId(), new ArrayList<>());
dttaskJobList.add(dttaskJob);
allotMap.put(nodeInfo.getServerId(), dttaskJobList);
}
executeDttaskJob(new ExecuteDttaskJobContext(allotMap, true));
} else if (rebalanceJobContext.getType().isNodeOnline()) {
log.info("节点上线->重平衡job={}", rebalanceJobContext);
long onlineServerId = rebalanceJobContext.getServerId();
Map<Long, List<DttaskJob>> dttaskJobMap = BeanUseHelper.entityHelpService().queryDttaskJob();
Map<Long, List<DttaskJob>> allotDttaskJobMap = getDttaskJobMap();
Map<Long, List<DttaskJob>> stopDttaskJobMapOfOldNodes = new HashMap<>();
Map<Long, List<DttaskJob>> startDttaskJobMapOfNewNodes = new HashMap<>();
List<DttaskJob> startDttaskJobs = new ArrayList<>();
dttaskJobMap.forEach((serverId, dttaskJobList) -> {
int size = dttaskJobList.size();
int newSize = allotDttaskJobMap.get(serverId).size();
if (size > newSize) {
List<DttaskJob> dttaskJobs = dttaskJobList.subList(0, size - newSize);
stopDttaskJobMapOfOldNodes.put(serverId, dttaskJobs);
startDttaskJobs.addAll(dttaskJobs);
}
});
startDttaskJobMapOfNewNodes.put(onlineServerId, startDttaskJobs);
executeDttaskJob(new ExecuteDttaskJobContext(stopDttaskJobMapOfOldNodes, false));
executeDttaskJob(new ExecuteDttaskJobContext(startDttaskJobMapOfNewNodes, true));
}
}
2.3 ServerClientChannelHandler
对节点下线进行重平衡处理
2.4 NodeOnlineMessageService
3. 测试
启动三个节点,节点完成选举,每个节点执行2个任务
- 3号节点下线
1 2 节点各分配了一个任务继续执行
- 3号节点上线
新上线的3号节点,重新得到2个任务,1 2节点各停止一个任务
至此,节点上下线的任务重平衡完成