【源码解析】断路器Hystrix使用和工作原理

news2024/11/16 7:38:57

断路器Hystrix使用和工作原理

介绍

在微服务架构的分布式系统中,众多微服务有复杂的依赖关系,这些依赖在某些情况下不可避免的会出现一些请求失败。当一个依赖由于延迟高出现阻塞,调用该依赖的服务线程就会发生排队阻塞。如果这个时候出现大量的业务流量打过来,就可能会出现服务器资源被消耗殆尽导致服务宕机。

对于这种情况,就需要有一个服务的保护机制——服务隔离和熔断机制,当依赖应用出现问题,不会导致下游服务出现阻塞而导致一些列的雪崩效应
在这里插入图片描述

快速开始

  1. pom配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.12.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.charles</groupId>
    <artifactId>demo-hystrix</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo-hystrix</name>
    <description>demo-hystrix</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
            <version>2.2.10.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

  1. 启动类添加注解 @EnableCircuitBreaker
@SpringBootApplication
@EnableHystrix
public class DemoHystrixApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoHystrixApplication.class, args);
    }

}
  1. 方法上使用断路器注解
@Service
public class HelloService {

    @HystrixCommand(
            fallbackMethod = "fallback",
            commandProperties = {@HystrixProperty(name = "execution.isolation.strategy", value = "THREAD"), @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "1000")})
    public String getHystrixResponse() {
        RestTemplate restTemplate = new RestTemplate();
        HttpHeaders headers = new HttpHeaders();
        headers.add("Content-Type", "application/json; charset=UTF-8");
        headers.add("Accept", "*/*");
        HttpEntity<String> requestEntity = new HttpEntity<>("", headers);
        ResponseEntity<String> exchange = restTemplate.exchange("http://localhost:8082/sleep", HttpMethod.GET, requestEntity, String.class);
        String body = exchange.getBody();
        return body;
    }

    public String fallback() {
        return "sorry, the request is timeout";
    }
}
  1. 当调用远程服务接口超时,超过1s后,就会执行fallback方法。

源码解析

  1. HystrixCommandAspect是实现熔断的核心类。会拦截带有HystrixCommandHystrixCollapser注解的方法。
    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = AopUtils.getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", new Object[]{joinPoint});
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser annotations at the same time");
        } else {
            HystrixCommandAspect.MetaHolderFactory metaHolderFactory = (HystrixCommandAspect.MetaHolderFactory)META_HOLDER_FACTORY_MAP.get(HystrixCommandAspect.HystrixPointcutType.of(method));
            MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
            HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
            ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

            try {
                Object result;
                if (!metaHolder.isObservable()) {
                    result = CommandExecutor.execute(invokable, executionType, metaHolder);
                } else {
                    result = this.executeObservable(invokable, executionType, metaHolder);
                }

                return result;
            } catch (HystrixBadRequestException var9) {
                throw var9.getCause();
            } catch (HystrixRuntimeException var10) {
                throw this.hystrixRuntimeExceptionToThrowable(metaHolder, var10);
            }
        }
    }
  1. 进入到result = CommandExecutor.execute(invokable, executionType, metaHolder);,会执行HystrixCommand#execute
public R execute() {
    try {
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}
  1. 跟进queue方法。
    public Future<R> queue() {
        final Future<R> delegate = toObservable().toBlocking().toFuture();
        //...
    }
  1. 跟进toObservable方法。核心是applyHystrixSemantics
    public Observable<R> toObservable() {
        final AbstractCommand<R> _cmd = this;
        
        //...

        final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                    return Observable.never();
                }
                return applyHystrixSemantics(_cmd);
            }
        };
  1. AbstractCommand#applyHystrixSemantics方法中会执行AbstractCommand#executeCommandAndObserve
    private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
        // mark that we're starting execution on the ExecutionHook
        // if this hook throws an exception, then a fast-fail occurs with no fallback.  No state is left inconsistent
        executionHook.onStart(_cmd);

        /* determine if we're allowed to execute */
        if (circuitBreaker.allowRequest()) {
            final TryableSemaphore executionSemaphore = getExecutionSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        executionSemaphore.release();
                    }
                }
            };

            final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
                }
            };

            if (executionSemaphore.tryAcquire()) {
                try {
                    /* used to track userThreadExecutionTime */
                    executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                    return executeCommandAndObserve(_cmd)
                            .doOnError(markExceptionThrown)
                            .doOnTerminate(singleSemaphoreRelease)
                            .doOnUnsubscribe(singleSemaphoreRelease);
                } catch (RuntimeException e) {
                    return Observable.error(e);
                }
            } else {
                return handleSemaphoreRejectionViaFallback();
            }
        } else {
            return handleShortCircuitViaFallback();
        }
    }
  1. AbstractCommand#executeCommandAndObserve方法中定义了handleFallback,用来处理事件。
        final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
            @Override
            public Observable<R> call(Throwable t) {
                Exception e = getExceptionFromThrowable(t);
                executionResult = executionResult.setExecutionException(e);
                if (e instanceof RejectedExecutionException) {
                    return handleThreadPoolRejectionViaFallback(e);
                } else if (t instanceof HystrixTimeoutException) {
                    return handleTimeoutViaFallback();
                } else if (t instanceof HystrixBadRequestException) {
                    return handleBadRequestByEmittingError(e);
                } else {
                    /*
                     * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                     */
                    if (e instanceof HystrixBadRequestException) {
                        eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                        return Observable.error(e);
                    }

                    return handleFailureViaFallback(e);
                }
            }
        };
  1. AbstractCommand#executeCommandAndObserve中,因为默认是开启超时配置,所以会创建HystrixObservableTimeoutOperator
        Observable<R> execution;
        if (properties.executionTimeoutEnabled().get()) {
            execution = executeCommandWithSpecifiedIsolation(_cmd)
                    .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
        } else {
            execution = executeCommandWithSpecifiedIsolation(_cmd);
        }
  1. HystrixObservableTimeoutOperator对象的call方法,该方法创建了一个listener,之后将listener加入到HystrixTimer中,如果超时会执行tick方法。
        public Subscriber<? super R> call(final Subscriber<? super R> child) {
            final CompositeSubscription s = new CompositeSubscription();
            // if the child unsubscribes we unsubscribe our parent as well
            child.add(s);

            final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, new Runnable() {

                @Override
                public void run() {
                    child.onError(new HystrixTimeoutException());
                }
            });

            TimerListener listener = new TimerListener() {

                @Override
                public void tick() {
                    // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
                    // otherwise it means we lost a race and the run() execution completed or did not start
                    if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
                        // report timeout failure
                        originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);

                        // shut down the original request
                        s.unsubscribe();

                        timeoutRunnable.run();
                        //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
                    }
                }

                @Override
                public int getIntervalTimeInMilliseconds() {
                    return originalCommand.properties.executionTimeoutInMilliseconds().get();
                }
            };

            final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);

            // set externally so execute/queue can see this
            originalCommand.timeoutTimer.set(tl);

            Subscriber<R> parent = new Subscriber<R>() {

                @Override
                public void onCompleted() {
                    if (isNotTimedOut()) {
                        // stop timer and pass notification through
                        tl.clear();
                        child.onCompleted();
                    }
                }

                @Override
                public void onError(Throwable e) {
                    if (isNotTimedOut()) {
                        // stop timer and pass notification through
                        tl.clear();
                        child.onError(e);
                    }
                }

                @Override
                public void onNext(R v) {
                    if (isNotTimedOut()) {
                        child.onNext(v);
                    }
                }

                private boolean isNotTimedOut() {
                    // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
                    return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
                            originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
                }

            };

            // if s is unsubscribed we want to unsubscribe the parent
            s.add(parent);

            return parent;
        }
  1. tick方法就是将状态置为Time Out,并将观察者执行一个onError方法,内容为new HystrixTimeoutException()

  2. AbstractCommand#handleTimeoutViaFallback用来处理超时。

  3. GenericCommand#getFallback,用来获取fallback方法,且执行。

    protected Object getFallback() {
        final CommandAction commandAction = this.getFallbackAction();
        if (commandAction != null) {
            try {
                return this.process(new AbstractHystrixCommand<Object>.Action() {
                    Object execute() {
                        MetaHolder metaHolder = commandAction.getMetaHolder();
                        Object[] args = CommonUtils.createArgsForFallback(metaHolder, GenericCommand.this.getExecutionException());
                        return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                    }
                });
            } catch (Throwable var3) {
                LOGGER.error(FallbackErrorMessageBuilder.create().append(commandAction, var3).build());
                throw new FallbackInvocationException(ExceptionUtils.unwrapCause(var3));
            }
        } else {
            return super.getFallback();
        }
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/136845.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

二叉树实现及应用(C语言模拟实现可以存放任意结点的栈、队列,二叉树遍历的递归与非递归实现,附上源码和实验报告,用了自取)

XIAN TECHNOLOGICAL UNIVERSITY 目录 课程设计报告 1绪论 2课程设计目的和内容 3算法的基本思想 1 .建立二叉树结构      建立二叉树时&#xff0c;要先明确是按哪一种遍历规则输入&#xff0c;该二叉树是按你所输入的遍历规则来建立的。本实验用的先序遍历行建树。二叉树…

第六章. 图解数组计算模块Numpy—数据的相关概念和创建数组

第六章. 图解数组计算模块Numpy 6.1 数据的相关概念和创建数组 Numpy是Python数组计算&#xff0c;矩阵运算和科学计算的核心库&#xff0c;它的用途是以数组的形式对数据进行操作&#xff0c;由于Numpy是通过C语言实现的&#xff0c;所以运算速度比较快。 1. Numpy的功能&…

技术贴 | SQL 编译与执行 -parser

前言SQL 编译与执行系列技术博客将按照以下顺序分别介绍整个 SQL 执行引擎。图一 SQL 编译与执行研读流程parser 部分&#xff0c;包括词法解析和语法解析。compile 部分&#xff0c;包括语义解析以及计划的构建。optimize 部分&#xff0c;包括计划的优化。exec 部分&#xff…

十四、TCP多线程、原子类AtomicInteger、日志、枚举

tcp多线程 tcp客户端 多线程收发代码 package com.heima.test2;import java.io.*; import java.net.Socket; import java.nio.charset.Charset; import java.util.Scanner;class ClientSend implements Runnable {Socket socket;Scanner sc new Scanner(System.in);public C…

2019年数维杯国际大学生数学建模B题无人机避障问题设计规划求解全过程文档及程序

2019年数维杯国际大学生数学建模 B题 无人机避障问题设计规划 问题重述&#xff1a; 任务1&#xff1a;假设无人机在飞行过程中不受风向、湿度等外界因素的影响&#xff0c;飞行速度和拍摄角度恒定&#xff0c;无人机对一定宽度的区域进行直线飞行模式航拍。执行此航拍的飞行…

SpringBoot(一): SpringBoot的创建和使用

Spring的创建和使用1. 什么是Spring&#xff1f;2. SpringBoot的优点3. SpringBoot项目的创建3.1 使用IDEA创建3.2 使用网页创建4. 项目目录介绍和运行4.1 目录介绍4.2 项目运行4.3 输出hello world4.4 约定大于配置1. 什么是Spring&#xff1f; Spring的诞生是为了简化Java程…

Spring-boot启动失败 Unregistering JMX-exposed beans on shutdown 异常处理

目录一、异常错误二、原因三、解决方法一、异常错误 Spring-boot启动Run时&#xff0c;出现 o.s.j.e.a.AnnotationMBeanExporter - Unregistering JMX-exposed beans on shutdown 错误 *************************** APPLICATION FAILED TO START Description: The Tomcat conn…

【小程序】包与数据共享

文章目录使用 npm 包Vant WeappAPI Promise化全局事件共享MobX分包分包概念使用分包独立分包分包预下载使用 npm 包 目前&#xff0c;小程序中已经支持使用 npm 安装第三方包&#xff0c;从而来提高小程序的开发效率。但是&#xff0c;在小程序中使用npm 包有如下 3 个限制&am…

【韩顺平Linux】学习笔记3

【韩顺平Linux】学习笔记3一、文件目录指令pwd指令 ls指令cd指令mkdir指令rmdir指令touch指令cp指令rm指令mv指令cat指令more指令less 指令echo指令 head指令tail指令> 指令 >>指令ln指令history指令二、时间日期指令三、查找指令四、压缩和解压一、文件目录指令 根目…

【前端】Vue项目:旅游App-(3)TabBar:点击active效果、点击路由跳转

文章目录目标代码与过程设置active主题颜色添加点击active效果点击路由跳转效果总代码修改或新增的文件common.cssindex.csstab-bar.vue目标 添加点击active效果实现点击路由跳转效果 上一篇TabBar搭建&#xff1a;【前端】Vue项目&#xff1a;旅游App-&#xff08;2&#xff…

LVGL学习笔记12 - 复选框CheckBox

目录 1. Parts 1.1 LV_PART_MAIN 1.2 LV_PART_INDICATOR 2. 状态 3. 样式 3.1 设置字符串颜色 3.2 设置点击框外框颜色 3.3 修改点击框弧度 3.4 修改字符串与点击框的间隔 4. 事件 复选框通过lv_checkbox_create创建。一个CheckBox由一个点击框加一个Label组成。 obj1 …

Minikube Mac 安装 使用

Minikube Mac 安装 使用 环境要求 硬件要求 至少 2核 CPUs2GB 以上内存20GB 以上磁盘空间网络环境容器或虚拟机, 例如: Docker, QEMU, Hyperkit, Hyper-V, KVM, Parallels, Podman, VirtualBox, or VMware Fusion/Workstation 本机环境 Mac Pro 10.13.6 Docker 18.09.1 …

半导体行业相关术语

目录 1.晶圆&#xff08;wafer&#xff09; 2. 自动化测试设备&#xff08;ATE Automatic Test Equipment&#xff09; 3.晶盒&#xff08;Cassette&#xff09; 4. 待测设备(DUT Device Under Test) 5. 探针接口板(PIB Prober Interface Board) 6. 设备接口板(DIB D…

干货 | web自动化总卡在文件上传和弹框处理上?

在有些场景中&#xff0c;需要上传文件&#xff0c;而 Selenium 无法定位到弹出的文件框&#xff0c;以及网页弹出的提醒。这些都是需要特殊的方式来处理。input 标签使用自动化上传&#xff0c;先定位到上传按钮&#xff0c;然后 send_keys 把路径作为值给传进去.如图所示&…

【计算机网络-物理层】通信基础

文章目录1 码元、速率、波特、带宽1.1 码元1.2 波特率1.3 比特率1.4 带宽1.5 相关例题2 奈氏准则、香农定理2.1 奈氏准则&#xff08;采样定理&#xff09;2.2 香农定理2.3 相关例题3 编码方式3.1 归零编码&#xff08;RZ&#xff09;3.2 非归零编码&#xff08;NRZ&#xff09…

【简单DP】[NOIP2007 普及组] 守望者的逃离

P1095 [NOIP2007 普及组] 守望者的逃离 - 洛谷 | 计算机科学教育新生态 (luogu.com.cn)题意&#xff1a;思路&#xff1a;独立做出来的一道DP&#xff01;一开始我去模拟过程找子问题&#xff0c;然后去找阶段是什么本来想的是以路程作为阶段&#xff0c;但是1e8数组开不下那么…

如何看待PyTorch 2.0?

作者&#xff5c;吴育昕 1 为什么是TorchDynamo Graph capture 把用户 Python 写的模型代码变成 graph&#xff0c;是一切编译的根基。而 PyTorch 在试了这么多方案之后似乎已经锁定 TorchDynamo 作为 graph capture 的未来方向了&#xff0c;所以写一点关于 TorchDynamo 的…

假如面试官问你Babel的原理该怎么回答

1. 什么是 Babel 简单地说&#xff0c;Babel 能够转译 ECMAScript 2015 的代码&#xff0c;使它在旧的浏览器或者环境中也能够运行。 // es2015 的 const 和 arrow function const add (a, b) > a b;// Babel 转译后 var add function add(a, b) {return a b; };Babel…

pwr | 谁说样本量计算是个老大难问题!?(二)(独立样本均值篇)

1写在前面 上次介绍了两组发生率的样本量计算方法&#xff0c;通过pwr包进行计算非常简单&#xff0c;可以有效地减少我们的工作量。&#x1f618; 有时候我们想比较两组之间的均值&#xff0c;如何计算样本量又一次成了老大难问题。&#x1f912; 本期我们还是基于pwr包&#…

【自学Java】Windows安装PyCharm IDE

Windows安装PyCharm IDE PyCharm下载 PyCharm下载地址 https://www.jetbrains.com/pycharm/PyCharm下载 打开上面的链接&#xff0c;打开 Python 的开发工具 PyCharm 的下载页面&#xff0c;如下图所示&#xff1a; 这里我们点击 Download&#xff0c;跳转到新的页面&#…