【SpringCloud】Hystrix源码解析

news2025/1/17 0:57:20

e246a1dda09849a5a89535a62441565d.png

hystrix是一个微服务容错组件,提供了资源隔离、服务降级、服务熔断的功能。这一章重点分析hystrix的实现原理

1、服务降级

当服务实例所在服务器承受的压力过大或者受到网络因素影响没法及时响应请求时,请求会阻塞堆积,情况严重的话整个系统都会瘫痪,hystrix为此提供了一种容错机制:当服务实例没法及时响应请求,可以采用服务降级的方式快速失败,维持系统的稳定性

服务降级和@HystrixCommand注解绑定,查看它的源码

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface HystrixCommand {

    ...

    String fallbackMethod() default "";

}

源码提供的信息很少,要分析注解的功能得找到处理注解的类:HystrixCommandAspect

@Aspect
public class HystrixCommandAspect {
    ...
    
    // 环绕通知
    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = AopUtils.getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", new Object[]{joinPoint});
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser annotations at the same time");
        } else {
            MetaHolderFactory metaHolderFactory = (MetaHolderFactory)META_HOLDER_FACTORY_MAP.get(HystrixCommandAspect.HystrixPointcutType.of(method));
            MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
            HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
            ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

            try {
                Object result;
                if (!metaHolder.isObservable()) {
                    // 代理执行方法
                    result = CommandExecutor.execute(invokable, executionType, metaHolder);
                } else {
                    result = this.executeObservable(invokable, executionType, metaHolder);
                }

                return result;
            } catch (HystrixBadRequestException var9) {
                throw var9.getCause();
            } catch (HystrixRuntimeException var10) {
                throw this.hystrixRuntimeExceptionToThrowable(metaHolder, var10);
            }
        }
    }
}

从命名上我们能看出这是一个切面,说明服务降级是通过aop代理实现的,跟踪CommandExecutor的execute方法

调用链:
-> CommandExecutor.execute
-> castToExecutable(invokable, executionType).execute()
-> HystrixCommand.execute
-> this.queue().get()
public Future<R> queue() {

	// 获取Future对象
	final Future<R> delegate = this.toObservable().toBlocking().toFuture();
	Future<R> f = new Future<R>() {
		...

		public R get() throws InterruptedException, ExecutionException {
			return delegate.get();
		}

		public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
			return delegate.get(timeout, unit);
		}
	};
	...
}

HystrixCommand类的queue方法返回了一个Future对象,在线程任务中常用Future对象来获取任务执行完成后返回的结果。这里的Future对象是通过this.toObservable().toBlocking().toFuture()创建的,点击查看toObservable方法,它返回一个Observable对象

public Observable<R> toObservable() {
   ...
   final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        public Observable<R> call() {
              return 
((CommandState)AbstractCommand.this.commandState.get()).equals(AbstractCommand.CommandState.UNSUBSCRIBED) ? Observable.never() : 
              // 传入指令执行任务
              AbstractCommand.this.applyHystrixSemantics(AbstractCommand.this);
        }
   };
   ...
	return Observable.defer(new Func0<Observable<R>>() {
		public Observable<R> call() {
                ...
                // 有订阅者订阅了才创建Observable对象
				Observable<R> hystrixObservable = 
                Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
				Observable afterCache;
				if (requestCacheEnabled && cacheKey != null) {
					HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, AbstractCommand.this);
					HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache)AbstractCommand.this.requestCache.putIfAbsent(cacheKey, toCache);
					if (fromCache != null) {
						toCache.unsubscribe();
						AbstractCommand.this.isResponseFromCache = true;
						return AbstractCommand.this.handleRequestCacheHitAndEmitValues(fromCache, AbstractCommand.this);
					}

					afterCache = toCache.toObservable();
				} else {
					afterCache = hystrixObservable;
				}

				return afterCache.doOnTerminate(terminateCommandCleanup).doOnUnsubscribe(unsubscribeCommandCleanup).doOnCompleted(fireOnCompletedHook);
			...
		}
	});        
}

Observable对象的创建任务委托了给了AbstractCommand.this.applyHystrixSemantics方法

private Observable<R> applyHystrixSemantics(AbstractCommand<R> _cmd) {
	this.executionHook.onStart(_cmd);
	// 是否允许请求,判断熔断状态
	if (this.circuitBreaker.allowRequest()) {
		final TryableSemaphore executionSemaphore = this.getExecutionSemaphore();
		final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
		Action0 singleSemaphoreRelease = new Action0() {
			public void call() {
				if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
					executionSemaphore.release();
				}

			}
		};
		Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
			public void call(Throwable t) {
				AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, AbstractCommand.this.commandKey);
			}
		};
		if (executionSemaphore.tryAcquire()) {
			try {
				this.executionResult = this.executionResult.setInvocationStartTime(System.currentTimeMillis());
				// 执行任务
				return this.executeCommandAndObserve(_cmd).doOnError(markExceptionThrown).doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease);
			} catch (RuntimeException var7) {
				return Observable.error(var7);
			}
		} else {
			return this.handleSemaphoreRejectionViaFallback();
		}
	} else {
		// 处于熔断状态,执行备用任务
		return this.handleShortCircuitViaFallback();
	}
}

this.circuitBreaker.allowReques返回true表示没有熔断,走executeCommandAndObserve方法

private Observable<R> executeCommandAndObserve(AbstractCommand<R> _cmd) {
	...
	Observable execution;
	if ((Boolean)this.properties.executionTimeoutEnabled().get()) {
		// 添加了超时监控
		execution = this.executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator(_cmd));
	} else {
		execution = this.executeCommandWithSpecifiedIsolation(_cmd);
	}

   ...
	// handleFallback:不同异常状况下使用不同的处理方法
	Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
		public Observable<R> call(Throwable t) {
			Exception e = AbstractCommand.this.getExceptionFromThrowable(t);
			AbstractCommand.this.executionResult = AbstractCommand.this.executionResult.setExecutionException(e);
			if (e instanceof RejectedExecutionException) {
				return AbstractCommand.this.handleThreadPoolRejectionViaFallback(e);
			} else if (t instanceof HystrixTimeoutException) {
				// 抛出超时异常时,做超时处理
				return AbstractCommand.this.handleTimeoutViaFallback();
			} else if (t instanceof HystrixBadRequestException) {
				return AbstractCommand.this.handleBadRequestByEmittingError(e);
			} else if (e instanceof HystrixBadRequestException) {
				AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, AbstractCommand.this.commandKey);
				return Observable.error(e);
			} else {
				return AbstractCommand.this.handleFailureViaFallback(e);
			}
		}
	};
   ...

	return execution.doOnNext(markEmits).doOnCompleted(markOnCompleted)
		  // 调用handleFallback处理异常
		  .onErrorResumeNext(handleFallback).doOnEach(setRequestContext);
}
private static class HystrixObservableTimeoutOperator<R> implements Observable.Operator<R, R> {
        final AbstractCommand<R> originalCommand;

        public HystrixObservableTimeoutOperator(AbstractCommand<R> originalCommand) {
            this.originalCommand = originalCommand;
        }

        public Subscriber<? super R> call(final Subscriber<? super R> child) {
            final CompositeSubscription s = new CompositeSubscription();
            child.add(s);
            final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(this.originalCommand.concurrencyStrategy, new Runnable() {
                public void run() {
                    // 3.抛出超时异常
                    child.onError(new HystrixTimeoutException());
                }
            });
            HystrixTimer.TimerListener listener = new HystrixTimer.TimerListener() {
                // 1.判断是否超时
                public void tick() {
                    if (HystrixObservableTimeoutOperator.this.originalCommand.isCommandTimedOut.compareAndSet(AbstractCommand.TimedOutStatus.NOT_EXECUTED, AbstractCommand.TimedOutStatus.TIMED_OUT)) {
                        HystrixObservableTimeoutOperator.this.originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, HystrixObservableTimeoutOperator.this.originalCommand.commandKey);
                        s.unsubscribe();
                        // 2.执行超时任务
                        timeoutRunnable.run();
                    }
                }
            };
            
        }
    }

executeCommandAndObserve方法添加超时监控,如果任务执行超出限制时间会抛出超时异常,handleTimeoutViaFallback方法负责处理超时异常

private Observable<R> handleTimeoutViaFallback() {
	// 1.根据异常类型处理异常
	return this.getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
}

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, HystrixEventType eventType, final HystrixRuntimeException.FailureType failureType, final String message, final Exception originalException) {
	...
	// 获取回调观察者
	fallbackExecutionChain = this.getFallbackObservable();
	...
}    

protected final Observable<R> getFallbackObservable() {
	return Observable.defer(new Func0<Observable<R>>() {
		public Observable<R> call() {
			try {
				// 执行备用方法
				return Observable.just(HystrixCommand.this.getFallback());
			} catch (Throwable var2) {
				return Observable.error(var2);
			}
		}
	});
}

到这里终于看到了getFallback方法,它会调用注解中fallback指向的方法,快速失败返回响应结果

protected Object getFallback() {
	// 获取注解中的备用方法信息
	final CommandAction commandAction = this.getFallbackAction();
	if (commandAction != null) {
		try {
			return this.process(new AbstractHystrixCommand<Object>.Action() {
				Object execute() {
					MetaHolder metaHolder = commandAction.getMetaHolder();
					Object[] args = CommonUtils.createArgsForFallback(metaHolder, GenericCommand.this.getExecutionException());
					return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
				}
			});
		} catch (Throwable var3) {
			LOGGER.error(FallbackErrorMessageBuilder.create().append(commandAction, var3).build());
			throw new FallbackInvocationException(ExceptionUtils.unwrapCause(var3));
		}
	} else {
		return super.getFallback();
	}
}

回到AbstractCommand.this.applyHystrixSemantics方法,当this.circuitBreaker.allowReques返回true时会走正常请求,返回false时表示服务进入熔断状态,熔断状态会走getFallback方法

调用链
-> AbstractCommand.handleShortCircuitViaFallback
-> getFallbackOrThrowException
-> this.getFallbackObservable
-> GenericCommand.getFallback

2、服务熔断

服务熔断是hystrix提供的一种保护机制,当一段时间内服务响应的异常的次数过多,hystrix会让服务降级快速返回失败信息,避免累积压力造成服务崩溃。
联系上文找到circuitBreaker.allowRequest方法,该方法判断是否允许请求往下走

public boolean allowRequest() {
	// 是否强制打开熔断
	if ((Boolean)this.properties.circuitBreakerForceOpen().get()) {
		return false;
	// 是否强制关闭熔断
	} else if ((Boolean)this.properties.circuitBreakerForceClosed().get()) {
		this.isOpen();
		return true;
	} else {
		return !this.isOpen() || this.allowSingleTest();
	}
}

public boolean isOpen() {
	if (this.circuitOpen.get()) {
		return true;
	} else {
		HystrixCommandMetrics.HealthCounts health = this.metrics.getHealthCounts();
		// 请求次数是否超过单位时间内请求数阈值
		if (health.getTotalRequests() < (long)(Integer)this.properties.circuitBreakerRequestVolumeThreshold().get()) {
			return false;
		// 请求异常次数占比
		} else if (health.getErrorPercentage() < (Integer)this.properties.circuitBreakerErrorThresholdPercentage().get()) {
			return false;
		} else if (this.circuitOpen.compareAndSet(false, true)) {
			this.circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
			return true;
		} else {
			return true;
		}
	}
}

isOpen方法内有针对请求的各种量化计算,当请求异常情况过多,就会触发熔断,走服务降级

3、总结

hystrix组件会根据请求状态判断是否执行请求,当请求超时或者存在其他异常会走备用方法,当异常次数过多会进入熔断状态快速失败,避免服务累积过多压力

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1895152.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PyCharm中如何将某个文件设置为默认运行文件

之前在使用JetBrain公司的另一款软件IDEA的时候&#xff0c;如果在选中static main函数后按键altenter可以默认以后运行Main类的main函数。最近在使用PyCharm学习Python&#xff0c;既然同为一家公司的产品而且二者的风格如此之像&#xff0c;所以我怀疑PyCharm中肯定也有类似的…

【Windows】Bootstrap Studio(网页设计)软件介绍及安装步骤

软件介绍 Bootstrap Studio 是一款专为前端开发者设计的强大工具&#xff0c;主要用于快速创建现代化的响应式网页和网站。以下是它的主要特点和功能&#xff1a; 直观的界面设计 Bootstrap Studio 提供了直观的用户界面&#xff0c;使用户能够轻松拖放元素来构建网页。界面…

2024年前端面试题及答案

7、 nginx代理跨域 8、 nodejs中间件代理跨域 9、 WebSocket协议跨域 前端数据加密问题 1 一般如何处理用户敏感信息&#xff1f; 前端一般使用md5、base64加密、sha1加密&#xff0c;想要了解详情请自行百度。 前端http相关问题 1 HTTP常用状态码及其含义&#xff1f; …

【Godot4.2】用PlantUML和语雀画UML类图

概述 UML&#xff1a;统一建模语言(Unified Modeling Language,UML)是用来设计软件的可视化建模语言。PlantUML&#xff1a;是一个开源工具&#xff0c;它允许我们用文本形式来描绘和创建UML图。在VSCode中可以安装扩展来绘制&#xff0c;而在语雀的MarkDown编辑器中&#xff…

2024 年第十四届 APMCM 亚太地区大学生数学建模竞赛B题超详细解题思路+数据预处理问题一代码分享

B题 洪水灾害的数据分析与预测 亚太中文赛事本次报名队伍约3000队&#xff0c;竞赛规模体量大致相当于2024年认证杯&#xff0c;1/3个妈杯&#xff0c;1/10个国赛。赛题难度大致相当于0.6个国赛&#xff0c;0.8个妈杯。该比例仅供大家参考。 本次竞赛赛题难度A:B:C3:1:4&…

学习笔记——动态路由——OSPF(OSPF状态机、DR\BDR选举)

七、OSPF状态机、DR\BDR选举 1、OSPF的8种状态机 OSPF在邻居与邻接建立的过程中会经过多个状态机的变化&#xff0c;状态机的出现不仅能让我们了解OSPF建立过程&#xff0c;也能在OSPF出现故障的时候通过状态机的状态来粗略判断问题的所在。 (1)邻居建立状态变化过程 1、Dow…

搜维尔科技:Xsens实时动作捕捉,驱动人形机器人操作!

搜维尔科技&#xff1a;http://www.souvr.comhttp://www.souvr.com实时动作捕捉&#xff0c;驱动人形机器人操作&#xff01; 搜维尔科技&#xff1a;Xsens实时动作捕捉&#xff0c;驱动人形机器人操作&#xff01;

Vue3Echarts写关于温湿度统计的好看折线图

在项目统计界面&#xff0c;我们离不开对Echarts的使用&#xff0c;接下来是我在做项目过程中做的一个关于温湿度统计的好看折线图&#xff0c;统计的是温度蓝色和湿度绿色&#xff0c;它们还会有告警和断电&#xff0c;分别用橘黄色和红色区分&#xff0c;以下是示例&#xff…

anaconda命令大全

目录 查看所有虚拟环境查看某虚拟环境安装的包创建虚拟环境激活创建好的虚拟环境回到之前的环境删除创建的虚拟环境查看conda所在的位置、虚拟环境位置等信息conda修改虚拟环境所在的位置 查看所有虚拟环境 conda env list查看某虚拟环境安装的包 激活要查看的虚拟环境之后&a…

欢乐钓鱼大师游戏攻略:在什么地方掉称号鱼?云手机游戏辅助!

《欢乐钓鱼大师》是一款融合了休闲娱乐和策略挑战的钓鱼游戏。游戏中的各种鱼类不仅各具特色&#xff0c;而且钓鱼过程充满了挑战和乐趣。下面将为大家详细介绍如何在游戏中钓鱼&#xff0c;以及一些有效的钓鱼技巧&#xff0c;帮助你成为一个出色的钓鱼大师。 实用工具推荐 为…

如何计算弧线弹道的落地位置

1&#xff09;如何计算弧线弹道的落地位置 2&#xff09;Unity 2021 IL2CPP下使用Protobuf-net序列化报异常 3&#xff09;编译问题&#xff0c;用Mono可以&#xff0c;但用IL2CPP就报错 4&#xff09;Wwise的Bank在安卓上LoadBank之后&#xff0c;播放没有声音 这是第393篇UWA…

oracle存储结构-----逻辑存储结构(表空间、段、区、块)

文章目录 oracle存储结构图&#xff08;逻辑存储物理存储&#xff09;oracle逻辑存储结构图逻辑存储结构、表空间、段、区、数据块的关系&#xff1a;1、数据 块&#xff08;block&#xff09;---逻辑存储最小单位2、 数据区&#xff08;extent&#xff09;--存储空间分配和回收…

使用 Git Hooks 防止敏感信息泄露

欢迎关注公众号&#xff1a;冬瓜白 在日常开发中&#xff0c;我们可能会不小心将敏感信息提交到 Git。为了防止这种情况&#xff0c;可以利用 Git Hooks 编写一个简单的脚本&#xff0c;当发现提交中包含敏感词时&#xff0c;给出提示。 以下是一个基于 pre-commit 钩子的示例…

记录在Linux(龙蜥8.6)配置jdk环境变量不生效问题

在Linux中&#xff0c;将jdk解压至 /opt/soft/jdk 目录中&#xff0c;使用root用户配置在/etc/profile中配置环境变量并source后&#xff0c;使用root用户是正常的&#xff0c;但是切换到普通用户后提示&#xff1a; 这个问题是普通用户对 /opt/soft/jdk 目录无执行权限 解决&…

面向对象-封装

一.包 1.简介 当我们把所有的java类都写src下的第一层级&#xff0c;如果是项目中&#xff0c;也许会有几百个java文件。 src下的文件会很多&#xff0c;开发的时候不方便查找&#xff0c;也不方便维护如果较多的文件中有同名的&#xff0c;十分麻烦 模块1中有一个叫test.ja…

汇聚荣拼多多评价好不好?

汇聚荣拼多多评价好不好?在探讨电商平台的口碑时&#xff0c;用户评价是衡量其服务质量和商品质量的重要指标。拼多多作为国内领先的电商平台之一&#xff0c;其用户评价自然成为消费者选择购物平台时的参考依据。针对“汇聚荣拼多多评价好不好?”这一问题&#xff0c;可以从…

Pinia:Vue 2 和 Vue 3 中更好用的状态管理框架

前言 还在用Vuex? 在Vue应用程序的开发过程中&#xff0c;高效且易于维护的状态管理一直是开发者关注的核心问题之一。随着Vue 3的发布&#xff0c;状态管理领域迎来了一位新星——Pinia&#xff0c;它不仅为Vue 3量身打造&#xff0c;同时也向下兼容Vue 2&#xff0c;以其简…

uni-appx,实现登录功能,弹窗功能。组件之间传值

这篇文章的内容使用组合式API实现的&#xff0c;只有弹窗部分有选择式API的写法介绍。如果想要看其他选择式API&#xff0c;还请下载官方的hello-uni-appx源码进行学习&#xff0c;查看。想要看组合式API的写法&#xff0c;请查看源码 hello-uvue。 hello-uni-appx源码 相比于…

JavaSE (Java基础):面向对象(下)

8.7 多态 什么是多态&#xff1f; 即同一方法可以根据发送对象的不同而采用多种不同的方式。 一个对象的实际类型是确定的&#xff0c;但可以指向对象的引用的类型有很多。在句话我是这样理解的&#xff1a; 在实例中使用方法都是根据他最开始将类实例化最左边的类型来定的&…

http数据传输确保完整性和保密性整流程方案(含源码)

往期文章回顾 【深度学习】 【深度学习】物体检测/分割/追踪/姿态估计/图像分类检测演示系统【含源码】【深度学习】YOLOV8数据标注及模型训练方法整体流程介绍及演示【深度学习】行人跌倒行为检测软件系统【深度学习】火灾检测软件系统【深度学习】吸烟行为检测软件系统【深度…