微服务架构|go-zero 的自适应熔断器

news2024/10/7 10:21:42

原文链接: go-zero 的自适应熔断器

上篇文章我们介绍了微服务的限流,详细分析了计数器限流和令牌桶限流算法,这篇文章来说说熔断。

熔断和限流还不太一样,限流是控制请求速率,只要还能承受,那么都会处理,但熔断不是。

在一条调用链上,如果发现某个服务异常,比如响应超时。那么调用者为了避免过多请求导致资源消耗过大,最终引发系统雪崩,会直接返回错误,而不是疯狂调用这个服务。

本篇文章会介绍主流熔断器的工作原理,并且会借助 go-zero 源码,分析 googleBreaker 是如何通过滑动窗口来统计流量,并且最终执行熔断的。

工作原理

这部分主要介绍两种熔断器的工作原理,分别是 Netflix 开源的 Hystrix,其也是 Spring Cloud 默认的熔断组件,和 Google 的自适应的熔断器。

Hystrix is no longer in active development, and is currently in maintenance mode.

注意,Hystrix 官方已经宣布不再积极开发了,目前处在维护模式。

Hystrix 官方推荐替代的开源组件:Resilience4j,还有阿里开源的 Sentinel 也是不错的替代品。

hystrixBreaker

Hystrix 采用了熔断器模式,相当于电路中的保险丝,系统出现紧急问题,立刻禁止所有请求,已达到保护系统的作用。

系统需要维护三种状态,分别是:

  • 关闭: 默认状态,所有请求全部能够通过。当请求失败数量增加,失败率超过阈值时,会进入到断开状态。
  • 断开: 此状态下,所有请求都会被拦截。当经过一段超时时间后,会进入到半断开状态。
  • 半断开: 此状态下会允许一部分请求通过,并统计成功数量,当请求成功时,恢复到关闭状态,否则继续断开。

通过状态的变更,可以有效防止系统雪崩的问题。同时,在半断开状态下,又可以让系统进行自我修复。

googleBreaker

googleBreaker 实现了一种自适应的熔断模式,来看一下算法的计算公式,客户端请求被拒绝的概率

参数很少,也比较好理解:

  1. requests:请求数量
  2. accepts:后端接收的请求数量
  3. K:敏感度,一般推荐 1.5-2 之间

通过分析公式,我们可以得到下面几个结论,也就是产生熔断的实际原理:

  1. 正常情况下,requests 和 accepts 是相等的,拒绝的概率就是 0,没有产生熔断
  2. 当正常请求量,也就是 accepts 减少时,概率会逐渐增加,当概率大于 0 时,就会产生熔断。如果 accepts 等于 0 了,则完全熔断。
  3. 当服务恢复后,requests 和 accepts 的数量会同时增加,但由于 K * accepts 增长的更快,所以概率又会很快变回到 0,相当于关闭了熔断。

总的来说,googleBreaker 的实现方案更加优雅,而且参数也少,不用维护那么多的状态。

go-zero 就是采用了 googleBreaker 的方案,下面就来分析代码,看看到底是怎么实现的。

接口设计

接口定义这部分我个人感觉还是挺不好理解的,看了好多遍才理清了它们之间的关系。

其实看代码和看书是一样的,书越看越薄,代码会越看越短。刚开始看感觉代码很长,随着看懂的地方越来越多,明显感觉代码变短了。所以遇到不懂的代码不要怕,反复看,总会看懂的。

首先来看一下 breaker 部分的 UML 图,有了这张图,很多地方看起来还是相对清晰的,下面来详细分析。

这里用到了静态代理模式,也可以说是接口装饰器,接下来就看看到底是怎么定义的:

// core/breaker/breaker.go
internalThrottle interface {
    allow() (internalPromise, error)
    doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
}

// core/breaker/googlebreaker.go
type googleBreaker struct {
    k     float64
    stat  *collection.RollingWindow
    proba *mathx.Proba
}

这个接口是最终实现熔断方法的接口,由 googleBreaker 结构体实现。

// core/breaker/breaker.go
throttle interface {
    allow() (Promise, error)
    doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error
}

type loggedThrottle struct {
    name string
    internalThrottle
    errWin *errorWindow
}

func newLoggedThrottle(name string, t internalThrottle) loggedThrottle {
    return loggedThrottle{
        name:             name,
        internalThrottle: t,
        errWin:           new(errorWindow),
    }
}

这个是实现了日志收集的结构体,首先它实现了 throttle 接口,然后它包含了一个字段 internalThrottle,相当于具体的熔断方法是代理给 internalThrottle 来做的。

// core/breaker/breaker.go
func (lt loggedThrottle) allow() (Promise, error) {
    promise, err := lt.internalThrottle.allow()
    return promiseWithReason{
        promise: promise,
        errWin:  lt.errWin,
    }, lt.logError(err)
}

func (lt loggedThrottle) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
    return lt.logError(lt.internalThrottle.doReq(req, fallback, func(err error) bool {
        accept := acceptable(err)
        if !accept && err != nil {
            lt.errWin.add(err.Error())
        }
        return accept
    }))
}

所以当它执行相应方法时,都是直接调用 internalThrottle 接口的方法,然后再加上自己的逻辑。

这也就是代理所起到的作用,在不改变原方法的基础上,扩展原方法的功能。

// core/breaker/breaker.go
circuitBreaker struct {
    name string
    throttle
}

// NewBreaker returns a Breaker object.
// opts can be used to customize the Breaker.
func NewBreaker(opts ...Option) Breaker {
    var b circuitBreaker
    for _, opt := range opts {
        opt(&b)
    }
    if len(b.name) == 0 {
        b.name = stringx.Rand()
    }
    b.throttle = newLoggedThrottle(b.name, newGoogleBreaker())

    return &b
}

最终的熔断器又将功能代理给了 throttle

这就是它们之间的关系,如果感觉有点乱的话,就反复看,看的次数多了,就清晰了。

日志收集

上文介绍过了,loggedThrottle 是为了记录日志而设计的代理层,这部分内容来分析一下是如何记录日志的。

// core/breaker/breaker.go
type errorWindow struct {
    // 记录日志的数组
    reasons [numHistoryReasons]string
    // 索引
    index   int
    // 数组元素数量,小于等于 numHistoryReasons
    count   int
    lock    sync.Mutex
}

func (ew *errorWindow) add(reason string) {
    ew.lock.Lock()
    // 记录错误日志内容
    ew.reasons[ew.index] = fmt.Sprintf("%s %s", time.Now().Format(timeFormat), reason)
    // 对 numHistoryReasons 进行取余来得到数组索引
    ew.index = (ew.index + 1) % numHistoryReasons
    ew.count = mathx.MinInt(ew.count+1, numHistoryReasons)
    ew.lock.Unlock()
}

func (ew *errorWindow) String() string {
    var reasons []string

    ew.lock.Lock()
    // reverse order
    for i := ew.index - 1; i >= ew.index-ew.count; i-- {
        reasons = append(reasons, ew.reasons[(i+numHistoryReasons)%numHistoryReasons])
    }
    ew.lock.Unlock()

    return strings.Join(reasons, "\n")
}

核心就是这里采用了一个环形数组,通过维护两个字段来实现,分别是 indexcount

count 表示数组中元素的个数,最大值是数组的长度;index 是索引,每次 +1,然后对数组长度取余得到新索引。

我之前有一次面试就让我设计一个环形数组,当时答的还不是很好,这次算是学会了。

滑动窗口

一般来说,想要判断是否需要触发熔断,那么首先要知道一段时间的请求数量,一段时间内的数量统计可以使用滑动窗口来实现。

首先看一下滑动窗口的定义:

// core/collection/rollingwindow.go

type RollingWindow struct {
    lock          sync.RWMutex
    // 窗口大小
    size          int
    // 窗口数据容器
    win           *window
    // 时间间隔
    interval      time.Duration
    // 游标,用于定位当前应该写入哪个 bucket
    offset        int
    // 汇总数据时,是否忽略当前正在写入桶的数据
    // 某些场景下因为当前正在写入的桶数据并没有经过完整的窗口时间间隔
    // 可能导致当前桶的统计并不准确
    ignoreCurrent bool
    // 最后写入桶的时间
    // 用于计算下一次写入数据间隔最后一次写入数据的之间
    // 经过了多少个时间间隔
    lastTime      time.Duration // start time of the last bucket
}

再来看一下 window 的结构:

type Bucket struct {
    // 桶内值的和
    Sum   float64
    // 桶内 add 次数
    Count int64
}

func (b *Bucket) add(v float64) {
    b.Sum += v
    b.Count++
}

func (b *Bucket) reset() {
    b.Sum = 0
    b.Count = 0
}

type window struct {
    // 桶,一个桶就是一个时间间隔
    buckets []*Bucket
    // 窗口大小,也就是桶的数量
    size    int
}

有了这两个结构之后,我们就可以画出这个滑动窗口了,如图所示。

现在来看一下向窗口中添加数据,是怎样一个过程。

func (rw *RollingWindow) Add(v float64) {
    rw.lock.Lock()
    defer rw.lock.Unlock()
    // 获取当前写入下标
    rw.updateOffset()
    // 向 bucket 中写入数据
    rw.win.add(rw.offset, v)
}

func (rw *RollingWindow) span() int {
    // 计算距离 lastTime 经过了多少个时间间隔,也就是多少个桶
    offset := int(timex.Since(rw.lastTime) / rw.interval)
    // 如果在窗口范围内,返回实际值,否则返回窗口大小
    if 0 <= offset && offset < rw.size {
        return offset
    }

    return rw.size
}

func (rw *RollingWindow) updateOffset() {
    // 经过了多少个时间间隔,也就是多少个桶
    span := rw.span()
    // 还在同一单元时间内不需要更新
    if span <= 0 {
        return
    }

    offset := rw.offset
    // reset expired buckets
    // 这里是清除过期桶的数据
    // 也是对数组大小进行取余的方式,类似上文介绍的环形数组
    for i := 0; i < span; i++ {
        rw.win.resetBucket((offset + i + 1) % rw.size)
    }

    // 更新游标
    rw.offset = (offset + span) % rw.size
    now := timex.Now()
    // align to interval time boundary
    // 这里应该是一个时间的对齐,保持在桶内指向位置是一致的
    rw.lastTime = now - (now-rw.lastTime)%rw.interval
}

// 向桶内添加数据
func (w *window) add(offset int, v float64) {
    // 根据 offset 对数组大小取余得到索引,然后添加数据
    w.buckets[offset%w.size].add(v)
}

// 重置桶数据
func (w *window) resetBucket(offset int) {
    w.buckets[offset%w.size].reset()
}

我画了一张图,来模拟整个滑动过程:

主要经历 4 个步骤:

  1. 计算当前时间距离上次添加时间经过了多少个时间间隔,也就是多少个 bucket
  2. 清理过期桶数据
  3. 更新 offset,更新 offset 的过程实际就是模拟窗口滑动的过程
  4. 添加数据

比如上图,刚开始 offset 指向了 bucket[1],经过了两个 span 之后,bucket[2]bucket[3] 会被清空,同时,新的 offset 会指向 bucket[3],新添加的数据会写入到 bucket[3]

再来看看数据统计,也就是窗口内的有效数据量是多少。

// Reduce runs fn on all buckets, ignore current bucket if ignoreCurrent was set.
func (rw *RollingWindow) Reduce(fn func(b *Bucket)) {
    rw.lock.RLock()
    defer rw.lock.RUnlock()

    var diff int
    span := rw.span()
    // ignore current bucket, because of partial data
    if span == 0 && rw.ignoreCurrent {
        diff = rw.size - 1
    } else {
        diff = rw.size - span
    }
    // 需要统计的 bucket 数量,窗口大小减去 span 数量
    if diff > 0 {
        // 获取统计的起始位置,span 是已经被重置的 bucket
        offset := (rw.offset + span + 1) % rw.size
        rw.win.reduce(offset, diff, fn)
    }
}

func (w *window) reduce(start, count int, fn func(b *Bucket)) {
    for i := 0; i < count; i++ {
        // 自定义统计函数
        fn(w.buckets[(start+i)%w.size])
    }
}

统计出窗口数据之后,就可以判断是否需要熔断了。

执行熔断

接下来就是执行熔断了,主要就是看看自适应熔断是如何实现的。

// core/breaker/googlebreaker.go

const (
    // 250ms for bucket duration
    window     = time.Second * 10
    buckets    = 40
    k          = 1.5
    protection = 5
)

窗口的定义部分,整个窗口是 10s,然后分成 40 个 bucket,每个 bucket 就是 250ms。

// googleBreaker is a netflixBreaker pattern from google.
// see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/
type googleBreaker struct {
    k     float64
    stat  *collection.RollingWindow
    proba *mathx.Proba
}

func (b *googleBreaker) accept() error {
    // 获取最近一段时间的统计数据
    accepts, total := b.history()
    // 根据上文提到的算法来计算一个概率
    weightedAccepts := b.k * float64(accepts)
    // https://landing.google.com/sre/sre-book/chapters/handling-overload/#eq2101
    dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
    // 如果小于等于 0 直接通过,不熔断
    if dropRatio <= 0 {
        return nil
    }

    // 随机产生 0.0-1.0 之间的随机数与上面计算出来的熔断概率相比较
    // 如果随机数比熔断概率小则进行熔断
    if b.proba.TrueOnProba(dropRatio) {
        return ErrServiceUnavailable
    }

    return nil
}

func (b *googleBreaker) history() (accepts, total int64) {
    b.stat.Reduce(func(b *collection.Bucket) {
        accepts += int64(b.Sum)
        total += b.Count
    })

    return
}

以上就是自适应熔断的逻辑,通过概率的比较来随机淘汰掉部分请求,然后随着服务恢复,淘汰的请求会逐渐变少,直至不淘汰。

func (b *googleBreaker) allow() (internalPromise, error) {
    if err := b.accept(); err != nil {
        return nil, err
    }

    // 返回一个 promise 异步回调对象,可由开发者自行决定是否上报结果到熔断器
    return googlePromise{
        b: b,
    }, nil
}

// req - 熔断对象方法
// fallback - 自定义快速失败函数,可对熔断产生的err进行包装后返回
// acceptable - 对本次未熔断时执行请求的结果进行自定义的判定,比如可以针对http.code,rpc.code,body.code
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
    if err := b.accept(); err != nil {
        // 熔断中,如果有自定义的fallback则执行
        if fallback != nil {
            return fallback(err)
        }

        return err
    }

    defer func() {
        // 如果执行req()过程发生了panic,依然判定本次执行失败上报至熔断器
        if e := recover(); e != nil {
            b.markFailure()
            panic(e)
        }
    }()

    err := req()
    // 上报结果
    if acceptable(err) {
        b.markSuccess()
    } else {
        b.markFailure()
    }

    return err
}

熔断器对外暴露两种类型的方法:

1、简单场景直接判断对象是否被熔断,执行请求后必须需手动上报执行结果至熔断器。

func (b *googleBreaker) allow() (internalPromise, error)

2、复杂场景下支持自定义快速失败,自定义判定请求是否成功的熔断方法,自动上报执行结果至熔断器。

func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error

个人感觉,熔断这部分代码,相较于前几篇文章,理解起来是更困难的。但其中的一些设计思想,和底层的实现原理也是非常值得学习的,希望这篇文章能够对大家有帮助。

以上就是本文的全部内容,如果觉得还不错的话欢迎点赞转发关注,感谢支持。


参考文章:

  • https://juejin.cn/post/7030997067560386590
  • https://go-zero.dev/docs/tutorials/service/governance/breaker
  • https://sre.google/sre-book/handling-overload/
  • https://martinfowler.com/bliki/CircuitBreaker.html

推荐阅读:

  • go-zero 是如何实现令牌桶限流的?
  • go-zero 是如何实现计数器限流的?
  • go-zero 是如何做路由管理的?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/963311.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Meta AI的Nougat能够将数学表达式从PDF文件转换为机器可读文本

大多数科学知识通常以可移植文档格式&#xff08;PDF&#xff09;的形式存储&#xff0c;这也是互联网上第二突出的数据格式。然而&#xff0c;从这种格式中提取信息或将其转换为机器可读的文本具有挑战性&#xff0c;尤其是在涉及数学表达式时。 为了解决这个问题&#xff0c…

Sharding-JDBC(九)5.3.0版本,实现按月分表、自动建表、自动刷新节点

目录 一、简介二、Maven依赖三、配置文件application.ymlsharding.yaml 四、代码实现1.自动建表、自动刷新节点思路2.创建表结构3.TimeShardingAlgorithm.java 分片算法类4.ShardingAlgorithmTool.java 分片工具类5.ShardingTablesLoadRunner.java 初始化缓存类6.SpringUtil.ja…

xss前十二关靶场练习

目录 一、xss原理和分类 1.原理 2.分类&#xff1a;xss分为存储型和反射型以及dom型 &#xff08;1&#xff09;反射性 &#xff08;2&#xff09;存储型 &#xff08;3&#xff09;dom型 二、靶场关卡练习​编辑 1.第一关 2.第二关 3.第三关 4.第四关 5.第五关 6…

flutter plugins插件【三】【Flutter Intl】

3、 Flutter Intl 多语言国际化 在Android Studio中菜单Tools找到flutter intl创建多语言配置。 创建后会在pubspec.yaml出现 flutter_intl:enabled: true 在工程的lib会生成l10n与generated文件夹 l10n包含 intl_en.arb intl_zn.arb 我们在intl_en.arb添加 { home: &quo…

常见的存储结构

分析&回答 本文只作为了解&#xff0c;让大家理解 B数跟透传&#xff0c;可以不刷哈。 常见的存储结构&#xff1a; 我们计算机的主存基本都是随机访问存储器(Random-Access Memory&#xff0c;RAM)&#xff0c;他分为两类&#xff1a;静态随机访问存储器&#xff08;SRA…

iPhone 15 Ultra都要来啦?顶配8GB 内存、2TB存储,满足任何想象

据相关媒体透露&#xff0c;苹果计划在今年9月推出全新的iPhone 15系列。除了已确认的iPhone 15 Pro和iPhone 15 Pro Max之外&#xff0c;还有一款名为iPhone 15 Ultra至尊版的机型将会问世。 这款iPhone 15 Ultra将成为苹果旗舰系列的巅峰之作&#xff0c;预计将配备更高配置和…

【算法竞赛宝典】语言之争

【算法竞赛宝典】语言之争 题目描述代码展示 题目描述 代码展示 //语言之争 #include<fstream> #include<string>using namespace std;ifstream cin("language.in"); ofstream cout("language.out");string a; int n;int main() {int i;bool …

快速建设数字工厂管理系统,需要做好哪些准备

随着工业4.0的推进&#xff0c;数字工厂管理系统已经在全球范围内得到了广泛的应用。在中国&#xff0c;许多制造业企业也逐步认识到数字化转型的重要性&#xff0c;开始积极探索和实施数字工厂管理系统。那么&#xff0c;在快速建设数字工厂管理系统的过程中&#xff0c;需要做…

SpringBoot 使用MyBatis分页插件实现分页功能

SpringBoot 使用MyBatis分页插件实现分页功能 1、集成pagehelper2、配置pagehelper3、编写代码4、分页效果 案例地址&#xff1a; https://gitee.com/vinci99/paging-pagehelper-demo/tree/master 1、集成pagehelper <!-- 集成pagehelper --> <dependency><gr…

大数据课程K17——Spark的协同过滤法

文章作者邮箱&#xff1a;yugongshiyesina.cn 地址&#xff1a;广东惠州 ▲ 本章节目的 ⚪ 了解Spark的协同过滤概念&#xff1b; 一、协同过滤概念 1. 概念 协同过滤是一种借助众包智慧的途径。它利用大量已有的用户偏好来估计用户对其未接触过的物品的喜好程…

最快速度!北京银行6个月完成40个业务系统数字化升级

近日&#xff0c;北京银行的数字化升级步调不断加快。过去 6 个月&#xff0c;基于原生分布式数据库 OceanBase&#xff0c;北京银行已完成包括客户信息系统等在内的 40 个关键业务系统升级。 北京银行将数字化转型作为统领“五大转型”的发展战略&#xff0c;要以技术为创新驱…

开发部门源代码防泄密方案

在企业内部&#xff0c;最核心的部门无外乎企业的研发部门&#xff0c;研发部门可以说是每一家企业的核心动力&#xff0c;研发部门研发的资料一般为源代码、图纸两种类型最多。那么企业投入大最的人力物力&#xff0c;当研发离职时&#xff0c;都会把在企业做过的源代码或图纸…

Systrace分析App性能学习笔记

学习Gracker Systrace系列文章&#xff0c;总结使用Systrace分析App性能的方法。推荐想通过Systrace学习Framework的同学&#xff0c;去看原文。 文章目录 概述Systrace使用流程Systrace 文件生成图形方式(不推荐)命令行方式 Systrace分析快捷键使用帧状态线程状态查看线程唤醒…

使用正则表达式在中英文之间添加空格

有时为了排版需要&#xff0c;我们可能需要在文章的中英文之间添加空格&#xff0c;特别是中文中引用了英文单词时&#xff0c;这种情况使用正则表达式整体修订是最明智的做法。首先&#xff0c;推荐使用在线的正则表格式工具&#xff1a;https://regex101.com/ , 该工具非常强…

抖音小程序开发教学系列(1)- 抖音小程序简介

章节一&#xff1a;抖音小程序简介 1.1 抖音小程序的背景和概述 抖音小程序的发展背景和市场趋势&#xff1a; 抖音作为一款热门的短视频社交平台&#xff0c;用户群体庞大&#xff0c;社交共享的特性也为小程序的发展提供了广阔的空间。抖音小程序作为抖音在社交和用户粘性…

【算法竞赛宝典】统计字符

【算法竞赛宝典】统计字符 题目描述代码展示 题目描述 代码展示 //统计出其中英文字母、空格、数字和其他字符的个数 # include <iostream> #include <cstdlib>using namespace std;int main() {freopen("letter.in", "r", stdin);freopen(&q…

软件产品鉴定测试

1. 服务流程 2. 服务内容 该项测试从技术和应用的角度对商用软件产品的质量特性进行全面地、系统地测试和评估&#xff0c;测试内容涵盖了功能性测试、性能测试、可靠性测试、易用性测试、维护性测试及可移植性测试。 3. 周期 7-15个工作日 4. 报告用途 可作为进行省级、国…

支付宝使用OceanBase的历史库实践分享

为解决因业务增长引发的数据库存储空间问题&#xff0c;支付宝基于 OceanBase 数据库启动了历史库项目&#xff0c;通过历史数据归档、过期数据清理、异常数据回滚&#xff0c;实现了总成本降低 80%。 历史数据归档&#xff1a;将在线库&#xff08;SSD 磁盘&#xff09;数据归…

Javase | IO流

目录&#xff1a; 1.输入 (Intput/Read)2.输出 (Output/Write)3.IO4.IO流5.IO流的分类&#xff1a;5.1 分类总述5.2 按照 “流的方向” 进行分类5.3 按照 “读取数据的方式” 进行分类 6.IO包下要重点掌握的流&#xff1a;6.1 文件专属 (流)6.2 转换流 ( 将字节流转换为字符流 …

解决npm install报错: No module named gyp

今天运行一个以前vue项目&#xff0c;启动时报错如下&#xff1a; ERROR Failed to compile with 1 error上午10:19:33 error in ./src/App.vue?vue&typestyle&index0&langscss& Syntax Error: Error: Missing binding D:\javacode\Springboot-MiMall-RSA\V…