文章目录
- 背景
- 源码分析
- 总结
背景
在微服务时代,服务和服务之间调用、跨部门调用都是很常见的事,但这些调用都存在很多不确定因素,如核心服务A依赖的部门B服务挂掉了,那么A本身的功能将会受到直接的影响,而这些都会影响着我们本身为用户提供的产品功能表现,因此,做好服务调用的熔断降级措施是非常有必要的。在golang开发中,我们经常都会使用到一个组件gobreaker,用非常少量的代码实现了服务熔断功能,下面我们将对gobreaker的源代码进行分析。
源码分析
源码地址:https://github.com/sony/gobreaker
熔断器设计:gobreaker熔断器是否生效主要是根据状态来的,熔断器会在Closed、HalfOpen、Open三种状态中转换。
- 初始状态是Closed,这个状态下熔断器会放行所有请求。
- 当满足熔断条件(如达到一定数量的错误计数)时,熔断器进入Open 状态,不能再放行请求,对于所有的请求都直接返回熔断器本身定义的熔断错误。
- 熔断器在Open状态经过一段Interval时间后,自动进入Half-Open状态,在此期间,会根据HalfOpen的策略放行请求,并记录请求的执行结果状态。如果放行的这些请求最终计数满足闭合状态,熔断器将进入Closed状态,继续放行请求,反之则会自动进入Open状态。
Closed
/ \
Half-Open <--> Open
1、熔断器配置
type Settings struct {
Name string // 熔断器名称
MaxRequests uint32 // 最大请求数,熔断器半开状态放行的最大请求数
Interval time.Duration // 计数周期,类似于滑动窗口的窗口大小,用于定期清理counts
Timeout time.Duration // 熔断器进入Open状态后,经过timeout时间进入HalfOpen状态
ReadyToTrip func(counts Counts) bool // 熔断器的计数策略,计数是记在counts中的,如连续出错达到一定数量后,该方法将会返回true,此时熔断器将进入Open状态
OnStateChange func(name string, from State, to State) // 熔断器状态发生变化时候的回调方法,参数表示熔断器从一个状态转变到另一个状态
IsSuccessful func(err error) bool // 熔断器的计数方法,调用发生错误时,通过该方法进行计数,累积到ReadyToTrip中的策略触发后,熔断器将进入Open状态
}
官方解释:
2、熔断器计数
type Counts struct {
Requests uint32 // 总的请求数量
TotalSuccesses uint32 // 总的成功数
TotalFailures uint32 // 总的失败数
ConsecutiveSuccesses uint32 // 连续成功数
ConsecutiveFailures uint32 // 连续失败数
}
此外,需要注意的是,熔断器的计数是发生在范围: Generation周期内的。
3、熔断器
// CircuitBreaker is a state machine to prevent sending requests that are likely to fail.
type CircuitBreaker struct {
name string // 熔断器名称
maxRequests uint32 // 熔断器半开状态的最大请求数
interval time.Duration // 熔断器处于闭合状态时的计数周期,每个周期开始时会清理counts
timeout time.Duration // 熔断器从open状态到halfopen状态的时间
readyToTrip func(counts Counts) bool // 熔断器计数策略
isSuccessful func(err error) bool // 熔断器计数方法
onStateChange func(name string, from State, to State) // 熔断器状态改变回调方法
mutex sync.Mutex
state State // 熔断器当前状态:Open、Closed、HalfOpen
generation uint64 // 每一个时间周期(Interval)的计数(count)状态称为一个generation。
counts Counts // 当前generation的计数统计,切换generation时候会清空counts
expiry time.Time // 过期时间
}
熔断器的核心方法Execute :
// Execute runs the given request if the CircuitBreaker accepts it.
// Execute returns an error instantly if the CircuitBreaker rejects the request.
// Otherwise, Execute returns the result of the request.
// If a panic occurs in the request, the CircuitBreaker handles it as an error
// and causes the same panic again.
func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
generation, err := cb.beforeRequest()
if err != nil {
return nil, err
}
defer func() {
e := recover()
if e != nil {
cb.afterRequest(generation, false)
panic(e)
}
}()
result, err := req()
cb.afterRequest(generation, cb.isSuccessful(err))
return result, err
}
该方法主要是几个步骤,beforeRequest()、 执行请求req()和afterRequest(),其中,req是我们真正需要执行的业务方法,比如为A对B的一次http、rpc调用等。
- beforeRequest()
func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
cb.mutex.Lock()
defer cb.mutex.Unlock()
now := time.Now()
state, generation := cb.currentState(now) // 获取当前熔断器的状态state和计数周期generation
if state == StateOpen { // 如果熔断器处于Open状态,那么将会直接返回熔断错误,并将generation返回
return generation, ErrOpenState
//如果熔断器处于半开状态,且请求数目已经超过了最大请求数,那么也将会返回错误
} else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests {
return generation, ErrTooManyRequests
}
// 熔断器处于闭合状态,正常放行请求,计数
cb.counts.onRequest() // counts计数
return generation, nil
}
- afterRequest()
func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
cb.mutex.Lock()
defer cb.mutex.Unlock()
now := time.Now()
state, generation := cb.currentState(now) // 获取当前熔断器的状态和计数周期
if generation != before { // 如果此时的计数周期和before阶段返回的不一致,那么将直接返回
return
}
// 否则,根据调用设置的响应,对counts的成功或者失败请求进行计数
if success {
cb.onSuccess(state, now)
} else {
cb.onFailure(state, now)
}
}
// 根据熔断器状态计数成功的请求:
// 1、熔断器处于闭合状态,则直接计数success
// 2、熔断器处于半开状态,则计数成功,且如果连续成功的数量超过了最大请求数,那么熔断器将进入闭合状态,计数进入下一个周期
func (cb *CircuitBreaker) onSuccess(state State, now time.Time) {
switch state {
case StateClosed:
cb.counts.onSuccess()
case StateHalfOpen:
cb.counts.onSuccess()
if cb.counts.ConsecutiveSuccesses >= cb.maxRequests {
cb.setState(StateClosed, now)
}
}
}
// 根据熔断器状态计数成功的请求:
// 1、熔断器处于闭合状态,计数失败,且如果当前计数周期的统计结果达到了熔断的条件,那么熔断器将被设置为打开状态。
// 2、如果熔断器处于半开状态,此时又发生了错误,那么熔断器直接进入打开状态
func (cb *CircuitBreaker) onFailure(state State, now time.Time) {
switch state {
case StateClosed:
cb.counts.onFailure()
if cb.readyToTrip(cb.counts) {
cb.setState(StateOpen, now)
}
case StateHalfOpen:
cb.setState(StateOpen, now)
}
}
- currentState()
该方法作用主要是根据熔断器的状态以及计数过期时间expiry等,来判断是否需要进入到下一个generation(计数周期)中,currentState作用当然就是返回当前的generation和熔断器状态了。
func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) {
switch cb.state {
case StateClosed: // 当熔断器处于闭合状态时,如果过期时间到,则进入到下一个计数周期中,产生一个新的generation
if !cb.expiry.IsZero() && cb.expiry.Before(now) {
cb.toNewGeneration(now) // 产生新的generation
}
case StateOpen: // 如果熔断器处于打开状态,且过期时间expiry到,那么熔断器将进入半开状态
if cb.expiry.Before(now) {
cb.setState(StateHalfOpen, now)
}
}
return cb.state, cb.generation
}
func (cb *CircuitBreaker) setState(state State, now time.Time) {
if cb.state == state {
return
}
prev := cb.state
cb.state = state
cb.toNewGeneration(now)
if cb.onStateChange != nil {
cb.onStateChange(cb.name, prev, state)
}
}
// 生成新的generation。 主要是清空counts和设置expiry(过期时间)。
// 当状态为Closed时expiry为Closed的过期时间(当前时间 + interval),
// 当状态为Open时expiry为Open的过期时间(当前时间 + timeout)
func (cb *CircuitBreaker) toNewGeneration(now time.Time) {
cb.generation++ // 计数周期++
cb.counts.clear() // 清空counts统计
// 根据熔断器状态state、闭合状态的计数周期interval和
// 熔断器从Open恢复到HalfOpen的超时时间timeout来重置过期时间
var zero time.Time
switch cb.state {
case StateClosed:
if cb.interval == 0 {
cb.expiry = zero
} else {
cb.expiry = now.Add(cb.interval)
}
case StateOpen:
cb.expiry = now.Add(cb.timeout)
default: // StateHalfOpen状态,关闭超时时间
cb.expiry = zero
}
}
总结
Sony的gobreaker通过短短几百行代码就实现了一个功能强大的熔断器,其中的原理解释来源微软Circuit Breaker Pattern,整体上,gobreaker的设计思想主要体现在几个函数中:
- beforeRequest()
该函数主要作用是根据熔断器的计数状态,判断是否放行请求,计数或达到切换新条件刚切换。
1、判断熔断器是否Closed,如是,放行所有请求。并且会在调用toNewGeneration()判断时间是否达到Interval周期,从而清空计数,进入新的计数周期。
2、如果是Open状态,返回ErrOpenState,不放行所有请求。同样判断周期时间,到达则 同样调用 toNewGeneration()
3、如果是HalfOpen状态,则判断是否已放行MaxRequests个请求,如未达到则放行请求;否则返回:ErrTooManyRequests。
beforeRequest方法中,一旦放行请求,就会对当前的周期的请求计数加1。
- afterRequest()
该函数核心内容很简单,主要就是对before阶段放行的请求进行统计,放行请求执行成功/失败都会调用该方法进行计数,达到条件则切换状态。
1、与beforeRequest一样,会调用公共函数 currentState方法;在currentState中会根据熔断器状态和来判断如何产生一个新的计数周期;如果熔断器处于闭合状态,则会根据expiry过期时间来判断熔断器是否进入先前的一个计数周期,如果是则调用toNewGeneration来产生一个新的计数周期,并且清空计数统计。如果熔断器处于断开状态,并且达到超时时间,那么将会改变熔断器的状态为半开状态,并且调用toNewGeneration进入下一个计数周期。
2、注意:在after中进入新的计数周期并是好事,因为这往往意味着执行业务请求req花费了更多的时间,导致before阶段和after阶段不在一个计数周期内,因此,这种情况熔断器将不会计数。也就是说,如果req耗时大于Interval,熔断器每次after时都会进入新的计数周期,上一个周期的统计就清空了,熔断器也就没有太大价值了。
gobreaker的核心代码中使用了一个generation的概念,每一个时间周期(Interval)的计数(count)状态称为一个generation。这个概念保证了熔断器after阶段的计数和before的计数是在同一个计数周期内。