Go 第三方库源码分析:juju/ratelimit

Go语言精选

共 6371字,需浏览 13分钟

 ·

2021-09-13 20:14

https://github.com/juju/ratelimit 是一个基于令牌桶算法的限流器:令牌桶就是想象有一个固定大小的桶,系统会以恒定速率向桶中放 Token,桶满则暂时不放。漏桶算法和令牌桶算法的主要区别在于,"漏桶算法"能够强行限制数据的传输速率(或请求频率),而"令牌桶算法"在能够限制数据的平均传输速率外,还允许某种程度的突发传输。


首先看下如何使用:

import "github.com/juju/ratelimit"
var tokenBucket ratelimit.Bucket = nil
func init() { // func NewBucket(fillInterval time.Duration, capacity int64) *Bucket // fillInterval令牌填充的时间间隔 // capacity令牌桶的最大容量 tokenBucket = ratelimit.NewBucket(200time.Millisecond, 20)}
func Handler() { available := tokenBucket.TakeAvailable(1) if available <= 0 { // 限流处理 } // handling}

下面看下源码实现,juju/ratelimit实现很简单,一共只有两个源码文件和一个测试文件:

ratelimit.goratelimit_test.goreader.go

下面我们分析下常用的这两个接口的实现:


1,ratelimit.NewBucket

传入的两个参数分别是产生令牌的的间隔和桶的容量

func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {  return NewBucketWithClock(fillInterval, capacity, nil)}
func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {  return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)}

默认一个间隔周期内就产生一个token,如果是高并发情况下,可以通过参数quantum控制产生多个。第三个参数是一个clock  interface,主要是方便mock测试,如果传nil用的就是realClock{}

// Clock represents the passage of time in a way that// can be faked out for tests.type Clock interface {  // Now returns the current time.  Now() time.Time  // Sleep sleeps for at least the given duration.  Sleep(d time.Duration)}

realClock是实现了上述接口的结构体:

// realClock implements Clock in terms of standard time functions.type realClock struct{}
// Now implements Clock.Now by calling time.Now.func (realClock) Now() time.Time { return time.Now()}
// Now implements Clock.Sleep by calling time.Sleep.func (realClock) Sleep(d time.Duration) { time.Sleep(d)}

上面几个函数仅仅是对这个函数的一个简单包装,加上默认参数,方便一般场景的使用,最终都是调用了这个函数

func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {  if clock == nil {    clock = realClock{}  }  if fillInterval <= 0 {    panic("token bucket fill interval is not > 0")  }  if capacity <= 0 {    panic("token bucket capacity is not > 0")  }  if quantum <= 0 {    panic("token bucket quantum is not > 0")  }  return &Bucket{    clock:           clock,    startTime:       clock.Now(),    latestTick:      0,    fillInterval:    fillInterval,    capacity:        capacity,    quantum:         quantum,    availableTokens: capacity,  }}

出来参数检验外,最后生成了结构体Bucket的指针

type Bucket struct {  clock Clock
// startTime holds the moment when the bucket was // first created and ticks began. startTime time.Time
// capacity holds the overall capacity of the bucket. capacity int64
// quantum holds how many tokens are added on // each tick. quantum int64
// fillInterval holds the interval between each tick. fillInterval time.Duration
// mu guards the fields below it. mu sync.Mutex
// availableTokens holds the number of available // tokens as of the associated latestTick. // It will be negative when there are consumers // waiting for tokens. availableTokens int64
// latestTick holds the latest tick for which // we know the number of tokens in the bucket. latestTick int64}

Bucket里面出了存储初始化必要的参数外,多了两个变量:

availableTokens当前可用的令牌数量

latestTick从程序运行到上一次访问的时候,一共产生了多少次计数(如果quantum等于1的话 ,就是一共产生的令牌数量


2,TakeAvailable

有一个参数,每次取的token数量,一般是一个,为了并发安全,一般会加锁:

func (tb *Bucket) TakeAvailable(count int64) int64 {  tb.mu.Lock()  defer tb.mu.Unlock()  return tb.takeAvailable(tb.clock.Now(), count)}

调用了令牌桶计算的核心函数takeAvailable,第一个参数表示是当前时间,用于计算一共产生了多少个token:

func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {  if count <= 0 {    return 0  }  tb.adjustavailableTokens(tb.currentTick(now))  if tb.availableTokens <= 0 {    return 0  }  if count > tb.availableTokens {    count = tb.availableTokens  }  tb.availableTokens -= count  return count}

其中tb.adjustavailableTokens(tb.currentTick(now))用于计算修改可用token数量availableTokens,如果availableTokens<=0,说明限流了;如果输入的count比availableTokens,我么最多只能获取availableTokens个token,获取后,我们把availableTokens减去已经使用的token数量。

func (tb *Bucket) currentTick(now time.Time) int64 {  return int64(now.Sub(tb.startTime) / tb.fillInterval)}

计算出了从开始运行到,当前时间内时间一共跳变了多少次,也就是一共产生了多少次令牌。

func (tb *Bucket) adjustavailableTokens(tick int64) {  lastTick := tb.latestTick  tb.latestTick = tick  if tb.availableTokens >= tb.capacity {    return  }  tb.availableTokens += (tick - lastTick) * tb.quantum  if tb.availableTokens > tb.capacity {    tb.availableTokens = tb.capacity  }  return}

1,如果可用token数量大于等于令牌桶的容量,说明很长时间没有流量来获取token了,不用处理。

2,计算上一次获取token 到现时刻,产生的token数量,把它加到availableTokens上

3,如果availableTokens数量比capacity大,说明溢出了,修改availableTokens为capacity。


以上就是令牌桶算法的核心逻辑。当然,这个包还封装了一些其他的灵活的取令牌的接口,比如

func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) {  tb.mu.Lock()  defer tb.mu.Unlock()  return tb.take(tb.clock.Now(), count, maxWait)}

这个函数就是获取,在maxWait time.Duration超时的前提下,产生count个token,需要等待的时间间隔。

func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) {  if count <= 0 {    return 0, true  }
tick := tb.currentTick(now) tb.adjustavailableTokens(tick) avail := tb.availableTokens - count if avail >= 0 { tb.availableTokens = avail return 0, true } // Round up the missing tokens to the nearest multiple // of quantum - the tokens won't be available until // that tick.
// endTick holds the tick when all the requested tokens will // become available. endTick := tick + (-avail+tb.quantum-1)/tb.quantum endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval) waitTime := endTime.Sub(now) if waitTime > maxWait { return 0, false } tb.availableTokens = avail return waitTime, true}

函数的前半部分和takeAvailable一模一样,后面逻辑表示,如果令牌不够的情况下:

1,计算还缺多少个令牌

2,计算缺这么多令牌需要跳变多少次

3,计算跳变这些次数需要的时间

4,判断需要的时间是否超时

还有一个wait接口,用来计算,获取count个令牌需要的时间,然后sleep这么长时间。

func (tb *Bucket) Wait(count int64) {  if d := tb.Take(count); d > 0 {    tb.clock.Sleep(d)  }}

以上就是令牌桶算法的核心源码实现,

ratelimit/reader.go

里面实现了基于上述限流器实现的读限速和写限速,原理是通过读写buff的长度来控制Wait函数的等待时间,实现读写限速的

func (r *reader) Read(buf []byte) (int, error) {  n, err := r.r.Read(buf)  if n <= 0 {    return n, err  }  r.bucket.Wait(int64(n))  return n, err}


推荐阅读


福利

我为大家整理了一份从入门到进阶的Go学习资料礼包,包含学习建议:入门看什么,进阶看什么。关注公众号 「polarisxu」,回复 ebook 获取;还可以回复「进群」,和数万 Gopher 交流学习。

浏览 17
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报