Go Third-Party Library Source Analysis: uber-go/ratelimit
https://github.com/uber-go/ratelimit is a leaky bucket rate limiter implementation. Here is a basic usage example:
```go
rl := ratelimit.New(100) // per second

prev := time.Now()
for i := 0; i < 10; i++ {
	now := rl.Take()
	fmt.Println(i, now.Sub(prev))
	prev = now
}
```
In this example the limiter is configured to allow 100 requests per second, i.e. an average interval of 10ms between requests, so one line is printed roughly every 10ms. The output looks like this:
```go
// Output:
// 0 0
// 1 10ms
// 2 10ms
```
The package consists of the following source files:
example_test.go
limiter_atomic.go
limiter_mutexbased.go
ratelimit.go
ratelimit_bench_test.go
ratelimit_test.go
1. ratelimit.New
Let's start with the initialization path:
```go
// New returns a Limiter that will limit to the given RPS.
func New(rate int, opts ...Option) Limiter {
	return newAtomicBased(rate, opts...)
}
```
The argument is the number of tokens produced per second:
```go
// newAtomicBased returns a new atomic based limiter.
func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
	// TODO consider moving config building to the implementation
	// independent code.
	config := buildConfig(opts)
	perRequest := config.per / time.Duration(rate)
	l := &atomicLimiter{
		perRequest: perRequest,
		maxSlack:   -1 * time.Duration(config.slack) * perRequest,
		clock:      config.clock,
	}

	initialState := state{
		last:     time.Time{},
		sleepFor: 0,
	}
	atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
	return l
}
```
1. Apply the options to the configuration: config := buildConfig(opts)
```go
func buildConfig(opts []Option) config {
	c := config{
		clock: clock.New(),
		slack: 10,
		per:   time.Second,
	}

	for _, opt := range opts {
		opt.apply(&c)
	}
	return c
}
```
As you can see, per defaults to 1s and slack defaults to 10.
2. Compute the time it takes to produce one token, i.e. the per-request interval (a numeric sketch follows this walkthrough):
```go
perRequest := config.per / time.Duration(rate)
```
3. Initialize the atomicLimiter with the token interval (perRequest), the maximum slack (maxSlack), and the clock:
```go
type atomicLimiter struct {
	state unsafe.Pointer
	//lint:ignore U1000 Padding is unused but it is crucial to maintain performance
	// of this rate limiter in case of collocation with other frequently accessed memory.
	padding [56]byte // cache line size - state pointer size = 64 - 8; created to avoid false sharing.

	perRequest time.Duration
	maxSlack   time.Duration
	clock      Clock
}
```
4. Record the initial state: the time of the last request and the time to sleep:
```go
type state struct {
	last     time.Time
	sleepFor time.Duration
}
```
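To make these numbers concrete, here is a small standalone sketch that redoes the arithmetic outside the library, using the defaults from buildConfig (per = 1s, slack = 10) and a rate of 100:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Mirrors the defaults in buildConfig: per = 1s, slack = 10.
	per := time.Second
	slack := 10
	rate := 100

	// perRequest := config.per / time.Duration(rate)
	perRequest := per / time.Duration(rate) // 10ms: one token every 10ms

	// maxSlack := -1 * time.Duration(config.slack) * perRequest
	maxSlack := -1 * time.Duration(slack) * perRequest // at most 100ms of accumulated "credit"

	fmt.Println(perRequest) // 10ms
	fmt.Println(maxSlack)   // -100ms
}
```

So with the defaults, a 100 RPS limiter spaces requests 10ms apart and never lets more than 100ms of unused time build up as credit.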
With initialization done, we can look at how tokens are handed out.
2. rl.Take
Take is a method on the Limiter interface; it may block, and it returns the time at which the call was allowed to proceed.
```go
// Limiter is used to rate-limit some process, possibly across goroutines.
// The process is expected to call Take() before every iteration, which
// may block to throttle the goroutine.
type Limiter interface {
	// Take should block to make sure that the RPS is met.
	Take() time.Time
}
```
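Since the comment notes that the limiter may be shared across goroutines, here is a minimal usage sketch (the worker count and loop bounds are arbitrary, chosen only for illustration) in which several goroutines share one limiter and call Take before each unit of work:

```go
package main

import (
	"fmt"
	"sync"

	"go.uber.org/ratelimit"
)

func main() {
	rl := ratelimit.New(100) // 100 RPS, shared by all workers

	var wg sync.WaitGroup
	for w := 0; w < 4; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < 5; i++ {
				t := rl.Take() // blocks as needed so the combined rate stays at 100 RPS
				fmt.Println("worker", id, "proceeding at", t)
			}
		}(w)
	}
	wg.Wait()
}
```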
atomicLimiter implements this interface:
```go
// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *atomicLimiter) Take() time.Time {
	var (
		newState state
		taken    bool
		interval time.Duration
	)
	for !taken {
		now := t.clock.Now()

		previousStatePointer := atomic.LoadPointer(&t.state)
		oldState := (*state)(previousStatePointer)

		newState = state{
			last:     now,
			sleepFor: oldState.sleepFor,
		}

		// If this is our first request, then we allow it.
		if oldState.last.IsZero() {
			taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
			continue
		}

		// sleepFor calculates how much time we should sleep based on
		// the perRequest budget and how long the last request took.
		// Since the request may take longer than the budget, this number
		// can get negative, and is summed across requests.
		newState.sleepFor += t.perRequest - now.Sub(oldState.last)

		// We shouldn't allow sleepFor to get too negative, since it would mean that
		// a service that slowed down a lot for a short period of time would get
		// a much higher RPS following that.
		if newState.sleepFor < t.maxSlack {
			newState.sleepFor = t.maxSlack
		}
		if newState.sleepFor > 0 {
			newState.last = newState.last.Add(newState.sleepFor)
			interval, newState.sleepFor = newState.sleepFor, 0
		}
		taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
	}
	t.clock.Sleep(interval)
	return newState.last
}
```
1. Get the current time.
2. If this is the initial state, i.e. the last request time is zero, set the last request time to now and return immediately.
3. Compute the sleep time: sleep time = previously accumulated sleep time + the per-token interval - (current time - last request time), i.e. minus the actual gap between requests (a worked trace follows this list).
4. If the sleep time is less than maxSlack, traffic has been light and a long time has passed since the last request; clamp the sleep time to maxSlack, otherwise a later burst of traffic would not be throttled at all.
5. If the sleep time is greater than 0, requests are arriving too fast and the call has to wait: the limiter sleeps for that amount (t.clock.Sleep(interval) in this version), pushes the last request time forward by it, and resets the sleep time to 0.
6. If it is less than or equal to 0, traffic is light, the call returns immediately, and the current time is recorded.
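To see how the sleepFor bookkeeping plays out, here is a small simulation of just that formula. It reimplements the accounting with made-up arrival gaps rather than calling the library, assuming perRequest = 10ms and maxSlack = -100ms (the defaults at rate 100):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	perRequest := 10 * time.Millisecond  // one token every 10ms (rate = 100)
	maxSlack := -100 * time.Millisecond  // default slack of 10 * perRequest

	// Hypothetical gaps between consecutive requests.
	gaps := []time.Duration{
		2 * time.Millisecond,  // arrived too early
		2 * time.Millisecond,  // too early again
		50 * time.Millisecond, // arrived late, builds up credit
		1 * time.Millisecond,  // burst after the quiet period
	}

	var sleepFor time.Duration
	for i, gap := range gaps {
		sleepFor += perRequest - gap // the core formula from Take
		if sleepFor < maxSlack {
			sleepFor = maxSlack // clamp the accumulated credit
		}
		if sleepFor > 0 {
			fmt.Printf("request %d: sleep %v\n", i+1, sleepFor)
			sleepFor = 0 // the wait has been paid off
		} else {
			fmt.Printf("request %d: pass immediately (credit %v)\n", i+1, sleepFor)
		}
	}
}
```

Requests 1 and 2 arrive too early and are made to sleep 8ms each; request 3 arrives late and banks 40ms of credit; request 4 arrives in a burst but passes immediately because the credit covers it.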
mutexLimiter also implements the interface:
```go
type mutexLimiter struct {
	sync.Mutex
	last       time.Time
	sleepFor   time.Duration
	perRequest time.Duration
	maxSlack   time.Duration
	clock      Clock
}
```
The only difference is that one is implemented with a mutex and the other with atomic operations.
```go
func (t *mutexLimiter) Take() time.Time {
	t.Lock()
	defer t.Unlock()

	now := t.clock.Now()

	// If this is our first request, then we allow it.
	if t.last.IsZero() {
		t.last = now
		return t.last
	}

	// sleepFor calculates how much time we should sleep based on
	// the perRequest budget and how long the last request took.
	// Since the request may take longer than the budget, this number
	// can get negative, and is summed across requests.
	t.sleepFor += t.perRequest - now.Sub(t.last)

	// We shouldn't allow sleepFor to get too negative, since it would mean that
	// a service that slowed down a lot for a short period of time would get
	// a much higher RPS following that.
	if t.sleepFor < t.maxSlack {
		t.sleepFor = t.maxSlack
	}

	// If sleepFor is positive, then we should sleep now.
	if t.sleepFor > 0 {
		t.clock.Sleep(t.sleepFor)
		t.last = now.Add(t.sleepFor)
		t.sleepFor = 0
	} else {
		t.last = now
	}

	return t.last
}
```
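The reason the atomic version needs a for loop is the classic lock-free pattern: read the old state, build a new state, and retry if another goroutine changed the state in between. Stripped of the rate-limiting details, the two update styles look roughly like this (a generic sketch with throwaway counter types, not code from the library):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"unsafe"
)

// Mutex style: take the lock, mutate the shared field, release.
type lockedCounter struct {
	mu sync.Mutex
	n  int64
}

func (c *lockedCounter) Inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.n++
}

// CAS style: build a new value and retry until our snapshot is still current,
// the same pattern atomicLimiter uses for its state pointer.
type casCounter struct {
	state unsafe.Pointer // *int64
}

func (c *casCounter) Inc() {
	for {
		old := atomic.LoadPointer(&c.state)
		next := *(*int64)(old) + 1
		if atomic.CompareAndSwapPointer(&c.state, old, unsafe.Pointer(&next)) {
			return
		}
		// Another goroutine won the race; reload and try again.
	}
}

func main() {
	lc := &lockedCounter{}
	lc.Inc()

	zero := int64(0)
	cc := &casCounter{state: unsafe.Pointer(&zero)}
	cc.Inc()

	fmt.Println(lc.n, *(*int64)(atomic.LoadPointer(&cc.state))) // 1 1
}
```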
In a textbook Leaky Bucket the interval between requests is fixed. In real internet traffic, however, requests are often bursty. To handle this, uber-go relaxes the Leaky Bucket slightly by introducing a maximum slack (maxSlack). First, the setting: suppose we limit to 100 requests per second, an average interval of 10ms per request. In practice some requests arrive after a longer gap and some after a shorter one.
(1) When t.sleepFor > 0, the time saved up by earlier requests is not enough to cover what this request needs, so the call sleeps for the remaining amount and t.sleepFor is reset to 0.
(2) When t.sleepFor < 0, the gap before this request was longer than the expected interval, and the surplus is simply accumulated into t.sleepFor.
There is a catch, though: if request 2 arrives a very long time after request 1 (possibly hours later), then now.Sub(t.last) becomes huge, so large that even a flood of subsequent requests could not use up the accumulated credit. At that point the limiter would effectively stop limiting.
To prevent this, ratelimit introduces the maximum slack (maxSlack). It is a negative value that bounds how much time may be accumulated as credit, which rules out the scenario above.
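The slack behavior can also be tuned through options. Depending on the library version, the package exposes options such as ratelimit.WithoutSlack (and, in newer releases, ratelimit.WithSlack) for this; check the version you are using. A strict Leaky Bucket with no burst allowance would then look roughly like this:

```go
package main

import (
	"fmt"
	"time"

	"go.uber.org/ratelimit"
)

func main() {
	// No slack at all: every request is spaced by exactly perRequest,
	// even right after a long idle period (a strict leaky bucket).
	strict := ratelimit.New(100, ratelimit.WithoutSlack)

	prev := time.Now()
	for i := 0; i < 3; i++ {
		now := strict.Take()
		fmt.Println(i, now.Sub(prev))
		prev = now
	}
}
```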