Java在SpringCloud中自定义Gateway负载均衡策略

news2025/1/13 0:23:31

Java在SpringCloud中自定义Gateway负载均衡策略

一、前言

spring-cloud-starter-netflix-ribbon已经不再更新了,最新版本是2.2.10.RELEASE,最后更新时间是2021年11月18日,详细信息可以看maven官方仓库:org.springframework.cloud/spring-cloud-starter-netflix-ribbon,SpringCloud官方推荐使用spring-cloud-starter-loadbalancer进行负载均衡。

背景:大文件上传做切片文件上传;

流程:将切片文件上传到服务器,然后进行合并任务,合并完成之后上传到对象存储;现在服务搞成多节点以后,网关默认走轮循,但是相同的服务在不同的机器上,这样就会导致切片文件散落在不同的服务器上,会导致文件合并失败;所以根据一个标识去自定义gateway对应服务的负载均衡策略,可以解决这个问题;

我的版本如下:

<spring-boot.version>2.7.3</spring-boot.version>
        <spring-cloud.version>2021.0.4</spring-cloud.version>
        <spring-cloud-alibaba.version>2021.0.4.0</spring-cloud-alibaba.version>

二、参考默认实现

springCloud原生默认的负载均衡策略是这个类:

org.springframework.cloud.loadbalancer.core.RoundRobinLoadBalancer

我们参考这个类实现自己的负载均衡策略即可,RoundRobinLoadBalancer实现了ReactorServiceInstanceLoadBalancer这个接口,实现了choose这个方法,如下图:

在choose方法中调用了processInstanceResponse方法,processInstanceResponse方法中调用了getInstanceResponse方法,所以我们我们可以复制RoundRobinLoadBalancer整个类,只修改getInstanceResponse这个方法里的内容就可以实现自定义负载均衡策略。

三、实现代码

原理:根据请求头当中设备的唯一标识传递到下游,唯一标识做哈希取余,可以指定对应的服务器节点,需要的服务设置自定义负载策略,不需要的服务设置默认的轮循机制即可

package com.wondertek.gateway.loadBalancer;

import cn.hutool.core.util.ObjectUtil;
import com.wondertek.web.exception.enums.HttpRequestHeaderEnum;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
@Slf4j
@Component
public class RequestFilter implements GlobalFilter, Ordered {
    @Override
    public int getOrder() {
        // 应该小于LoadBalancerClientFilter的顺序值
        return Ordered.HIGHEST_PRECEDENCE;
    }
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        String clientDeviceUniqueCode = request.getHeaders().getFirst(HttpRequestHeaderEnum.CLIENT_DEVICE_UNIQUE_CODE.getCode());
        // 存入Reactor上下文
        String resultCode = clientDeviceUniqueCode;
        return chain.filter(exchange)
                .contextWrite(context -> {
                    if (ObjectUtil.isNotEmpty(resultCode)) {
                        log.info("开始将request中的唯一标识封装到上下游中:{}", resultCode);
                        return context.put("identification", resultCode);
                    } else {
                        // 或者根据需求进行其他处理
                        return context;
                    }
                });
    }
}

package com.wondertek.gateway.loadBalancer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import reactor.core.publisher.Mono;

import java.util.*;
import java.util.concurrent.ThreadLocalRandom;

@Slf4j
public class ClientDeviceUniqueCodeInstanceLoadBalancer implements ReactorServiceInstanceLoadBalancer {

    private final String serviceId;
    private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;


    public ClientDeviceUniqueCodeInstanceLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) {
        this.serviceId = serviceId;
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
    }

    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        //在 choose 方法中,使用 deferContextual 方法来访问上下文并提取客户端标识。这里的 getOrDefault 方法尝试从上下文中获取一个键为 "identification" 的值,如果不存在则返回 "default-identification"
        return Mono.deferContextual(contextView -> {
            String identification = contextView.getOrDefault("identification", "14d58a1ba286f087d9736249ec785314");
            log.info("上下游获取到的identification的值为:{}", identification);
            ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
                    .getIfAvailable(NoopServiceInstanceListSupplier::new);
            return supplier.get(request).next()
                    .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances, identification));
        });
    }

    private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier, List<ServiceInstance> serviceInstances, String identification) {
        Response<ServiceInstance> serviceInstanceResponse;
        if (Objects.isNull(identification)) {
            serviceInstanceResponse = this.getInstanceResponse(serviceInstances, null);
        } else {
            serviceInstanceResponse = this.getIpInstanceResponse(serviceInstances, identification);
        }
        if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
            ((SelectedInstanceCallback) supplier).selectedServiceInstance((ServiceInstance) serviceInstanceResponse.getServer());
        }
        return serviceInstanceResponse;
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances, String identification) {
        if (instances.isEmpty()) {
            if (log.isWarnEnabled()) {
                log.warn("No servers available for service: " + this.serviceId);
            }
            return new EmptyResponse();
        } else {
            int index = ThreadLocalRandom.current().nextInt(instances.size());
            ServiceInstance instance = (ServiceInstance) instances.get(index);
            return new DefaultResponse(instance);
        }
    }

    private Response<ServiceInstance> getIpInstanceResponse(List<ServiceInstance> instances, String identification) {
        if (instances.isEmpty()) {
            log.warn("No servers available for service: " + this.serviceId);
            return new EmptyResponse();
        } else if (instances.size() == 1) {
            log.info("只有一个服务实例,直接返回这个实例");
            return new DefaultResponse(instances.get(0));
        } else {
            //创建一个新的列表以避免在原始列表上排序,避免了修改共享状态可能带来的线程安全问题
            List<ServiceInstance> sortedInstances = new ArrayList<>(instances);
            // 现在对新列表进行排序,保持原始列表的顺序不变
            Collections.sort(sortedInstances, Comparator.comparing(ServiceInstance::getHost));
            //log.info("获取到的实例个数的值为:{}", sortedInstances.size());
            sortedInstances.forEach(instance -> log.info("排序后的实例: {},{}", instance.getHost(), instance.getPort()));
            //log.info("多个服务实例,使用客户端 identification 地址的哈希值来选择服务实例");
            // 使用排序后的列表来找到实例
            int ipHashCode = Math.abs(identification.hashCode());
            //log.info("identificationHashCode的值为:{}", ipHashCode);
            int instanceIndex = ipHashCode % sortedInstances.size();
            //log.info("instanceIndex的值为:{}", instanceIndex);
            ServiceInstance instanceToReturn = sortedInstances.get(instanceIndex);
            //log.info("instanceToReturn.getUri()的值为:{}", instanceToReturn.getUri());
            log.info("自定义identification负载机制,Client identification: {} is routed to instance: {}:{}", identification, instanceToReturn.getHost(), instanceToReturn.getPort());
            return new DefaultResponse(instanceToReturn);
        }
    }

}

package com.wondertek.gateway.loadBalancer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.loadbalancer.DefaultResponse;
import org.springframework.cloud.client.loadbalancer.EmptyResponse;
import org.springframework.cloud.client.loadbalancer.Request;
import org.springframework.cloud.client.loadbalancer.Response;
import org.springframework.cloud.loadbalancer.core.NoopServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.SelectedInstanceCallback;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import reactor.core.publisher.Mono;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

@Slf4j
public class DefaultInstanceLoadBalancer implements ReactorServiceInstanceLoadBalancer {

    private final String serviceId;
    private ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
    final AtomicInteger position;

    public DefaultInstanceLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId, AtomicInteger position) {
        this.serviceId = serviceId;
        this.serviceInstanceListSupplierProvider = serviceInstanceListSupplierProvider;
        this.position = position;
    }
    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) {
        ServiceInstanceListSupplier supplier = serviceInstanceListSupplierProvider
                .getIfAvailable(NoopServiceInstanceListSupplier::new);
        return supplier.get(request).next()
                .map(serviceInstances -> processInstanceResponse(supplier, serviceInstances));
    }

    private Response<ServiceInstance> processInstanceResponse(ServiceInstanceListSupplier supplier,
                                                              List<ServiceInstance> serviceInstances) {
        Response<ServiceInstance> serviceInstanceResponse = getInstanceResponse(serviceInstances);
        if (supplier instanceof SelectedInstanceCallback && serviceInstanceResponse.hasServer()) {
            ((SelectedInstanceCallback) supplier).selectedServiceInstance(serviceInstanceResponse.getServer());
        }
        return serviceInstanceResponse;
    }

    private Response<ServiceInstance> getInstanceResponse(List<ServiceInstance> instances) {
        if (instances.isEmpty()) {
            if (log.isWarnEnabled()) {
                log.warn("No servers available for service: " + serviceId);
            }
            return new EmptyResponse();
        }
        //创建一个新的列表以避免在原始列表上排序,避免了修改共享状态可能带来的线程安全问题
        List<ServiceInstance> sortedInstances = new ArrayList<>(instances);
        // 现在对新列表进行排序,保持原始列表的顺序不变
        Collections.sort(sortedInstances, Comparator.comparing(ServiceInstance::getHost));
        //log.info("获取到的实例个数的值为:{}", sortedInstances.size());
        sortedInstances.forEach(instance -> log.info("排序后的实例: {},{}", instance.getHost(), instance.getPort()));
        int pos = Math.abs(this.position.incrementAndGet());
        //log.info("默认轮循机制,pos递加后的值为:{}", pos);
        int positionIndex = pos % instances.size();
        //log.info("取余后的positionIndex的值为:{}", positionIndex);
        ServiceInstance instance = instances.get(positionIndex);
        //log.info("instance.getUri()的值为:{}", instance.getUri());
        log.info("默认轮循机制,routed to instance: {}:{}",instance.getHost(), instance.getPort());
        return new DefaultResponse(instance);
    }

}

package com.wondertek.gateway.loadBalancer;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClient;
import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients;
import org.springframework.cloud.loadbalancer.core.ReactorServiceInstanceLoadBalancer;
import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier;
import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

import java.util.concurrent.atomic.AtomicInteger;

@Configuration
//单台服务
//@LoadBalancerClient(name = "oms-api", configuration = CustomLoadBalancerConfig.class)
//多台服务
@LoadBalancerClients({
        @LoadBalancerClient(name = "oms-api", configuration = CustomLoadBalancerConfig.class),
        @LoadBalancerClient(name = "unity-api", configuration = CustomLoadBalancerConfig.class),
        @LoadBalancerClient(name = "cloud-api", configuration = CustomLoadBalancerConfig.class),
        @LoadBalancerClient(name = "open-api", configuration = CustomLoadBalancerConfig.class),
        @LoadBalancerClient(name = "server-api", configuration = CustomLoadBalancerConfig.class),
        @LoadBalancerClient(name = "center-service", configuration = CustomLoadBalancerConfig.class),
})
@Slf4j
public class CustomLoadBalancerConfig {
    // 定义一个Bean来提供AtomicInteger的实例
    @Bean
    public AtomicInteger positionTracker() {
        // 这将在应用上下文中只初始化一次
        return new AtomicInteger(0);
    }

    //自定义优先级负载均衡器
    @Bean
    public ReactorServiceInstanceLoadBalancer customPriorityLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider,
                                                                     Environment environment,AtomicInteger positionTracker) {
        String serviceId = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        //目的为解决文件上传切片文件分散上传的问题
        if ("oms-api".equals(serviceId)||"unity-api".equals(serviceId)||"cloud-api".equals(serviceId)){
            //log.info("服务名称:serviceId:{},走自定义clientDeviceUniqueCode负载模式", serviceId);
            return new ClientDeviceUniqueCodeInstanceLoadBalancer(serviceInstanceListSupplierProvider, serviceId);
        }
        //log.info("服务名称:serviceId:{},走默认负载模式", serviceId);
        return new DefaultInstanceLoadBalancer(serviceInstanceListSupplierProvider, serviceId,positionTracker);
    }
}

【SpringCloud系列】开发环境下重写Loadbalancer实现自定义负载均衡

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1341110.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

轻舟已过万重山,鸿蒙4.0程序员危机

现在是2023年末。自从华为推出的鸿蒙系统到现在已经有4年多。之前的鸿蒙系统只是基于Android套壳&#xff0c;因为这也也被无数人瞧不起&#xff0c;自从华为秋季发布会后&#xff0c;宣布鸿蒙4.0问世。不再兼容Android&#xff0c;华为做独立的系统终于打了翻身仗。 鸿蒙系统…

2024年最新软件测试必问面试题,面试前一天刷效果更佳

1.你为什么选择软件测试行业 因为之前有了解软件测试这个行业&#xff0c;觉得他的发展前景很好。 2.根据你以前的工作经验描述一下软件开发、测试过程&#xff0c;由那些角色负责&#xff0c;你做什么 要有架构师、开发经理、测试经理、程序员、测试员。我在里面主要是负责所…

Python无法拒绝的表白界面完整代码

运行时弹出界面 当点击“不要”时弹出 当点击“”时弹出 文章目录 环境需求完整代码详细分析 环境需求 python3.11.4及以上版本PyCharm Community Edition 2023.2.5pyinstaller6.2.0&#xff08;可选&#xff0c;这个库用于打包&#xff0c;使程序没有python环境也可以运行…

openGauss系新增市场份额达21.9%,跨越生态拐点

12月28日&#xff0c;以“汇聚数据库创新力量&#xff0c;打造千行万业数据基石”为主题的openGauss Summit 2023在北京举办。本次大会由国家工业信息安全发展研究中心软件所、中国软件行业协会、中国计算机学会数据库专业委员会指导&#xff0c;openGauss社区主办。 中国工程…

街道洗扫车VR虚拟仿真展示创新了培训方式

吸污车用于收集处理城市中的污水、污泥&#xff0c;起到疏通管道的作用&#xff0c;特别是洪涝灾害时是重要的清理工具。吸污车由于内部结构复杂、工艺原理繁琐且造价成本高&#xff0c;因此传统的吸污车作业培训难以达到满意效果。VR虚拟仿真技术的出现&#xff0c;给企业提供…

Leetcode—1572.矩阵对角线元素的和【简单】

2023每日刷题&#xff08;七十三&#xff09; Leetcode—1572.矩阵对角线元素的和 实现代码 class Solution { public:int diagonalSum(vector<vector<int>>& mat) {int n mat.size();if(n 1) {return mat[0][0];}int sum 0;int i 0, j n - 1;while(i &…

三维可视化智慧工地源码,数字孪生可视化大屏,微服务架构+Java+Spring Cloud +UniApp +MySql

源码技术说明 微服务架构JavaSpring Cloud UniApp MySql&#xff1b;支持多端展示&#xff08;PC端、手机端、平板端&#xff09;;数字孪生可视化大屏&#xff0c;一张图掌握项目整体情况;使用轻量化模型&#xff0c;部署三维可视化管理&#xff0c;与一线生产过程相融合&#…

线程池原理及使用

线程池继承关系 1.为什么使用线程池&#xff1f; 1.反复创建线程开销大; 2.过多线程会占用太多内存(执行任务易出现“内存溢出”); 3.加快程序响应速度; 4.合理利用CPU和内存; 5.统一管理线程; 2.创建和停止线程池 2.1.线程池参数解释 1.keppAliveTime 如果线程池当中的线程数…

【Vulnhub 靶场】【Hms?: 1】【简单】【20210728】

1、环境介绍 靶场介绍&#xff1a;https://www.vulnhub.com/entry/hms-1,728/ 靶场下载&#xff1a;https://download.vulnhub.com/hms/niveK.ova 靶场难度&#xff1a;简单 发布日期&#xff1a;2021年07月28日 文件大小&#xff1a;2.9 GB 靶场作者&#xff1a;niveK 靶场系…

使用react+vite开发项目时候,部署上线后刷新页面无法访问解决办法

说一下我这边的环境和使用的路由模式&#xff1a;vitereactBrowserRouter路由模式&#xff0c;所以如果你和我一样的话&#xff0c;可以试试我的这种解决办法&#xff0c;我是将项目打包后直接丢到服务器上的目录里面&#xff0c;然后配置nginx直接访问根目录。 我的nginx配置…

总埋怨内娱没有舞台,但打歌节目为什么没人看?

内娱又一档打歌节目停播了&#xff0c;优酷的《朝阳打歌中心》还是没能挺过2023年。 12月24日最后一期节目播出之后&#xff0c;《朝阳打歌中心第二季》就正式收官了&#xff0c;原定将录制到明年2月份却突然提前停播&#xff0c;无论是参演歌手还是观众都感到无比惊讶和不舍。…

设计高手的秘密武器:7款卓越的PPT工具推荐

1.即时设计 使用模板制作ppt可以帮助设计师节省工作时间&#xff0c;减轻工作压力。在即时设计资源广场为设计师提供各种精美的设计材料&#xff0c;可以有效地帮助设计师节省时间和高效地完成工作任务。此外&#xff0c;设计师还可以重新更改这些设计材料&#xff0c;以满足设…

vite前端项目根据不同环境切换不同的请求域名,5分钟搞定

有能力的可以直接看vite官方文档&#xff1a;环境变量和模式 | Vite 官方中文文档 简单操作步骤&#xff1a;创建不同环境的配置文件&#xff0c;在配置文件中声明VITE_开头的变量并赋值&#xff0c;然后在项目中引入这个变量并使用。 创建配置文件 一般分为开发模式和生产模…

PyTorch 进阶指南,10个必须知道的原则

PyTorch 是一种流行的深度学习框架&#xff0c;它提供了强大的工具和灵活的接口&#xff0c;使得开发者能够搭建和训练各种神经网络模型。这份指南旨在为开发者提供一些有用的原则&#xff0c;以帮助他们在PyTorch中编写高效、可维护和可扩展的代码。 如果你对 Pytorch 还处于…

【ES】es介绍

倒排索引&#xff08;Inverted Index&#xff09;和正排索引&#xff08;Forward Index&#xff09; 正排索引是一种以文档为单位的索引结构&#xff0c;它将文档中的每个单词或词组与其所在的文档进行映射关系的建立。正排索引通常用于快速检索指定文档的内容&#xff0c;可以…

华为鸿蒙(HarmonyOS)介绍

华为鸿蒙&#xff08;HarmonyOS&#xff09;介绍 华为鸿蒙&#xff08;HarmonyOS&#xff09;是一款由华为自主研发的操作系统&#xff0c;旨在为各种智能设备提供一种统一、高效、安全的解决方案。鸿蒙系统基于微内核架构&#xff0c;可以应用于多种类型的设备&#xff0c;鸿…

32阵元 MVDR和DREC DOA估计波束方向图对比

32阵元 MVDR和DREC DOA估计波束方向图对比 一、原理 MVDR原理&#xff1a;https://zhuanlan.zhihu.com/p/457528114 DREC原理&#xff08;无失真响应特征干扰相消器&#xff09;&#xff1a;http://radarst.ijournal.cn/html/2019/3/201903018.html 主要参数&#xff1a; 阵…

高效Java开发分析:JProfiler 14 for Mac中文

JProfiler允许你对运行中的Java应用程序进行实时性能分析。它提供了丰富的统计数据、图表和概览&#xff0c;帮助你了解应用程序的CPU使用情况、内存使用情况、线程活动等。你可以追踪和识别性能瓶颈&#xff0c;并快速定位问题所在。 内存分析&#xff1a;软件提供了强大的内…

中职网络安全Server2002——Web隐藏信息获取

B-2&#xff1a;Web隐藏信息获取 任务环境说明&#xff1a; 服务器场景名&#xff1a;Server2002&#xff08;关闭链接&#xff09;服务器场景用户名&#xff1a;未知 有问题需要环境加q 通过本地PC中渗透测试平台Kali使用Nmap扫描目标靶机HTTP服务子目录&#xff0c;将扫描子…

js_常用事件演示

✨前言✨ 1.如果代码对您有帮助 欢迎点赞&#x1f44d;收藏⭐哟 后面如有问题可以私信评论哟&#x1f5d2;️ 2.博主后面将持续更新哟&#x1f618;&#x1f389;文章目录 &#x1f354;一、在JavaScript中什么是事件&#xff1f;&#x1f35f;二、为什么要使用事件&#x…