springboot、springcloudnacos、netty-socketio实现im集群弹性伸缩和节点上下线监听

news2024/12/27 1:23:03

1、im-server 所有节点都注册到nacos服务中,使用nacos服务端2.4.3,客户端1.4.6,

spring-cloud-starter-alibaba-nacos-discovery版本  2021.1

2、im-listener 监听 im-server的上线和下线事件

3、springcloudalibaba  nacos监听服务上线和下线

配置文件

spring:
  redis:
    redisson:
      file:
        classpath:redisson.yaml
  application:
    # 应用名称
    name: im-listener
  cloud:
    nacos:
      discovery:
        # 服务注册地址
        server-addr: xxx.xxx.xxx.xx:xx
        namespace: cb329a1e-c20b-495a-885f-72076fc90d5f
        #心跳间隔。时间单位:毫秒。
        heart-beat-interval: 1000
        #心跳暂停。时间单位:毫秒。 即nacos服务端40秒收不到微服务客户端心跳,会将该微服务客户端注册的实例设为不健康
        heart-beat-timeout: 4000
        #Ip删除超时。时间单位:秒。即服务端90秒收不到客户端心跳,会将该微服务客户端注册的实例删除
        ip-delete-timeout: 9000
        #nacos 账号
        username: XXXXXXXXX
        # nacos 密码
        password: YYYYYYYYYY
        register-enabled: false # 注意:该服务无需注册到注册中心上,只用于获取注册中心上的服务信息就行了
    config:
      # 相同配置,本地优先
      override-none: true



server:
  port: 8080

im:
  port: 9092


代码

package com.yh.im.config;

import com.alibaba.cloud.nacos.NacosServiceManager;
import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.NamingService;
import com.alibaba.nacos.api.naming.listener.EventListener;
import com.alibaba.nacos.api.naming.listener.NamingEvent;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * nacos 客户端 服务监听变化。当服务下线和上线的时候能够收到通知
 * @Date 2023/11/27 12:00
 */
@Component
@Slf4j
public class NacosDiscoveryListener {
    private final Set<String> subscribedServices = ConcurrentHashMap.newKeySet();
    @Resource
    private NacosServiceManager nacosServiceManager;
    private static Map<String, Map<String, Boolean>> instanceHealthStatus = new ConcurrentHashMap<>();


    /**
     * 构造一个事件监听器,主要作用是监听服务实例变化
     *
     * @return EventListener
     */
    private EventListener buildEventListener() {
        return event -> {
            if (event instanceof NamingEvent) {
                NamingEvent namingEvent = (NamingEvent) event;
                log.error("服务实例变化:{}", JSON.toJSONString(namingEvent));
                String serviceName = namingEvent.getServiceName();
                if (!instanceHealthStatus.containsKey(serviceName)) {
                    ConcurrentHashMap<String, Boolean> instanceMap = new ConcurrentHashMap<>();
                    instanceHealthStatus.put(serviceName, instanceMap);
                    List<Instance> newInstance = namingEvent.getInstances();
                    newInstance.forEach(instance -> {
                        String instanceKey = instance.getIp() + ":" + instance.getPort();
                        instanceMap.put(instanceKey, instance.isHealthy());
                        log.error("服务首次上线: {} -> {}", serviceName, instanceKey);
                    });
                    return;
                }
                List<ServiceInfo> allServiceInstances = getAllServiceInstances();
                int instanceTotal = allServiceInstances.stream()
                        .mapToInt(serviceInfo -> Integer.parseInt(serviceInfo.getClusters()))
                        .sum();
                Map<String, Boolean> serviceMap = instanceHealthStatus.computeIfAbsent(serviceName, k -> new ConcurrentHashMap<>());
                Set<String> oldInstanceKeys = new HashSet<>(serviceMap.keySet());
                List<Instance> newInstance = namingEvent.getInstances();
                Set<String> newInstanceKeys = newInstance.stream()
                        .map(instance -> instance.getIp() + ":" + instance.getPort())
                        .collect(Collectors.toSet());
                int oldSize = serviceMap.size();
                int newSize = namingEvent.getInstances().size();
                // 服务实例没有增减,只是状态变化
                if (oldSize == newSize) {
                    newInstance.forEach((instance) -> {
                        String instanceKey = instance.getIp() + ":" + instance.getPort();
                        if (instance.isHealthy() != serviceMap.get(instanceKey)) {
                            if (instance.isHealthy()) {
                                log.error("服务上线: {} -> {}", serviceName, instanceKey);

                            } else {
                                log.error("服务下线: {} -> {}", serviceName, instanceKey);

                            }
                            serviceMap.put(instanceKey, instance.isHealthy());
                        }
                    });
                }
                // 下线实例
                if (oldSize > newSize) {
                    newInstanceKeys.forEach(oldInstanceKeys::remove);
                    oldInstanceKeys.forEach(instanceKey -> log.error("服务下线: {} -> {}", serviceName, instanceKey));
                } else {
                    // 上线实例
                    newInstanceKeys.removeAll(oldInstanceKeys);
                    StringBuffer noticeTitle = new StringBuffer("服务上线通知");
                    newInstanceKeys.forEach(instanceKey -> {
                        String message = String.format("[%s-%s]", serviceName, instanceKey);
                        log.info(message);
                        noticeTitle.append(message).append(",");
                    });

                }
            }
        };
    }
    /**
     * 定时获取服务列表,然后根据获取到的服务名,进行订阅,
     * nacos客户端目前不能订阅所有服务,只能手动的订阅
     * 也可以不用定时需要的时候通过getAllServiceInstances获取
     */
    @Scheduled(fixedDelay = 5000)
    public void reportServices() {
        List<String> services = null;
        try {
            Properties properties = new Properties();
            NamingService namingService = nacosServiceManager.getNamingService(properties);

            services = namingService.getServicesOfServer(1, Integer.MAX_VALUE).getData();
            services.forEach(serviceName -> {
                if (!subscribedServices.contains(serviceName)) {
                    try {
                        namingService.subscribe(serviceName, buildEventListener());
                        subscribedServices.add(serviceName);
                    } catch (NacosException e) {
                        log.error("订阅服务失败", e);
                    }
                }
            });
        } catch (NacosException e) {
            log.error("获取服务列表失败", e);
        }
    }

    /**
     * 获取所有服务实例
     * @return  服务实例列表
     */
    public List<ServiceInfo> getAllServiceInstances() {
        List<ServiceInfo> serviceInfos = new ArrayList<>();
        try {
            Properties properties = new Properties();
            NamingService namingService = nacosServiceManager.getNamingService(properties);
            List<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE).getData();
            for (String serviceName : services) {
                List<Instance> onlineInstances = namingService.selectInstances(serviceName, true);
                // 下线服务暂时不用关注
                List<Instance> offlineInstances = namingService.selectInstances(serviceName, false);
                serviceInfos.add(new ServiceInfo(serviceName, String.valueOf(onlineInstances.size())));
            }
        } catch (NacosException e) {
            e.printStackTrace();
        }
        return serviceInfos;
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2266110.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Windows脚本清理C盘缓存

方法一&#xff1a;使用power文件.ps1的文件 脚本功能 清理临时文件夹&#xff1a; 当前用户的临时文件夹&#xff08;%Temp%&#xff09;。系统临时文件夹&#xff08;C:\Windows\Temp&#xff09;。 清理 Windows 更新缓存&#xff1a; 删除 Windows 更新下载缓存&#xff0…

Type-c接口

6P Type C 接口座&#xff1a; 仅支持充电 16P 与 12P Type C 接口座&#xff1a; 支持数据传输 Type-c引脚&#xff1a; SUB1,SUB2为辅助通讯引脚&#xff0c;主要用在音视频信号传输中&#xff0c;很多DIY都用不到 CC1、CC2引脚用于连接检测&#xff0c;一般可以不用连接&am…

基于python语音启动电脑应用程序

osk模型进行输入语音转换 txt字典导航程序路径 pyttsx3引擎进行语音打印输出 关键词程序路径 import os import json import queue import sounddevice as sd from vosk import Model, KaldiRecognizer import subprocess import time import pyttsx3 import threading# 初始…

互联网视频云平台EasyDSS无人机推流直播技术如何助力野生动植物保护工作?

在当今社会&#xff0c;随着科技的飞速发展&#xff0c;无人机技术已经广泛应用于各个领域&#xff0c;为我们的生活带来了诸多便利。而在动植物保护工作中&#xff0c;无人机的应用更是为这一领域注入了新的活力。EasyDSS&#xff0c;作为一款集视频处理、分发、存储于一体的综…

垃圾分割数据集labelme格式659张1类别

数据集格式&#xff1a;labelme格式(不包含mask文件&#xff0c;仅仅包含jpg图片和对应的json文件) 图片数量(jpg文件个数)&#xff1a;659 标注数量(json文件个数)&#xff1a;659 标注类别数&#xff1a;1 标注类别名称:["garbage"] 每个类别标注的框数&#…

记我的Springboot2.6.4从集成swagger到springdoc的坎坷路~

项目背景 主要依赖及jdk信息&#xff1a; Springboot&#xff1a;2.6.4 Jdk: 1.8 最近新搭建了一套管理系统&#xff0c;前端部分没有公司的前端团队&#xff0c;自己在github上找了一个star较多使用相对也简单的框架。在这个管理系统搭建好上线之后&#xff0c;给组内的小伙…

NNDL 作业11 LSTM

习题6-4 推导LSTM网络中参数的梯度&#xff0c; 并分析其避免梯度消失的效果 先来推个实例&#xff1a; 看式子中间&#xff0c;上半部分并未有连乘项&#xff0c;而下半部分有到的连乘项&#xff0c;从这可以看出&#xff0c;LSTM能缓解梯度消失&#xff0c;梯度爆炸只是不易…

uniapp使用live-pusher实现模拟人脸识别效果

需求&#xff1a; 1、前端实现模拟用户人脸识别&#xff0c;识别成功后抓取视频流或认证的一张静态图给服务端。 2、服务端调用第三方活体认证接口&#xff0c;验证前端传递的人脸是否存在&#xff0c;把认证结果反馈给前端。 3、前端根据服务端返回的状态&#xff0c;显示在…

MySQL中Performance Schema库的详解(下)

昨天说了关于SQL语句相关的&#xff0c;今天来说说性能相关的&#xff0c;如果没有看过上篇请点传送门https://blog.csdn.net/2301_80479959/article/details/144693574?fromshareblogdetail&sharetypeblogdetail&sharerId144693574&sharereferPC&sharesource…

YOLO11全解析:从原理到实战,全流程体验下一代目标检测

前言 一、模型介绍 二、网络结构 1.主干网络&#xff08;Backbone&#xff09; 2.颈部网络&#xff08;Neck&#xff09; 3.头部网络&#xff08;Head&#xff09; 三、算法改进 1.增强的特征提取 2.优化的效率和速度 3.更高的准确性与更少的参数 4.环境适应性强 5.…

【Qt】了解和HelloWorld

目录 0.用户交互界面风格 Windows下GUI开发方案&#xff1f; 1.Qt简介 1.1 版本Qt5. 1.2搭建Qt开发环境 需要安装3个工具 安装过程 熟悉QtSDK重要工具 2.使用Qt Creator创建项目 2.1代码解释 2.2helloworld 1.图形化方式 2.代码方式 0.用户交互界面风格 1.TUI&…

原点安全再次入选信通院 2024 大数据“星河”案例

近日&#xff0c;中国信息通信研究院和中国通信标准化协会大数据技术标准推进委员会&#xff08;CCSA TC601&#xff09;共同组织开展的 2024 大数据“星河&#xff08;Galaxy&#xff09;”案例征集活动结果正式公布。由工银瑞信基金管理有限公司、北京原点数安科技有限公司联…

【MySQL初阶】--- 数据类型

Welcome to 9ilks Code World (๑•́ ₃ •̀๑) 个人主页: 9ilk (๑•́ ₃ •̀๑) 文章专栏&#xff1a; MySQL &#x1f3e0; 数据类型分类 MySQL是一套整体的对外数据存取方案,既然要存取数据,而数据有不同的类型,因此MySQL也存在不同的数据类型,有不同的用…

使用VsCode编译调试Neo4j源码

文章目录 使用VsCode编译调试Neo4j源码1 简介2 步骤1 下载源码2 依赖3 构建Neo4j4 运行5 安装VsCode扩展6 **调试** 使用VsCode编译调试Neo4j源码 1 简介 Neo4j作为领先的图数据库&#xff0c;在存储、查询上都非常值得分析学习。通过调试、日志等方法跟踪代码工作流有助于理…

从零开始构建美颜SDK:直播美颜插件的开发实践指南

很多人好奇的一个问题&#xff0c;直播APP中主播们的美颜功能是如何实现的&#xff0c;今天&#xff0c;我们将以构建一款美颜SDK为主线&#xff0c;从技术架构、功能设计到开发实践&#xff0c;为读者提供一个全面的指导。 一、美颜SDK的核心技术 要构建一款优秀的美颜SDK&a…

计算机网络习题( 第3章 物理层 第4章 数据链路层 )

第3章 物理层 一、单选题 1、下列选项中&#xff0c;不属于物理层接口规范定义范畴的是&#xff08; &#xff09;。 A、 接口形状 B、 引脚功能 C、 传输媒体 D、 信号电平 正确答案&#xff1a; C 2、在物理层接口特性中&#xff0c;用于描述完成每种功能的事件发…

云手机群控能用来做什么?

随着云手机的发展&#xff0c;云手机群控技术逐渐从小众的游戏多开工具&#xff0c;发展为涵盖多个领域的智能操作平台。不论是手游搬砖、短视频运营&#xff0c;还是账号养成等场景&#xff0c;云手机群控都展现出了强大的应用潜力。本文将为大家详细解析云手机群控的应用场景…

深度学习实验十七 优化算法比较

目录 一、优化算法的实验设定 1.1 2D可视化实验&#xff08;被优化函数为&#xff09; 1.2 简单拟合实验 二、学习率调整 2.1 AdaGrad算法 2.2 RMSprop算法 三、梯度修正估计 3.1 动量法 3.2 Adam算法 四、被优化函数变为的2D可视化 五、不同优化器的3D可视化对比 …

汽车IVI中控开发入门及进阶(43):NanoVG

NanoVG:基于OpenGL的轻量级抗锯齿2D矢量绘图库 NanoVG是一个跨平台、基于OpenGL的矢量图形渲染库。它非常轻量级,用C语言实现,代码不到5000行,非常精简地实现了一套HTML5 Canvas API,做为一个实用而有趣的工具集,用来构建可伸缩的用户界面和可视化效果。NanoVG-Library为…

【生信圆桌x教程系列】如何安装 seurat V4版本R包

生物信息分析,上云更省心; 欢迎访问 www.tebteb.cc 了解 【生信云】 一.介绍 Seurat 是一个广泛使用的 R 包&#xff0c;专门用于单细胞基因表达数据的分析与可视化。它主要被生物信息学和生物统计学领域的研究者用来处理、分析和理解单细胞 RNA 测序&#xff08;scRNA-seq&am…