Go 限流器系列(3)--自适应限流

共 3492字,需浏览 7分钟

 ·

2020-08-26 13:10

漏斗桶/令牌桶确实能够保护系统不被拖垮, 但不管漏斗桶还是令牌桶, 其防护思路都是设定一个指标, 当超过该指标后就阻止或减少流量的继续进入,当系统负载降低到某一水平后则恢复流量的进入。但其通常都是被动的,其实际效果取决于限流阈值设置是否合理,但往往设置合理不是一件容易的事情.

项目日常维护中, 经常能够看到某某同学在群里说:xx系统429了, 然后经过一番查找后发现是一波突然的活动流量, 只能申请再新增几台机器. 过了几天 OP 发现该集群的流量达不到预期又下掉了几台机器, 然后又开始一轮新的循环.

这里先不讨论集群自动伸缩的问题. 这里提出一些问题

  1. 集群增加机器或者减少机器限流阈值是否要重新设置?
  2. 设置限流阈值的依据是什么?
  3. 人力运维成本是否过高?
  4. 当调用方反馈429时, 这个时候重新设置限流, 其实流量高峰已经过了重新评估限流是否有意义?

这些其实都是采用漏斗桶/令牌桶的缺点, 总体来说就是太被动, 不能快速适应流量变化

自适应限流

对于自适应限流来说, 一般都是结合系统的 Load、CPU 使用率以及应用的入口 QPS、平均响应时间和并发量等几个维度的监控指标,通过自适应的流控策略, 让系统的入口流量和系统的负载达到一个平衡,让系统尽可能跑在最大吞吐量的同时保证系统整体的稳定性。

比较出名的自适应限流的实现是 Alibaba Sentinel. 不过由于提前没有发现 Sentinel 有个 golang 版本的实现, 本篇文章就以 Kratos 的 BBR 实现探讨自适应限流的原理.

Kratos 自适应限流

借鉴了 Sentinel 项目的自适应限流系统, 通过综合分析服务的 cpu 使用率、请求成功的 qps 和请求成功的 rt 来做自适应限流保护。

  • cpu: 最近 1s 的 CPU 使用率均值,使用滑动平均计算,采样周期是 250ms
  • inflight: 当前处理中正在处理的请求数量
  • pass: 请求处理成功的量
  • rt: 请求成功的响应耗时

限流公式

cpu > 800 AND (Now - PrevDrop) < 1s AND (MaxPass * MinRt * windows / 1000) < InFlight

  • MaxPass 表示最近 5s 内,单个采样窗口中最大的请求数
  • MinRt 表示最近 5s 内,单个采样窗口中最小的响应时间
  • windows 表示一秒内采样窗口的数量,默认配置中是 5s 50 个采样,那么 windows 的值为 10

kratos 中间件实现

func (b *RateLimiter) Limit() HandlerFunc {
 return func(c *Context) {
  uri := fmt.Sprintf("%s://%s%s", c.Request.URL.Scheme, c.Request.Host, c.Request.URL.Path)
  limiter := b.group.Get(uri)
  done, err := limiter.Allow(c)
  if err != nil {
   _metricServerBBR.Inc(uri, c.Request.Method)
   c.JSON(nil, err)
   c.Abort()
   return
  }
  defer func() {
   done(limit.DoneInfo{Op: limit.Success})
   b.printStats(uri, limiter)
  }()
  c.Next()
 }
}

使用方式

e := bm.DefaultServer(nil)
limiter := bm.NewRateLimiter(nil)
e.Use(limiter.Limit())
e.GET("/api", myHandler)

源码实现

Allow

func (l *BBR) Allow(ctx context.Context, opts ...limit.AllowOption) (func(info limit.DoneInfo)error) {
 allowOpts := limit.DefaultAllowOpts()
 for _, opt := range opts {
  opt.Apply(&allowOpts)
 }
 if l.shouldDrop() { // 判断是否触发限流
  return nil, ecode.LimitExceed
 }
 atomic.AddInt64(&l.inFlight, 1// 增加正在处理请求数
 stime := time.Since(initTime) // 记录请求到来的时间
 return func(do limit.DoneInfo) {
  rt := int64((time.Since(initTime) - stime) / time.Millisecond) // 请求处理成功的响应时长
  l.rtStat.Add(rt) // 增加rtStat响应耗时的统计
  atomic.AddInt64(&l.inFlight, -1// 请求处理成功后, 减少正在处理的请求数
  switch do.Op {
  case limit.Success:
   l.passStat.Add(1// 处理成功后增加成功处理请求数的统计
   return
  default:
   return
  }
 }, nil
}

shouldDrop

func (l *BBR) shouldDrop() bool {
 // 判断目前cpu的使用率是否达到设置的CPU的限制, 默认值800
 if l.cpu() < l.conf.CPUThreshold { 
  // 如果上一次舍弃请求的时间是0, 那么说明没有限流的需求, 直接返回
  prevDrop, _ := l.prevDrop.Load().(time.Duration)
  if prevDrop == 0 {
   return false
  }
  // 如果上一次请求的时间与当前的请求时间小于1s, 那么说明有限流的需求
  if time.Since(initTime)-prevDrop <= time.Second {
   if atomic.LoadInt32(&l.prevDropHit) == 0 {
    atomic.StoreInt32(&l.prevDropHit, 1)
   }
   // 增加正在处理的请求的数量
   inFlight := atomic.LoadInt64(&l.inFlight)
   // 判断正在处理的请求数是否达到系统的最大的请求数量
   return inFlight > 1 && inFlight > l.maxFlight()
  }
  // 清空当前的prevDrop
  l.prevDrop.Store(time.Duration(0))
  return false
 }
 // 增加正在处理的请求的数量
 inFlight := atomic.LoadInt64(&l.inFlight)
 // 判断正在处理的请求数是否达到系统的最大的请求数量
 drop := inFlight > 1 && inFlight > l.maxFlight()
 if drop {
  prevDrop, _ := l.prevDrop.Load().(time.Duration)
  // 如果判断达到了最大请求数量, 并且当前有限流需求
  if prevDrop != 0 {
   return drop
  }
  l.prevDrop.Store(time.Since(initTime))
 }
 return drop
}

maxFlight

该函数是核心函数. 其计算公式: MaxPass * MinRt * windows / 1000. maxPASS/minRT都是基于metric.RollingCounter来实现的, 限于篇幅原因这里就不再具体看其实现(想看的可以去看rolling_counter_test.go还是蛮容易理解的)

func (l *BBR) maxFlight() int64 {
 return int64(math.Floor(float64(l.maxPASS()*l.minRT()*l.winBucketPerSec)/1000.0 + 0.5))
}
  • winBucketPerSec: 每秒内的采样数量,其计算方式:int64(time.Second)/(int64(conf.Window)/int64(conf.WinBucket)), conf.Window默认值10s, conf.WinBucket默认值100. 简化下公式: 1/(10/100) = 10, 所以每秒内的采样数就是10
// 单个采样窗口在一个采样周期中的最大的请求数, 默认的采样窗口是10s, 采样bucket数量100
func (l *BBR) maxPASS() int64 {
 rawMaxPass := atomic.LoadInt64(&l.rawMaxPASS)
 if rawMaxPass > 0 && l.passStat.Timespan() < 1 {
  return rawMaxPass
 }
 // 遍历100个采样bucket, 找到采样bucket中最大的请求数
 rawMaxPass = int64(l.passStat.Reduce(func(iterator metric.Iterator) float64 {
  var result = 1.0
  for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ {
   bucket := iterator.Bucket()
   count := 0.0
   for _, p := range bucket.Points {
    count += p
   }
   result = math.Max(result, count)
  }
  return result
 }))
 if rawMaxPass == 0 {
  rawMaxPass = 1
 }
 atomic.StoreInt64(&l.rawMaxPASS, rawMaxPass)
 return rawMaxPass
}

// 单个采样窗口中最小的响应时间
func (l *BBR) minRT() int64 {
 rawMinRT := atomic.LoadInt64(&l.rawMinRt)
 if rawMinRT > 0 && l.rtStat.Timespan() < 1 {
  return rawMinRT
 }
 // 遍历100个采样bucket, 找到采样bucket中最小的响应时间
 rawMinRT = int64(math.Ceil(l.rtStat.Reduce(func(iterator metric.Iterator) float64 {
  var result = math.MaxFloat64
  for i := 1; iterator.Next() && i < l.conf.WinBucket; i++ {
   bucket := iterator.Bucket()
   if len(bucket.Points) == 0 {
    continue
   }
   total := 0.0
   for _, p := range bucket.Points {
    total += p
   }
   avg := total / float64(bucket.Count)
   result = math.Min(result, avg)
  }
  return result
 })))
 if rawMinRT <= 0 {
  rawMinRT = 1
 }
 atomic.StoreInt64(&l.rawMinRt, rawMinRT)
 return rawMinRT
}

参考

[1]

alibaba/Sentinel-系统自适应限流: https://github.com/alibaba/Sentinel/wiki/%E7%B3%BB%E7%BB%9F%E8%87%AA%E9%80%82%E5%BA%94%E9%99%90%E6%B5%81

[2]

go-kratos/kratos-自适应限流保护: https://github.com/go-kratos/kratos/blob/master/doc/wiki-cn/ratelimit.md

[3]

alibaba/sentinel-golang-系统自适应流控: https://github.com/alibaba/sentinel-golang/wiki/%E7%B3%BB%E7%BB%9F%E8%87%AA%E9%80%82%E5%BA%94%E6%B5%81%E6%8E%A7





推荐阅读



学习交流 Go 语言,扫码回复「进群」即可


站长 polarisxu

自己的原创文章

不限于 Go 技术

职场和创业经验


Go语言中文网

每天为你

分享 Go 知识

Go爱好者值得关注



浏览 40
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报