文章目录
- 1.前言
- 2.心跳流程图
- 整体流程
- 心跳续约&心跳检测
- 3.实现步骤
- 3.1 客户端
- 3.2 服务端
- 3.2.1 HeartBeatRpcProcessor
- 3.2.2 HeartBeatRequestHandler
- 3.2.3 ServiceDiscoveryRequestHandlerFactory 新增 onBeat 方法
- 3.2.4 ServiceDiscoveryCheckBeatThread 心跳检测线程
- 3.2.5 ServiceDiscoveryRequestHandlerFactory 新增 checkBeat 方法
- 3.2.6 ServiceDiscoveryServer 服务端启动时启动心跳检测线程
- 4.测试
1.前言
- 在上篇文章 基于 SOFAJRaft 实现注册中心_不懂的浪漫的博客-CSDN博客 学习之后,我们了解了如何基于 JRaft 来实现一个注册中心,了解了 JRaft 的编程模式。
- 按照官网的步骤和示例,一步一步完善代码即可。
- 本文主要是完善注册中心的心跳功能。
ps:对之前的文章中的一些代码缺陷也进行了修正。
本文完整代码地址:https://github.com/huajiexiewenfeng/eval-discovery
2.心跳流程图
整体流程
整个流程可以分为三个子流程
- 服务注册:client 发送 register 注册请求到服务端
- 由 RegistrationRpcProcessor 来进行处理
- 触发 ServiceDiscoveryStateMachine 状态机的 onApply 方法
- 底层采用 ServiceDiscoveryRequestHandlerFactory#storage 来存储注册信息
- 触发 ServiceDiscoveryStateMachine 状态机的 onApply 方法
- 由 RegistrationRpcProcessor 来进行处理
- 发送心跳:在 client 发送 register 请求的同时,会启动一个心跳线程
- 该线程每 5 秒发送一个 beat 心跳请求到服务端
- 由 HeartBeatRpcProcessor 来进行处理
- 触发 ServiceDiscoveryStateMachine 状态机的 onApply 方法
- 底层 ServiceDiscoveryRequestHandlerFactory#onBeat 处理心跳
- 触发 ServiceDiscoveryStateMachine 状态机的 onApply 方法
- 由 HeartBeatRpcProcessor 来进行处理
- 该线程每 5 秒发送一个 beat 心跳请求到服务端
- 心跳检测:注册中心服务端启动时,会启动一个心跳检测线程,每 5 秒检测实例存活情况
- 底层 ServiceDiscoveryRequestHandlerFactory#checkBeat
心跳续约&心跳检测
心跳续约
- 每次发送心跳请求,更新服务器侧心跳服务实例 Map 集合
- 数据结构:Map<String, Map<String, Instant>>
- serviceName
- serviceId:当前时间 now
- serviceName
- 每次续约都会更新当前时间 now
心跳检测
- 遍历 心跳服务实例 Map 集合
- 获取实例续约信息
- 与当前时间对比,如果时间>过期时间
- 删除实例信息
3.实现步骤
3.1 客户端
我们先改造 JRaftServiceDiscovery,在 register 方法中增加心跳线程。改造之后核心代码如下:
public class JRaftServiceDiscovery implements ServiceDiscovery {
...
@Override
public void register(ServiceInstance serviceInstance) {
logger.info(" register serviceInstance,{}", serviceInstance);
// 调用 RPC
ServiceDiscoveryOuter.Registration registration = buildRegistration(serviceInstance, false);
try {
serviceDiscoveryClient.invoke(registration);
} catch (Throwable e) {
e.printStackTrace();
}
// 注册成功后,启动心跳线程服务
ServiceDiscoveryHeartBeatThread beatThread = new ServiceDiscoveryHeartBeatThread(serviceDiscoveryClient,serviceInstance);
beatThread.setDaemon(true);
beatThread.start();
}
...
}
ServiceDiscoveryHeartBeatThread
- 心跳的核心逻辑与 register 类似
- 封装 HeartBeat 对象
- 发送 RPC 请求
public class ServiceDiscoveryHeartBeatThread extends Thread {
private final ServiceDiscoveryClient serviceDiscoveryClient;
private final ServiceInstance serviceInstance;
private static final Logger logger = LoggerFactory
.getLogger(ServiceDiscoveryHeartBeatThread.class);
public ServiceDiscoveryHeartBeatThread(ServiceDiscoveryClient serviceDiscoveryClient,
ServiceInstance serviceInstance) {
super("client-service-instance-beat");
this.serviceDiscoveryClient = serviceDiscoveryClient;
this.serviceInstance = serviceInstance;
}
@Override
public void run() {
while (true) {
// 调用 RPC
ServiceDiscoveryOuter.HeartBeat heartBeat = buildHeartBeat(serviceInstance);
try {
serviceDiscoveryClient.invoke(heartBeat);
Thread.sleep(5000);
} catch (Throwable e) {
logger.error("Fail to send heartbeat for a service instance : " + serviceInstance, e);
}
}
}
private HeartBeat buildHeartBeat(ServiceInstance serviceInstance) {
return HeartBeat.newBuilder()
.setHost(serviceInstance.getHost())
.setId(serviceInstance.getId())
.setPort(serviceInstance.getPort())
.setServiceName(serviceInstance.getServiceName())
.build();
}
}
3.2 服务端
3.2.1 HeartBeatRpcProcessor
先完善 HeartBeatRpcProcessor,此类是请求到服务端,经过的处理类,参考 RegistrationRpcProcessor 的代码即可,大部分代码一致
public class HeartBeatRpcProcessor implements RpcProcessor<ServiceDiscoveryOuter.HeartBeat> {
...
@Override
public void handleRequest(RpcContext rpcContext, HeartBeat heartBeat) {
ServiceInstance serviceInstance = convertServiceInstance(heartBeat);
String serviceName = heartBeat.getServiceName();
final Kind kind = Kind.BEAT;
ServiceDiscoveryOperation op = new ServiceDiscoveryOperation(kind, serviceInstance);
final ServiceDiscoveryClosure closure = new ServiceDiscoveryClosure(op) {
@Override
public void run(Status status) {
if (!status.isOk()) {
logger.warn("Closure status is : {} at the {}", status, rpcProcessorService.getNode());
return;
}
rpcContext.sendResponse(response(status));
logger.info("'{}' has been handled ,serviceName : '{}' , result : {} , status : {}",
kind, serviceName, getResult(), status);
}
};
this.rpcProcessorService.applyOperation(closure);
}
...
}
3.2.2 HeartBeatRequestHandler
此类为 ServiceDiscoveryStateMachine#onApply 中处理消息处理类,我们继续完善
public class HeartBeatRequestHandler implements ServiceDiscoveryRequestHandler {
private static final Logger logger = LoggerFactory.getLogger(HeartBeatRequestHandler.class);
private ServiceDiscoveryRequestHandlerFactory factory;
public HeartBeatRequestHandler(
ServiceDiscoveryRequestHandlerFactory factory) {
this.factory = factory;
}
@Override
public void doHandle(ServiceDiscoveryClosure closure, ServiceInstance serviceInstance) {
if (null == serviceInstance) {
return;
}
factory.onBeat(serviceInstance);
logger.info("{} has been renewed at the node", serviceInstance);
}
}
3.2.3 ServiceDiscoveryRequestHandlerFactory 新增 onBeat 方法
ublic class ServiceDiscoveryRequestHandlerFactory {
...
private final Object monitor = new Object();
/**
* 服务名称与服务实例列表(List)映射 serverName:<serverId:instance>,使用 serverId 来做去重
*/
private final Map<String, Map<String, ServiceInstance>> serviceNameToInstancesStorage = new ConcurrentHashMap<>();
/**
* 服务名称与服务实例列表(List+心跳时间)映射 serverName:<serverId:now>
*/
private final Map<String, Map<String, Instant>> serviceNameToInstantsMap = new ConcurrentHashMap<>();
...
public void onBeat(ServiceInstance serviceInstance) {
String serviceName = serviceInstance.getServiceName();
String id = serviceInstance.getId();
synchronized (monitor) {
Map<String, ServiceInstance> serviceInstancesMap = serviceNameToInstancesStorage
.computeIfAbsent(serviceName, n -> new LinkedHashMap<>());
if (!serviceInstancesMap.containsKey(id)) {
//无效心跳请求
logger.info("{} beat is invalid", serviceInstance);
return;
}
Map<String, Instant> instantMap = serviceNameToInstantsMap
.computeIfAbsent(serviceName, n -> new LinkedHashMap<>());
// 续约
instantMap.put(id, Instant.now());
}
}
}
3.2.4 ServiceDiscoveryCheckBeatThread 心跳检测线程
public class ServiceDiscoveryCheckBeatThread extends Thread {
private final ServiceDiscoveryRequestHandlerFactory factory;
private final ServiceDiscoveryStateMachine stateMachine;
private static final Logger logger = LoggerFactory
.getLogger(ServiceDiscoveryCheckBeatThread.class);
public ServiceDiscoveryCheckBeatThread(ServiceDiscoveryRequestHandlerFactory factory,
ServiceDiscoveryStateMachine stateMachine) {
super("service-instance-beat-check");
this.factory = factory;
this.stateMachine = stateMachine;
}
@Override
public void run() {
while (true) {
try {
if (stateMachine.isLeader()) {
factory.checkBeat();
}
Thread.sleep(5000);
} catch (Throwable e) {
logger.error("error on check beat", e);
}
}
}
}
3.2.5 ServiceDiscoveryRequestHandlerFactory 新增 checkBeat 方法
public void checkBeat() {
final Instant now = Instant.now();
synchronized (monitor) {
// 遍历所有服务实例集合
for (Map.Entry<String, Map<String, ServiceInstance>> serviceInstanceMap : this.serviceNameToInstancesStorage
.entrySet()) {
String serviceName = serviceInstanceMap.getKey();
// 获取当前服务实例对应的服务续约时间集合
Map<String, Instant> instantMap = this.serviceNameToInstantsMap.get(serviceName);
if (CollectionUtils.isEmpty(instantMap)) {
// 移除当前服务的所有实例
serviceInstanceMap.getValue().clear();
continue;
}
for (Map.Entry<String, Instant> instantService : instantMap.entrySet()) {
if (instantService.getValue().plus(expired, ChronoUnit.SECONDS).isBefore(now)) {
//超过30s没有收到心跳
logger.info(
"The current instance [{}] has not received a heartbeat request for more than 30 seconds",
instantService);
removeInstance(serviceName, instantService.getKey());
}
}
}
}
}
如果30s没有收到心跳,则移除实例
private void removeInstance(String serviceName, String id) {
Map<String, ServiceInstance> serviceInstancesMap = getServiceInstancesMap(serviceName);
ServiceInstance serviceInstance = serviceInstancesMap.get(id);
if (null != serviceInstance) {
// 发送注销消息同步到followers
logger.info("send DeRegistration {}", serviceInstance);
sendDeRegistrationRpc(serviceInstance);
}
}
private void sendDeRegistrationRpc(ServiceInstance serviceInstance) {
String serviceName = serviceInstance.getServiceName();
final Kind kind = Kind.DEREGISTRATION;
ServiceDiscoveryOperation op = new ServiceDiscoveryOperation(kind, serviceInstance);
final ServiceDiscoveryClosure closure = new ServiceDiscoveryClosure(op) {
@Override
public void run(Status status) {
if (!status.isOk()) {
logger.warn("Closure status is : {} at the {}", status, rpcProcessorService.getNode());
return;
}
logger.info("'{}' has been handled ,serviceName : '{}' , result : {} , status : {}",
kind, serviceName, getResult(), status);
}
};
this.rpcProcessorService.applyOperation(closure);
}
3.2.6 ServiceDiscoveryServer 服务端启动时启动心跳检测线程
public class ServiceDiscoveryServer {
private RaftGroupService raftGroupService;
private Node node;
private ServiceDiscoveryStateMachine fsm;
public ServiceDiscoveryServer(final String dataPath, final String groupId, final PeerId serverId,
final NodeOptions nodeOptions) throws IOException {
...
this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);
// start raft node
this.node = this.raftGroupService.start();
ServiceDiscoveryRequestHandlerFactory instanceFactory = ServiceDiscoveryRequestHandlerFactory
.getInstance();
instanceFactory.init();
instanceFactory.setRpcProcessorService(rpcProcessorService);
// 启动心跳检测线程
ServiceDiscoveryCheckBeatThread beatThread = new ServiceDiscoveryCheckBeatThread(
instanceFactory, fsm);
beatThread.setDaemon(true);
beatThread.start();
}
...
}
4.测试
按照前一篇文章 基于 SOFAJRaft 实现注册中心_不懂的浪漫的博客-CSDN博客 的方式分别启动
- ServiceDiscoveryServer1 - 注册中心服务器1
- ServiceDiscoveryServer2 - 注册中心服务器2
- ServiceDiscoveryServer3 - 注册中心服务器3
- MyApplication - Spring Web 应用
启动完成之后,观察三个服务端日志打印情况。
可以看到 Beat 心跳请求是每 5 秒打印一次
关闭 MyApplication ,模拟心跳超时(服务下线)情况
Leader 节点日志:
Follower 节点日志