golang源码分析:断路器 sony/gobreaker
本文分析一款断路器sony/gobreaker,https://github.com/sony/gobreaker,它的源码很简单就一个gobreaker.go,并且它携带了一个例子example/http_breaker.go,我们先从例子入手分析,它主要包含了两个函数,首先是初始化函数:
func init()var st gobreaker.Settingsst.ReadyToTrip = func(counts gobreaker.Counts) bool {}cb = gobreaker.NewCircuitBreaker(st)
它通过ReadyToTrip设置短路规则,用Settings来初始化一个断路器。
第二个函数是cb.Execute函数,它的参数是一个函数,我们可以在函数内部定义封装我们的业务请求,比如做成http client的middleware
func Get(url string) ([]byte, error)body, err := cb.Execute(func() (interface{}, error) {}
通过上面两步,我们可以很快实现我们的breaker接入,很简单有没有。
下面我们分析下gobreaker的源码gobreaker.go
首先定义了断路器的状态
type State int它分为三种:关闭,半开,打开
const (StateClosed State = iotaStateHalfOpenStateOpen)
然后是断路器特有的两个错误码
ErrTooManyRequests = errors.New("too many requests")// 请求数量超出半开的限制ErrOpenState = errors.New("circuit breaker is open")// 断路器打开
state有一个方法,用于序列化
func (s State) String() string接着我们看下计数器,它保存了请求的成功和失败计数,以及连续成功数和连续失败数,是决策断路器打开关闭的依据。断路器会定时会清理这些计数,每清理一次产生一个新的generation。
type Counts struct {Requests uint32TotalSuccesses uint32TotalFailures uint32ConsecutiveSuccesses uint32ConsecutiveFailures uint32}
对应的,有四个修改计数器的函数,比如onSucess成功一次就会把连续失败次数清0,成功数和连续成功数加一
func (c *Counts) onRequest()func (c *Counts) onSuccess()func (c *Counts) onFailure()func (c *Counts) clear()
接着我们看下Setting数,它是我们使用的时候可控参数的集合:
type Settings struct {Name stringMaxRequests uint32Interval time.DurationTimeout time.DurationReadyToTrip func(counts Counts) boolOnStateChange func(name string, from State, to State)IsSuccessful func(err error) bool}
断路器的设置参数含义分析如下:
MaxRequests:半开状态下允许通过的最大请求数,超过这个数会失败报错
Interval:闭合状态下清理计数器的时间间隔;闭合状态下如果间隔<=0计数器不清零
Timeout:断路器打开状态的时间,超过这个时间后变成半开,如果设置的值小于等于0,默认会设置成60s
ReadyToTrip:闭合状态下,请求失败一次它就会调用,如果返回true,就会变成打开状态,如果是nil会调用默认的,它的定义是连续失败5次,返回true。我们可以在这个函数里定义我们期望的断路器打开策略。
OnStateChange:状态变化的时候调用
IsSuccessful:请求是否成功,根据这个值来修改计数器;如果是nil会调用默认函数,当返回的错误不是nil就认为是false
接下来就是非常重要的断路器结构,维护了请求过程中三个状态切换的状态机:
type CircuitBreaker struct {name stringmaxRequests uint32interval time.Durationtimeout time.DurationreadyToTrip func(counts Counts) boolisSuccessful func(err error) boolonStateChange func(name string, from State, to State)mutex sync.Mutexstate Stategeneration uint64counts Countsexpiry time.Time}
两阶段断路器提供了另外一种使用方法,不是通过Execute函数来包裹整个请求,而是通过单独一步回调的方式来检查请求结果是否成功。
type TwoStepCircuitBreaker struct {cb *CircuitBreaker}
我们看下断路器的构造函数做了啥:
func NewCircuitBreaker(st Settings) *CircuitBreakerdefaultReadyToTripcounts.ConsecutiveFailures > 5defaultIsSuccessfulerr == nilcb.toNewGeneration(time.Now())
它通过Settings来初始化了断路器的参数,判断两个函数ReadyToTrip,IsSuccessful 是否有指定,没有指定的话使用默认函数,最后用当前时间戳,初始化了我们状态机。
两阶段断路器类似:
func NewTwoStepCircuitBreaker(st Settings) *TwoStepCircuitBreaker func (cb *CircuitBreaker) Name() string获取当前状态:
func (cb *CircuitBreaker) State() Statecb.mutex.Lock()now := time.Now()state, _ := cb.currentState(now)
func (cb *CircuitBreaker) Counts() Counts 执行传入的请求,如果成功更新成功计数,panic也当作失败处理:
func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error)generation, err := cb.beforeRequest()cb.afterRequest(generation, false)result, err := req()cb.afterRequest(generation, cb.isSuccessful(err))
func (tscb *TwoStepCircuitBreaker) Allow() (done func(success bool), err error)generation, err := tscb.cb.beforeRequest()return func(success bool) {tscb.cb.afterRequest(generation, success)}, nil
每次执行的时候都会调用两个函数beforeRequest和afterRequest,前者获取当前的状态和generation,增加访问计数,返回对应的错误,如果断路器打开,或者半开但是访问计数达到了最大值,不发送请求:
func (cb *CircuitBreaker) beforeRequest() (uint64, error)state, generation := cb.currentState(now)if state == StateOpen {return generation, ErrOpenState} else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests {return generation, ErrTooManyRequests}cb.counts.onRequest()
后者根据请求结果的成功失败来更新我们的计数器,成功失败由settings里面的IsSuccess函数的返回值来决定。
func (cb *CircuitBreaker) afterRequest(before uint64, success bool)state, generation := cb.currentState(now)if generation != before {return}if success {cb.onSuccess(state, now)} else {cb.onFailure(state, now)}
计数器的更新是通过onSuccess和onFailure两个函数来实现的,如果连续成功的请求数达到了设置里面的最大请求数,断路器从半开状态变为关闭状态
func (cb *CircuitBreaker) onSuccess(state State, now time.Time)switch state {case StateClosed:cb.counts.onSuccess()case StateHalfOpen:cb.counts.onSuccess()if cb.counts.ConsecutiveSuccesses >= cb.maxRequests {cb.setState(StateClosed, now)}}
onFailure根据settings里面的readyToTrip函数来决策是否由关闭状态到达打开状态,如果是半开状态,本次请求又失败了,它会变回打开状态。
func (cb *CircuitBreaker) onFailure(state State, now time.Time)switch state {case StateClosed:cb.counts.onFailure()if cb.readyToTrip(cb.counts) {cb.setState(StateOpen, now)}case StateHalfOpen:cb.setState(StateOpen, now)}
currentState获取当前状态和分代信息,如果当前是关闭状态,到了过期时间,会把计数器清零,并且产生一个新的分代。如果是打开状态,并且已经过期,会把状态改为半开状态:
func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64)switch cb.state {case StateClosed:if !cb.expiry.IsZero() && cb.expiry.Before(now) {cb.toNewGeneration(now)}case StateOpen:if cb.expiry.Before(now) {cb.setState(StateHalfOpen, now)}}
每次修改状态都会根据当前时间戳产生一个新的分代,并且调用onStateChange函数
func (cb *CircuitBreaker) setState(state State, now time.Time) {cb.toNewGeneration(now)cb.onStateChange(cb.name, prev, state)
产生新的分代的时候,会清零计数器,更新过期时间,过期时间只和打开和关闭两个状态有关,半开没有过期时间
func (cb *CircuitBreaker) toNewGeneration(now time.Time)cb.generation++cb.counts.clear()cb.expiry = now.Add(cb.interval)switch cb.state {case StateClosed:if cb.interval == 0 {cb.expiry = zero} else {cb.expiry = now.Add(cb.interval)}case StateOpen:cb.expiry = now.Add(cb.timeout)default: // StateHalfOpencb.expiry = zero}
整体代码就分析完了,是不是很简洁明了?当然其中修改状态,使用了锁,分析过程中略去了,其实这里还是有优化空间的,比如换成更轻量级的原子操作。
推荐阅读
