【SpringCloud】 K8s的滚动更新中明明已经下掉旧Pod,还是会把流量分到了不存活的节点

news2025/1/19 3:43:57

系列文章目录


文章目录

  • 系列文章目录
  • 前言
  • 一、初步定位问题
  • 二、源码解释
    • 1.引入库
      • 核心问题代码
      • 进一步往下看【这块儿算是只是拓展了,问题其实处在上面的代码】
        • Nacos是如何实现的?
    • 如何解决
  • 总结


前言

背景:
使用了SpringCloudGateWay 和 SpringCloud 全套的组件,内网服务指之间通信和请求使用FeignClient (基于HTTP的)的一个客户端。

现况:运维已经增加了5秒的服务下线等待,并且不会让流量在打到下线的服务,但是还是有相关请求路由到了不可用的服务节点上


一、初步定位问题

org.springframework.cloud.loadbalancer.cache.LoadBalancerCacheProperties 是配置SCG从Nacos获取路由配置`表的一个缓存配置,默认是打开状态并且缓存了35秒。

由于SCG使用了LoadBanalcer请求转发 ,此配置会导致,猜测服务下线时候获取到了已经不可用的或者下线的节点会导致503、500、504或者请求超时等等…

源码断点
请添加图片描述

在这里插入图片描述

二、源码解释

1.引入库

核心问题代码

/*
 * Copyright 2012-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
package org.springframework.cloud.loadbalancer.core;
 
import java.util.List;
 
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.cache.CacheFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
 
import org.springframework.cache.Cache;
import org.springframework.cache.CacheManager;
import org.springframework.cloud.client.ServiceInstance;
 
/**
 * A {@link ServiceInstanceListSupplier} implementation that tries retrieving
 * {@link ServiceInstance} objects from cache; if none found, retrieves instances using
 * {@link DiscoveryClientServiceInstanceListSupplier}.
 *
 * @author Spencer Gibb 大佬1
 * @author Olga Maciaszek-Sharma 大佬2
 * @since 2.2.0 2.20的版本
 */
public class CachingServiceInstanceListSupplier extends DelegatingServiceInstanceListSupplier {
 
    private static final Log log = LogFactory.getLog(CachingServiceInstanceListSupplier.class);
 
    /**
     * 缓存的名字
     */
    public static final String SERVICE_INSTANCE_CACHE_NAME = CachingServiceInstanceListSupplier.class.getSimpleName()
            + "Cache";
 
    /**
     * 缓存Flux集合
     */
    private final Flux<List<ServiceInstance>> serviceInstances;
 
    @SuppressWarnings("unchecked")
    public CachingServiceInstanceListSupplier(ServiceInstanceListSupplier delegate, CacheManager cacheManager) {
        super(delegate);//父类方法
        //在CacheFlux中寻找对应serviceName的服务,这里使用了一个CacheFlux
        //如何使用看这里:具体的API文档在这里【这个可能会在未来的版本会去掉】:https://projectreactor.io/docs/extra/release/api/reactor/cache/CacheFlux.html
        this.serviceInstances = CacheFlux.lookup(key -> {
            // TODO: configurable cache name 【这里似乎有一个TODO代码需要处理,可以指定缓存的的名字了~可能未来版本会支持】
            Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
            //缓存不存在说明缓存管理器有问题,这里一般情况不会进来,是一个为了逻辑完整的代码
            if (cache == null) {
                if (log.isErrorEnabled()) {
                    log.error("Unable to find cache: " + SERVICE_INSTANCE_CACHE_NAME);
                }
                return Mono.empty();
            }
            //找到了缓存,但是缓存为空,说明缓存不存在
            List<ServiceInstance> list = cache.get(key, List.class);
            if (list == null || list.isEmpty()) {
                return Mono.empty();
            }
            //如果存在,则Flux.just返回这个缓存列表
            return Flux.just(list).materialize().collectList();
        }, delegate.getServiceId())
                //如果换成没有命中从delegate获取一个数据来源从
                .onCacheMissResume(delegate.get().take(1))
                //写入缓存
                .andWriteWith((key, signals) -> Flux.fromIterable(signals).dematerialize().doOnNext(instances -> {
                    Cache cache = cacheManager.getCache(SERVICE_INSTANCE_CACHE_NAME);
                    if (cache == null) {
                        if (log.isErrorEnabled()) {
                            log.error("Unable to find cache for writing: " + SERVICE_INSTANCE_CACHE_NAME);
                        }
                    }
                    else {
                        cache.put(key, instances);
                    }
                }).then());
    }
 
    @Override
    public Flux<List<ServiceInstance>> get() {
        return serviceInstances;
    }
 
}

进一步往下看【这块儿算是只是拓展了,问题其实处在上面的代码】

ServiceInstanceListSupplier:在SCG的实现是 ==>DiscoveryClientServiceInstanceListSupplier:
如果需要看懂,为了更好了解 需要先了解一下WebFlux和链式调用你才能看懂下面的代码,或者你只看 delegate.getInstances()这个方法也可以
源码解释

/*
 * Copyright 2012-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.cloud.loadbalancer.core;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

import org.springframework.boot.convert.DurationStyle;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.DiscoveryClient;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
import org.springframework.core.env.Environment;

import static org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory.PROPERTY_NAME;

/**
 * A discovery-client-based {@link ServiceInstanceListSupplier} implementation.
 *
 * @author Spencer Gibb
 * @author Olga Maciaszek-Sharma
 * @author Tim Ysewyn
 * @since 2.2.0
 */
public class DiscoveryClientServiceInstanceListSupplier implements ServiceInstanceListSupplier {

	/**
	 * Property that establishes the timeout for calls to service discovery.
	 */
	public static final String SERVICE_DISCOVERY_TIMEOUT = "spring.cloud.loadbalancer.service-discovery.timeout";

	private static final Log LOG = LogFactory.getLog(DiscoveryClientServiceInstanceListSupplier.class);

	private Duration timeout = Duration.ofSeconds(30);

	private final String serviceId;

	private final Flux<List<ServiceInstance>> serviceInstances;
	
	/***
	* 重点需要看下这个类的构造方法的serviceInstances 
	* 这是一个Flux的集合
	*/
	public DiscoveryClientServiceInstanceListSupplier(DiscoveryClient delegate, Environment environment) {
		this.serviceId = environment.getProperty(PROPERTY_NAME);
		resolveTimeout(environment);
		//delegate.getInstances 这个真正往集合中添加实例的方法,不同SpringCloud规范了这个接口,不同的组件实现它就好了,其他的WebFlux的一些方法可以忽略。
		//我们处于SpringCloudGateWay中,所以SpringCloudGateWay 按照这个方式
		this.serviceInstances = Flux.defer(() -> Flux.just(delegate.getInstances(serviceId)))
				.subscribeOn(Schedulers.boundedElastic()).timeout(timeout, Flux.defer(() -> {
					logTimeout();
					return Flux.just(new ArrayList<>());
				})).onErrorResume(error -> {
					logException(error);
					return Flux.just(new ArrayList<>());
				});
	}

	public DiscoveryClientServiceInstanceListSupplier(ReactiveDiscoveryClient delegate, Environment environment) {
		this.serviceId = environment.getProperty(PROPERTY_NAME);
		resolveTimeout(environment);
		this.serviceInstances = Flux
				.defer(() -> delegate.getInstances(serviceId).collectList().flux().timeout(timeout, Flux.defer(() -> {
					logTimeout();
					return Flux.just(new ArrayList<>());
				})).onErrorResume(error -> {
					logException(error);
					return Flux.just(new ArrayList<>());
				}));
	}

	@Override
	public String getServiceId() {
		return serviceId;
	}

	@Override
	public Flux<List<ServiceInstance>> get() {
		return serviceInstances;
	}

	private void resolveTimeout(Environment environment) {
		String providedTimeout = environment.getProperty(SERVICE_DISCOVERY_TIMEOUT);
		if (providedTimeout != null) {
			timeout = DurationStyle.detectAndParse(providedTimeout);
		}
	}

	private void logTimeout() {
		if (LOG.isDebugEnabled()) {
			LOG.debug(String.format("Timeout occurred while retrieving instances for service %s."
					+ "The instances could not be retrieved during %s", serviceId, timeout));
		}
	}

	private void logException(Throwable error) {
		LOG.error(String.format("Exception occurred while retrieving instances for service %s", serviceId), error);
	}

}

Nacos是如何实现的?

其中:实际走的是Nacos为其实现的获取Nacos实例的Reactive的实现
NacosReactiveDiscoveryClient: 核心方法 loadInstancesFromNacos()【哈哈哈 一看就是中国人写的 这方法名字】

/*
 * Copyright 2013-2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.alibaba.cloud.nacos.discovery.reactive;

import java.util.function.Function;

import com.alibaba.cloud.nacos.discovery.NacosServiceDiscovery;
import com.alibaba.nacos.api.exception.NacosException;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;

/**
 * @author <a href="mailto:echooy.mxq@gmail.com">echooymxq</a>
 **/
public class NacosReactiveDiscoveryClient implements ReactiveDiscoveryClient {

	private static final Logger log = LoggerFactory
			.getLogger(NacosReactiveDiscoveryClient.class);

	private NacosServiceDiscovery serviceDiscovery;

	public NacosReactiveDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {
		this.serviceDiscovery = nacosServiceDiscovery;
	}

	@Override
	public String description() {
		return "Spring Cloud Nacos Reactive Discovery Client";
	}

	@Override
	public Flux<ServiceInstance> getInstances(String serviceId) {

		return Mono.justOrEmpty(serviceId).flatMapMany(loadInstancesFromNacos())
				.subscribeOn(Schedulers.boundedElastic());
	}

	private Function<String, Publisher<ServiceInstance>> loadInstancesFromNacos() {
		return serviceId -> {
			try {
				return Flux.fromIterable(serviceDiscovery.getInstances(serviceId));
			}
			catch (NacosException e) {
				log.error("get service instance[{}] from nacos error!", serviceId, e);
				return Flux.empty();
			}
		};
	}

	@Override
	public Flux<String> getServices() {
		return Flux.defer(() -> {
			try {
				return Flux.fromIterable(serviceDiscovery.getServices());
			}
			catch (Exception e) {
				log.error("get services from nacos server fail,", e);
				return Flux.empty();
			}
		}).subscribeOn(Schedulers.boundedElastic());
	}

}

到此SPG如何从Nacos中获取服务列表就梳理完毕,同样的,服务和服务之间内网之间的调用应该也是换汤不换药


如何解决

  • 增加配置,关闭缓存,增加SCG的bootstrap.yml配置,当使用这个配置以后,就不会走上述逻辑。
spring:
  cloud:
    loadbalancer:
      cache:
        enabled: false # 是否启用缓存
  • 带来的问题

    • 不使用缓存,似乎可以解决了上述的问题,但是没有缓存似乎会对Nacos带来一定的压力。问题为甚是35s ,35s不会频繁失效不会带来”内存风暴“吗?(这块需要了解)
    • 35秒的缓存设计通常与“Refresh-ahead”策略有关。这是一种缓存预热策略,用于在数据过期之前提前刷新缓存数据,适用于那些预计在不久的将来会被频繁请求的热数据。例如,如果缓存数据的过期时间设置为60秒,刷新提前系数设置为0.5,那么在数据实际过期前的30秒(即在第35秒时),缓存就会异步刷新数据。这样做的好处是在高流量系统中,可以在下一次可能的缓存访问之前更新缓存,避免缓存失效时的突然流量峰值,从而提高系统的性能和用户体验。在实际应用中,这种策略可以确保缓存中的数据始终保持最新状态,减少因缓存失效导致的数据库压力。例如,在一些高流量的Web应用中,通过提前刷新缓存,可以避免在缓存数据过期时大量用户同时请求数据库,从而减少数据库的负载并提高响应速度。此外,缓存设计还需要考虑其他因素,如缓存大小、缓存命中率、缓存未命中率等,以确保缓存系统的整体性能和效率。不同的应用场景可能需要不同的缓存策略组合,以适应特定的读/写访问模式。例如,写密集型应用可能需要结合使用Write-Through、Write-back或Write-around策略,而读密集型应用则可能更侧重于Read-through或Refresh-ahead策略。选择合适的缓存策略对于提高系统性能和用户体验至关重要。
  • 最终解决方案
    不建议关闭这缓存,会导致Nacos压力过大,所以这边解决方案是最后修改了滚动更新时候的旧Pod的存活时间

总结

与运维人员配合修改Pod 实例的”存活窗口时间“大于缓存的35秒即可

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2224778.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++学习路线(二十五)

常见错误总结 错误1&#xff1a;对象const问题 #include <iostream>class Man { public:void walk() {std::cout << "I am walking." << std::endl;} };int main() {const Man man;man.walk();return 0; } 原因是Man man是const对象 但是调用了…

大语言模型的Scaling Law【Power Low】

NLP-大语言模型学习系列目录 一、注意力机制基础——RNN,Seq2Seq等基础知识 二、注意力机制【Self-Attention,自注意力模型】 三、Transformer图文详解【Attention is all you need】 四、大语言模型的Scaling Law【Power Low】 文章目录 NLP-大语言模型学习系列目录一、什么是…

Stable Diffusion视频插件Ebsynth Utility安装方法

一、Ebsynth Utility制作视频的优势&#xff1a; 相比其他视频制作插件&#xff0c;Ebsynth Utility生成的视频&#xff0c;画面顺滑无闪烁&#xff0c;对显存要求相对不高。渲染速度也还可以接受。其基本过程为&#xff1a; 1、将参考视频分解为单个帧&#xff0c;并同时生成…

模型训练识别手写数字(二)

模型训练识别手写数字&#xff08;一&#xff09;使用手写数字图像进行模型测试 一、生成手写数字图像 1. 导入所需库 import cv2 import numpy as np import oscv2用于计算机视觉操作。 numpy用于处理数组和图像数据。 os用于文件和目录操作。 2. 初始化画布 canvas np.z…

GitHub下载参考

1.Git下载 Git下载https://blog.csdn.net/mengxiang_/article/details/128193219 注意&#xff1a;根据电脑的系统配置选择合适的版本&#xff0c;我安装的是64.exe的版本 2.Git右键不出现问题&#xff1a; Git右键不出现https://blog.csdn.net/ling1998/article/details/1…

Java项目实战II基于微信小程序的马拉松报名系统(开发文档+数据库+源码)

目录 一、前言 二、技术介绍 三、系统实现 四、文档参考 五、核心代码 六、源码获取 全栈码农以及毕业设计实战开发&#xff0c;CSDN平台Java领域新星创作者&#xff0c;专注于大学生项目实战开发、讲解和毕业答疑辅导。获取源码联系方式请查看文末 一、前言 马拉松运动…

[SWPUCTF 2022 新生赛]py1的write up

开启靶场&#xff0c;下载附件&#xff0c;解压后得到&#xff1a; 双击exe文件&#xff0c;出现弹窗&#xff1a; 问的是异或&#xff0c;写个python文件来计算结果&#xff1a; # 获取用户输入的两个整数 num1 int(input("Enter the first number: ")) num2 int…

云渲染主要是分布式(分机)渲染,如何使用blender云渲染呢?

云渲染主要是分布式&#xff08;分机&#xff09;渲染&#xff0c;比如一个镜头同时开20-100张3090显卡的机器渲染&#xff0c;就能同时渲染20-100帧&#xff0c;渲染不仅不占用自己电脑&#xff0c;效率也将增加几十上百倍&#xff01; blender使用教程如下&#xff1a; 第一…

基于Django+python的车牌识别系统设计与实现(带文档)

项目运行 需要先安装Python的相关依赖&#xff1a;pymysql&#xff0c;Django3.2.8&#xff0c;pillow 使用pip install 安装 第一步&#xff1a;创建数据库 第二步&#xff1a;执行SQL语句&#xff0c;.sql文件&#xff0c;运行该文件中的SQL语句 第三步&#xff1a;修改源…

软件架构设计学习总结

概述&#xff1b; 如何描述软件架构&#xff1b; 架构的层次结构&#xff1b; 架构设计技能&#xff1a; 需求分析、业务架构、数据架构、应用架构、技术架构、开发架构设计&#xff1b; 层次框架设计&#xff1b; 集成与接口设计&#xff1b; 性能优化&#xff1b; 设计…

C语言程序设计:现代设计方法习题笔记《chapter5》下篇

第七题 题目分析&#xff1a;求最大最小值转换为条件判断问题&#xff0c;最大值有四种可能&#xff0c;最小值相应有三种情况&#xff0c;给出下列代码。 示例代码&#xff1a; #include <stdio.h>int main() {int num1, num2, num3, num4; // 定义四个变量来存储输入…

Linux安装部署数据库:MongoDB

Linux安装部署数据库&#xff1a;MongoDB 一、虚拟机环境说明1、安装前准备2、数据库软件3、数据库工具 二、源码安装 MongoDB1、安装配置环境2、服务启动方式3、设置开机自启 三、管理使用 MongoDB1、登录使用2、常用命令 四、安全优化 MongoDB1、创建普通用户启动服务2、编写…

机器学习 - 树结构1 - 随机森林

算法原理 随机森林由多个决策树构成&#xff0c;每棵树在训练时使用随机选择的样本和特征子集。在分类任务中&#xff0c;每棵树对新的输入样本进行分类&#xff0c;最终的分类结果由多数树的分类结果决定。这种方法可以提高预测的准确性&#xff0c;并且通过平均或投票机制减少…

【C++】动态库动态加载实例详解

动态库动态加载&#xff1a;LoadLibrary与函数地址获取详解 一、概述三、加载失败的原因及解决方案DLL文件不存在或路径不正确&#xff1a;依赖的其他DLL未找到&#xff1a;权限问题&#xff1a;DLL版本不兼容&#xff1a; 四、总结 在软件开发中&#xff0c;模块化设计是一种非…

基于Spring Boot的学生宿舍信息资源整合

3系统分析 3.1可行性分析 通过对本学生宿舍信息管理系统实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本学生宿舍信息管理系统采用Spring Boot框架&#xff0…

【C++笔记】内存管理

前言 各位读者朋友们大家好&#xff0c;上期我们讲了类和对象下的内容&#xff0c;类和对象整体的内容我们就讲完了&#xff0c;接下来我们开启新的部分内存管理的讲解。 目录 前言一. C/C内存分布二. C语言中内存管理的方式三. C内存管理方式3.1 new/delete操作内置类型3.2…

时间序列预测(九)——门控循环单元网络(GRU)

目录 一、GRU结构 二、GRU核心思想 1、更新门&#xff08;Update Gate&#xff09;&#xff1a;决定了当前时刻隐藏状态中旧状态和新候选状态的混合比例。 2、重置门&#xff08;Reset Gate&#xff09;&#xff1a;用于控制前一时刻隐藏状态对当前候选隐藏状态的影响程度。…

idea 无法输入中文 快速解决

idea在某些情况会出现无法输入中文的情况&#xff0c;我们不去深究内部原因&#xff0c;直接上解决方案&#xff1a; 1、点击菜单help->Edit Custom VM Options 2、最后一行&#xff0c;追加&#xff1a; -Drecreate.x11.input.methodtrue 、 3、重启

计算机毕业设计Java连锁超市销售与分析系统 销售数据管理 超市运营分析 数据可视化 (源码+定制+开发)

博主介绍&#xff1a; ✌我是阿龙&#xff0c;一名专注于Java技术领域的程序员&#xff0c;全网拥有10W粉丝。作为CSDN特邀作者、博客专家、新星计划导师&#xff0c;我在计算机毕业设计开发方面积累了丰富的经验。同时&#xff0c;我也是掘金、华为云、阿里云、InfoQ等平台…

10月27日

取P为A 秩1矩阵只有1个特征值为正&#xff0c;其余为1