kube-apiserver 重启引起的全量对象更新

k8s技术圈

共 9121字,需浏览 19分钟

 ·

2021-04-06 19:52

现象

k8s master进行线上升级,notifier利用client-go提供的informer机制注册了EndPoint的Update Handler,当kube-apiserver重启时触发了大量的update事件,触发依赖的第三方服务限流。

原因排查

在测试环境进行了测试,并且在注册update事件处理函数中调用 reflect.DeepEqual(old, new) 进行了比较,发现返回true,即old与new完全相同却产生了update事件。

接下来就是到事件产生的地方去寻找原因,主要有两个地方,一个是reflect的ListAndWatch,相当于元数据的生产者,另一个是sharedIndexedInformer的HandleDeltas,消费元数据并生成对应类型的事件分发下去,接下来分别看

HandleDeltas (事件来源)

 func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {    s.blockDeltas.Lock()    defer s.blockDeltas.Unlock()        // from oldest to newest    for _, d := range obj.(Deltas) {      switch d.Type {      case Sync, Added, Updated:          isSync := d.Type == Sync          s.cacheMutationDetector.AddObject(d.Object)                // 重点关注          if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {            if err := s.indexer.Update(d.Object); err != nil {                return err            }            s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)          } else {            if err := s.indexer.Add(d.Object); err != nil {                return err            }            s.processor.distribute(addNotification{newObj: d.Object}, isSync)          }      case Deleted:          if err := s.indexer.Delete(d.Object); err != nil {            return err          }          s.processor.distribute(deleteNotification{oldObj: d.Object}, false)      }    }    return nil }

很容易看出来,当delta类型为非Delete时,informer会从自己的indexer(带索引的缓存)中获取指定的object是否存在(注意这里其实是从object计算出key,然后用key寻找到的),如果存在则更新缓存且分发一个update事件。可以继续看后续对分发的这个notification的处理,都是直接处理,没有任何去重逻辑。到这里就可以理解为啥会收到全量的update事件了,正式因为此时缓存里已经有了对应数据,而在分发事件时并没有比较缓存中的object是否和新来的object一致就直接当成update处理了,导致客户端收到全量的更新事件。那问题又来了,为什么重启apiserver时会往deltafifo里全量扔一遍数据,正常不应该是从最后的resourceVersion开始重新watch吗?继续看下面的处理

ListAndWatch (全量数据的来源)

// 代码位置 k8s.io/client-go/tools/cache/reflector.go  // ListAndWatch first lists all items and get the resource version at the moment of call, // and then use the resource version to watch. // It returns error if ListAndWatch didn't even try to initialize watch. func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {    klog.V(3).Infof("Listing and watching %v from %s", r.expectedType, r.name)    var resourceVersion string     // Explicitly set "0" as resource version - it's fine for the List()    // to be served from cache and potentially be delayed relative to    // etcd contents. Reflector framework will catch up via Watch() eventually.    options := metav1.ListOptions{ResourceVersion: "0"}     if err := func() error {      initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})      defer initTrace.LogIfLong(10 * time.Second)      var list runtime.Object      var err error      listCh := make(chan struct{}, 1)      panicCh := make(chan interface{}, 1)      go func() {          defer func() {            if r := recover(); r != nil {                panicCh <- r            }          }()          // Attempt to gather list in chunks, if supported by listerWatcher, if not, the first          // list request will return the full response.          pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {            return r.listerWatcher.List(opts)          }))          if r.WatchListPageSize != 0 {            pager.PageSize = r.WatchListPageSize          }          // Pager falls back to full list if paginated list calls fail due to an "Expired" error.          list, err = pager.List(context.Background(), options)          close(listCh)      }()      select {      case <-stopCh:          return nil      case r := <-panicCh:          panic(r)      case <-listCh:      }      if err != nil {          return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedType, err)      }      initTrace.Step("Objects listed")      listMetaInterface, err := meta.ListAccessor(list)      if err != nil {          return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)      }      resourceVersion = listMetaInterface.GetResourceVersion()      initTrace.Step("Resource version extracted")      items, err := meta.ExtractList(list)      if err != nil {          return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)      }      initTrace.Step("Objects extracted")      if err := r.syncWith(items, resourceVersion); err != nil {          return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)      }      initTrace.Step("SyncWith done")      r.setLastSyncResourceVersion(resourceVersion)      initTrace.Step("Resource version updated")      return nil    }(); err != nil {      return err    }     resyncerrc := make(chan error, 1)    cancelCh := make(chan struct{})    defer close(cancelCh)    go func() {      resyncCh, cleanup := r.resyncChan()      defer func() {          cleanup() // Call the last one written into cleanup      }()      for {          select {          case <-resyncCh:          case <-stopCh:            return          case <-cancelCh:            return          }          if r.ShouldResync == nil || r.ShouldResync() {            klog.V(4).Infof("%s: forcing resync", r.name)            if err := r.store.Resync(); err != nil {                resyncerrc <- err                return            }          }          cleanup()          resyncCh, cleanup = r.resyncChan()      }    }()     for {      // give the stopCh a chance to stop the loop, even in case of continue statements further down on errors      select {      case <-stopCh:          return nil      default:      }       timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))      options = metav1.ListOptions{          ResourceVersion: resourceVersion,          // We want to avoid situations of hanging watchers. Stop any wachers that do not          // receive any events within the timeout window.          TimeoutSeconds: &timeoutSeconds,          // To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.          // Reflector doesn't assume bookmarks are returned at all (if the server do not support          // watch bookmarks, it will ignore this field).          AllowWatchBookmarks: true,      }       w, err := r.listerWatcher.Watch(options)      if err != nil {          switch err {          case io.EOF:            // watch closed normally          case io.ErrUnexpectedEOF:            klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedType, err)          default:                        // 这里报出来的错误            utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedType, err))          }          // If this is "connection refused" error, it means that most likely apiserver is not responsive.          // It doesn't make sense to re-list all objects because most likely we will be able to restart          // watch where we ended.          // If that's the case wait and resend watch request.          // 关键点:正常应该执行continue,结果却执行到了return nil          if urlError, ok := err.(*url.Error); ok {            if opError, ok := urlError.Err.(*net.OpError); ok {                if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {                  time.Sleep(time.Second)                  continue                }            }          }          return nil      }       if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {          if err != errorStopRequested {            switch {            case apierrs.IsResourceExpired(err):                klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedType, err)            default:                klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedType, err)            }          }          return nil      }    } }

其中有一段注释写的很明白,如果是"connection refused" error, 意味着很可能是apiserver无响应了,这个时候需要做的是等一段时间然后从结束的resourceVersion开始重新watch而不是重新执行relist。看注释的话其实是没有问题的,符合正常逻辑,即断开了之后重新watch而不用同步全量数据,但为什么还会收到全量的upade事件呢,原因就在下面的判断逻辑

if urlError, ok := err.(*url.Error); ok {    if opError, ok := urlError.Err.(*net.OpError); ok {      if errno, ok := opError.Err.(syscall.Errno); ok && errno == syscall.ECONNREFUSED {        time.Sleep(time.Second)        continue      }    }}

这里不太容易看出来有什么问题,猛一看肯定觉得没问题,确实是connection refused error啊,其实不是的。实际返回的错误是

&url.Error{Err: &net.OpError{Err: &os.SyscallError{Err: &syscall.ECONNREFUSED}}}

而上面代码判断的是如下的error

&url.Error{Err: &net.OpError{Err: &syscall.ECONNREFUSED}}

正因为判断失效,没有执行到continue而是直接return nil,这就会导致整个函数退出,然后外层有一个循环开始重新调用ListAndWatch,然后开始list,开始watch,全量数据就是这么来的。正常我们是期望直接执行到continue,跳过list直接watch的,就不会有全量同步的问题了。

总结

至此,已经清楚了具体的原因,ListAndWatch的修改很简单,已经给官方提了 pull request:https://github.com/kubernetes/kubernetes/pull/81634 修复这个问题,并合入1.16中,自1.16之后功能正常。


K8S 进阶训练营


 点击屏末  | 即刻学习
浏览 16
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报