SOFAJRaft 注册中心-心跳检测实现

news2025/1/22 15:52:59

文章目录

    • 1.前言
    • 2.心跳流程图
      • 整体流程
      • 心跳续约&心跳检测
    • 3.实现步骤
      • 3.1 客户端
      • 3.2 服务端
        • 3.2.1 HeartBeatRpcProcessor
        • 3.2.2 HeartBeatRequestHandler
        • 3.2.3 ServiceDiscoveryRequestHandlerFactory 新增 onBeat 方法
        • 3.2.4 ServiceDiscoveryCheckBeatThread 心跳检测线程
        • 3.2.5 ServiceDiscoveryRequestHandlerFactory 新增 checkBeat 方法
        • 3.2.6 ServiceDiscoveryServer 服务端启动时启动心跳检测线程
    • 4.测试

1.前言

  • 在上篇文章 基于 SOFAJRaft 实现注册中心_不懂的浪漫的博客-CSDN博客 学习之后,我们了解了如何基于 JRaft 来实现一个注册中心,了解了 JRaft 的编程模式。
  • 按照官网的步骤和示例,一步一步完善代码即可。
  • 本文主要是完善注册中心的心跳功能。

ps:对之前的文章中的一些代码缺陷也进行了修正。

本文完整代码地址:https://github.com/huajiexiewenfeng/eval-discovery

2.心跳流程图

整体流程

image.png
整个流程可以分为三个子流程

  • 服务注册:client 发送 register 注册请求到服务端
    • 由 RegistrationRpcProcessor 来进行处理
      • 触发 ServiceDiscoveryStateMachine 状态机的 onApply 方法
        • 底层采用 ServiceDiscoveryRequestHandlerFactory#storage 来存储注册信息
  • 发送心跳:在 client 发送 register 请求的同时,会启动一个心跳线程
    • 该线程每 5 秒发送一个 beat 心跳请求到服务端
      • 由 HeartBeatRpcProcessor 来进行处理
        • 触发 ServiceDiscoveryStateMachine 状态机的 onApply 方法
          • 底层 ServiceDiscoveryRequestHandlerFactory#onBeat 处理心跳
  • 心跳检测:注册中心服务端启动时,会启动一个心跳检测线程,每 5 秒检测实例存活情况
    • 底层 ServiceDiscoveryRequestHandlerFactory#checkBeat

心跳续约&心跳检测

image.png
心跳续约

  • 每次发送心跳请求,更新服务器侧心跳服务实例 Map 集合
  • 数据结构:Map<String, Map<String, Instant>>
    • serviceName
      • serviceId:当前时间 now
  • 每次续约都会更新当前时间 now

心跳检测

  • 遍历 心跳服务实例 Map 集合
  • 获取实例续约信息
  • 与当前时间对比,如果时间>过期时间
  • 删除实例信息

3.实现步骤

3.1 客户端

我们先改造 JRaftServiceDiscovery,在 register 方法中增加心跳线程。改造之后核心代码如下:

public class JRaftServiceDiscovery implements ServiceDiscovery {
  ...
  @Override
  public void register(ServiceInstance serviceInstance) {
    logger.info(" register serviceInstance,{}", serviceInstance);
    // 调用 RPC
    ServiceDiscoveryOuter.Registration registration = buildRegistration(serviceInstance, false);
    try {
      serviceDiscoveryClient.invoke(registration);
    } catch (Throwable e) {
      e.printStackTrace();
    }
    // 注册成功后,启动心跳线程服务
    ServiceDiscoveryHeartBeatThread beatThread = new ServiceDiscoveryHeartBeatThread(serviceDiscoveryClient,serviceInstance);
    beatThread.setDaemon(true);
    beatThread.start();
  }
  ...
}

ServiceDiscoveryHeartBeatThread

  • 心跳的核心逻辑与 register 类似
    • 封装 HeartBeat 对象
    • 发送 RPC 请求
public class ServiceDiscoveryHeartBeatThread extends Thread {

  private final ServiceDiscoveryClient serviceDiscoveryClient;
  private final ServiceInstance serviceInstance;

  private static final Logger logger = LoggerFactory
      .getLogger(ServiceDiscoveryHeartBeatThread.class);

  public ServiceDiscoveryHeartBeatThread(ServiceDiscoveryClient serviceDiscoveryClient,
      ServiceInstance serviceInstance) {
    super("client-service-instance-beat");
    this.serviceDiscoveryClient = serviceDiscoveryClient;
    this.serviceInstance = serviceInstance;
  }

  @Override
  public void run() {
    while (true) {
      // 调用 RPC
      ServiceDiscoveryOuter.HeartBeat heartBeat = buildHeartBeat(serviceInstance);
      try {
        serviceDiscoveryClient.invoke(heartBeat);
        Thread.sleep(5000);
      } catch (Throwable e) {
        logger.error("Fail to send heartbeat for a service instance : " + serviceInstance, e);
      }
    }
  }

  private HeartBeat buildHeartBeat(ServiceInstance serviceInstance) {
    return HeartBeat.newBuilder()
        .setHost(serviceInstance.getHost())
        .setId(serviceInstance.getId())
        .setPort(serviceInstance.getPort())
        .setServiceName(serviceInstance.getServiceName())
        .build();
  }
}

3.2 服务端

3.2.1 HeartBeatRpcProcessor

先完善 HeartBeatRpcProcessor,此类是请求到服务端,经过的处理类,参考 RegistrationRpcProcessor 的代码即可,大部分代码一致

public class HeartBeatRpcProcessor implements RpcProcessor<ServiceDiscoveryOuter.HeartBeat> {
  ...
  @Override
  public void handleRequest(RpcContext rpcContext, HeartBeat heartBeat) {
    ServiceInstance serviceInstance = convertServiceInstance(heartBeat);

    String serviceName = heartBeat.getServiceName();

    final Kind kind = Kind.BEAT;

    ServiceDiscoveryOperation op = new ServiceDiscoveryOperation(kind, serviceInstance);

    final ServiceDiscoveryClosure closure = new ServiceDiscoveryClosure(op) {
      @Override
      public void run(Status status) {
        if (!status.isOk()) {
          logger.warn("Closure status is : {} at the {}", status, rpcProcessorService.getNode());
          return;
        }
        rpcContext.sendResponse(response(status));
        logger.info("'{}' has been handled ,serviceName : '{}' , result : {} , status : {}",
            kind, serviceName, getResult(), status);
      }
    };

    this.rpcProcessorService.applyOperation(closure);
  }
  ...
}
3.2.2 HeartBeatRequestHandler

此类为 ServiceDiscoveryStateMachine#onApply 中处理消息处理类,我们继续完善

public class HeartBeatRequestHandler implements ServiceDiscoveryRequestHandler {

  private static final Logger logger = LoggerFactory.getLogger(HeartBeatRequestHandler.class);

  private ServiceDiscoveryRequestHandlerFactory factory;

  public HeartBeatRequestHandler(
      ServiceDiscoveryRequestHandlerFactory factory) {
    this.factory = factory;
  }

  @Override
  public void doHandle(ServiceDiscoveryClosure closure, ServiceInstance serviceInstance) {
    if (null == serviceInstance) {
      return;
    }
    factory.onBeat(serviceInstance);
    logger.info("{} has been renewed at the node", serviceInstance);
  }
}
3.2.3 ServiceDiscoveryRequestHandlerFactory 新增 onBeat 方法
ublic class ServiceDiscoveryRequestHandlerFactory {
  ...
  private final Object monitor = new Object();

  /**
   * 服务名称与服务实例列表(List)映射 serverName:<serverId:instance>,使用 serverId 来做去重
   */
  private final Map<String, Map<String, ServiceInstance>> serviceNameToInstancesStorage = new ConcurrentHashMap<>();

  /**
   * 服务名称与服务实例列表(List+心跳时间)映射 serverName:<serverId:now>
   */
  private final Map<String, Map<String, Instant>> serviceNameToInstantsMap = new ConcurrentHashMap<>();
  ...
  public void onBeat(ServiceInstance serviceInstance) {
    String serviceName = serviceInstance.getServiceName();
    String id = serviceInstance.getId();
    synchronized (monitor) {
      Map<String, ServiceInstance> serviceInstancesMap = serviceNameToInstancesStorage
          .computeIfAbsent(serviceName, n -> new LinkedHashMap<>());
      if (!serviceInstancesMap.containsKey(id)) {
        //无效心跳请求
        logger.info("{} beat is invalid", serviceInstance);
        return;
      }
      Map<String, Instant> instantMap = serviceNameToInstantsMap
          .computeIfAbsent(serviceName, n -> new LinkedHashMap<>());
      // 续约
      instantMap.put(id, Instant.now());
    }
  }
}
3.2.4 ServiceDiscoveryCheckBeatThread 心跳检测线程
public class ServiceDiscoveryCheckBeatThread extends Thread {

  private final ServiceDiscoveryRequestHandlerFactory factory;
  private final ServiceDiscoveryStateMachine stateMachine;

  private static final Logger logger = LoggerFactory
      .getLogger(ServiceDiscoveryCheckBeatThread.class);

  public ServiceDiscoveryCheckBeatThread(ServiceDiscoveryRequestHandlerFactory factory,
      ServiceDiscoveryStateMachine stateMachine) {
    super("service-instance-beat-check");
    this.factory = factory;
    this.stateMachine = stateMachine;
  }

  @Override
  public void run() {
    while (true) {
      try {
        if (stateMachine.isLeader()) {
          factory.checkBeat();
        }
        Thread.sleep(5000);
      } catch (Throwable e) {
        logger.error("error on check beat", e);
      }
    }
  }

}
3.2.5 ServiceDiscoveryRequestHandlerFactory 新增 checkBeat 方法
  public void checkBeat() {
    final Instant now = Instant.now();
    synchronized (monitor) {
      // 遍历所有服务实例集合
      for (Map.Entry<String, Map<String, ServiceInstance>> serviceInstanceMap : this.serviceNameToInstancesStorage
          .entrySet()) {
        String serviceName = serviceInstanceMap.getKey();
        // 获取当前服务实例对应的服务续约时间集合
        Map<String, Instant> instantMap = this.serviceNameToInstantsMap.get(serviceName);
        if (CollectionUtils.isEmpty(instantMap)) {
          // 移除当前服务的所有实例
          serviceInstanceMap.getValue().clear();
          continue;
        }
        for (Map.Entry<String, Instant> instantService : instantMap.entrySet()) {
          if (instantService.getValue().plus(expired, ChronoUnit.SECONDS).isBefore(now)) {
            //超过30s没有收到心跳
            logger.info(
                "The current instance [{}] has not received a heartbeat request for more than 30 seconds",
                instantService);
            removeInstance(serviceName, instantService.getKey());
          }
        }
      }
    }
  }

如果30s没有收到心跳,则移除实例

  private void removeInstance(String serviceName, String id) {
    Map<String, ServiceInstance> serviceInstancesMap = getServiceInstancesMap(serviceName);
    ServiceInstance serviceInstance = serviceInstancesMap.get(id);
    if (null != serviceInstance) {
      // 发送注销消息同步到followers
      logger.info("send DeRegistration {}", serviceInstance);
      sendDeRegistrationRpc(serviceInstance);
    }
  }

  private void sendDeRegistrationRpc(ServiceInstance serviceInstance) {
    String serviceName = serviceInstance.getServiceName();

    final Kind kind = Kind.DEREGISTRATION;

    ServiceDiscoveryOperation op = new ServiceDiscoveryOperation(kind, serviceInstance);

    final ServiceDiscoveryClosure closure = new ServiceDiscoveryClosure(op) {
      @Override
      public void run(Status status) {
        if (!status.isOk()) {
          logger.warn("Closure status is : {} at the {}", status, rpcProcessorService.getNode());
          return;
        }
        logger.info("'{}' has been handled ,serviceName : '{}' , result : {} , status : {}",
            kind, serviceName, getResult(), status);
      }
    };

    this.rpcProcessorService.applyOperation(closure);
  }
3.2.6 ServiceDiscoveryServer 服务端启动时启动心跳检测线程
public class ServiceDiscoveryServer {

  private RaftGroupService raftGroupService;
  private Node node;
  private ServiceDiscoveryStateMachine fsm;

  public ServiceDiscoveryServer(final String dataPath, final String groupId, final PeerId serverId,
      final NodeOptions nodeOptions) throws IOException {
    ...
    this.raftGroupService = new RaftGroupService(groupId, serverId, nodeOptions, rpcServer);
    // start raft node
    this.node = this.raftGroupService.start();
    ServiceDiscoveryRequestHandlerFactory instanceFactory = ServiceDiscoveryRequestHandlerFactory
        .getInstance();
    instanceFactory.init();
    instanceFactory.setRpcProcessorService(rpcProcessorService);
    // 启动心跳检测线程
    ServiceDiscoveryCheckBeatThread beatThread = new ServiceDiscoveryCheckBeatThread(
        instanceFactory, fsm);
    beatThread.setDaemon(true);
    beatThread.start();
  }
  ...
}

4.测试

按照前一篇文章 基于 SOFAJRaft 实现注册中心_不懂的浪漫的博客-CSDN博客 的方式分别启动

  • ServiceDiscoveryServer1 - 注册中心服务器1
  • ServiceDiscoveryServer2 - 注册中心服务器2
  • ServiceDiscoveryServer3 - 注册中心服务器3
  • MyApplication - Spring Web 应用

启动完成之后,观察三个服务端日志打印情况。
可以看到 Beat 心跳请求是每 5 秒打印一次
image.png
关闭 MyApplication ,模拟心跳超时(服务下线)情况
Leader 节点日志:
image.png
Follower 节点日志
image.png

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1084792.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

开发一个npm组件包(2)

通过vueelement 原来后台 开发npm包的时候 会遇到一下几个问题 入口文件变化为package/index 需要再配置打包方法 package.json下 "scripts": {"package": "vue-cli-service build --target lib ./src/package/index.js --name managerpage --dest…

scsi READ CAPACITY (10)命令总结

READ CAPACITY (10)概述&#xff1a; READ CAPACITY(10)命令(参见表119)请求设备服务器将描述直接访问块设备的容量和介质格式的8字节参数数据传输到数据缓存中。这个命令可以被处理&#xff0c;就好像它有一个HEAD OF QUEUE任务属性。 如果逻辑单元支持保护信息&#xff0c;应…

Java使用opencv实现人脸识别、人脸比对

1. opencv概述 OpenCV是一个开源的计算机视觉库&#xff0c;它提供了一系列丰富的图像处理和计算机视觉算法&#xff0c;包括图像读取、显示、滤波、特征检测、目标跟踪等功能。 opencv官网&#xff1a;https://opencv.org/ 2. 安装opencv 2.1 下载opencv opencv下载&#x…

idea中父工程Project创建

1.file-->new-->Project 2.选择maven包和JavaSDK 3.填写项目名&#xff0c;选择文件目录&#xff0c;项目包等 4.配置maven tip&#xff1a;约定>配置>编码 5.设置项目编码 6.注解生效激活&#xff0c;便于项目中使用注解 7.Java编译版本选择8 8.File Type 过滤&a…

Java idea查看自定义注解的调用地方

Java idea查看自定义注解的调用地方

RunnerGo UI自动化测试功能使用体验

首先需要进入官网&#xff0c;RunnerGo支持开源&#xff0c;可以自行下载安装&#xff0c;也可以点击右上角体验企业版按钮快速体验 点击体验企业版进入工作台后可以点击页面上方的UI自动化 进入到测试页面 创建元素 我们可以在元素管理中创建我们测试时需要的元素 这里我们以…

【低代码表单设计器】:创造高效率的流程化办公!

当前&#xff0c;有不少用户朋友对低代码表单设计器挺感兴趣。其实&#xff0c;如果想要实现提质增效的办公效率&#xff0c;创造一个流程化办公&#xff0c;那么确实可以了解低代码技术平台。流辰信息作为服务商&#xff0c;拥有较强的自主研发能力&#xff0c;根据市场的变化…

CANoe-如何实现27服务解锁

27服务解锁的工作原理可以在文章《诊断27服务介绍》查看,这里简单介绍下流程: Tester向ECU发送27 01诊断请求请求种子seed,ECU收到该请求后随机生成一个seed,通过67 01诊断响应发送给Tester。Tester收到该诊断响应后取出seed值,传入和ECU相同的算法后生成一个密钥keyT。然…

[C++11]花括号{}、initializer_list、auto、decltype

文章目录 1.花括号{ }的扩展2.initializer_list3.auto4.decltype5.容器的增加5.1array[useless]5.2forward_list[useless]5.3unordered_map/unordered_set5.4统一增加 6.知乎文章 1.花括号{ }的扩展 int main() {//C98花括号{ }支持 1.数组 2.结构体struct Point{int _x;int _…

系统韧性研究(1)| 何谓「系统韧性」?

过去十年&#xff0c;系统韧性作为一个关键问题被广泛讨论&#xff0c;在数据中心和云计算方面尤甚&#xff0c;同时它对赛博物理系统也至关重要&#xff0c;尽管该术语在该领域不太常用。大伙都希望自己的系统具有韧性&#xff0c;但这到底意味着什么&#xff1f;韧性与其他质…

“AI大模型+电子签”,下一站在哪?

大模型所带来的数据分析、训练能力&#xff0c;将使得一些厂商的数据优势被逐渐放大&#xff0c;打造自身的差异化&#xff0c;打破电子签赛道同质竞争的局面。 作者|斗斗 编辑|皮爷 出品|产业家 AI大模型爆发以来&#xff0c;参与者众多。在电子签领域&#xff0c;这个…

前端代码重复度检测

在前端开发中&#xff0c;代码的重复度是一个常见的问题。重复的代码不仅增加了代码的维护成本&#xff0c;还可能导致程序的低效运行。为了解决这个问题&#xff0c;有许多工具和技术被用来检测和消除代码重复。其中一个被广泛使用的工具就是jscpd。 jscpd简介 jscpd是一款开…

格式转换 ▏Python 实现Word转HTML

将Word转换为HTML能将文档内容发布在网页上&#xff0c;这样&#xff0c;用户就可以通过浏览器直接查看或阅读文档而无需安装特定的软件。Word转HTML对于在线发布信息、创建在线文档库以及构建交互式网页应用程序都非常有用。以下是用Python将Word转换为HTML网页的攻略&#xf…

软件测试/测试开发丨校招推荐-中控技术股份有限公司岗位开放

软件测试工程师 岗位职责 1.参与软件项目需求分析&#xff0c;根据需求制定测试方案&#xff0c;设计测试用例&#xff0c;输出测试报告&#xff1b; 2.实施软件测试&#xff0c;对产品的接口、功能、性能等方面的测试负责&#xff1b; 3.针对项目中的问题进行跟踪分析和报…

从一个咖啡机提取一个嵌入式前端应用

学习一下除C或系统外的另一种嵌入式程序编写方法&#xff0c;JavaScript用于UI的设计与串口设备的控制 设备是基于SSD202芯片&#xff0c;dispinit初始化LCD&#xff0c;mplayer为程序需要用到视频播放程序&#xff0c;感觉开发效率会比较高及用户体现会比较好&#xff0c;毕竟…

新书免费领 | 数睿数据参编的《零基础学低(无)代码》图书正式发行

由中国工业出版集团、电子工业出版社出版的《零基础学低&#xff08;无&#xff09;代码》图书现已正式对外发行&#xff0c;目前已在多个购书平台和阅读平台上线。 本书由中国软件行业协会应用软件产品云服务分会秘书长曹开彬、清华大学软件学院副研究员刘英博主编。数睿数据…

智能工厂:APS高级计划排程系统成为了制造业建设智能工厂的核心必要需求

近年来&#xff0c;中国经济受到了许多因素的影响&#xff0c;例如新冠疫情冲击和国内外经济环境的巨大变化&#xff0c;随着我国人口红利的减少和人力成本逐步的增加&#xff0c;不论是中大型或小微制造企业为了提高市场竞争力并降低生产成本&#xff0c;都纷纷开始规划建设数…

实用!生产车间管理方案(万人收藏)

生产车间怎么管理&#xff1f; 发现大多数是车间管理普遍存在的问题&#xff0c;本文总结了以下几个方面&#xff1a; 1、制度管理 &#xff08;1&#xff09;没有明确的生产计划&#xff0c;生产流程混乱 &#xff08;2&#xff09;没有准确的记录和跟踪 2、人员管理&…

MySQL分页排序注意事项

最近测试发现个bug&#xff0c;同一个列表&#xff0c;分页选择展示10条数据时和展示20条数据时&#xff0c;展示20条数据不是展示10条数据下10条数据&#xff0c;数据有所错乱&#xff0c;如下图示。 SELECTid,no,year,quarter,dept_id,dept_name,create_time FROMlist_list …

Blender:制作一个变形动画

就是一个球逐渐地变为一个立方体 首先创建一个球和一个立方体 然后把两个物体放在一起&#xff0c;放缩球&#xff0c;让球包含立方体 之后选中球&#xff0c;为其添加修改器&#xff0c;缩裹 在这里选择缩裹对象为立方体 然后在应用下拉箭头中选择“应用为形态键” 下一步选中…