kubernetes client-go源码阅读10之Informer

邓胖

共 22550字,需浏览 46分钟

 · 2023-08-23

只要读k8s源代码一定会读informer的代码的,因为informer相当优秀,大多数分布式项目(比如OpenStack)在解决组件间通信的问题时都会选择如kafka,rabbitmaq之类的消息队列,但是k8s不走寻常路,选择了自己解决,解决的方案是informer。

假设我们没有informer,那么我们应该如何从api server获取数据?

一般而言,我们有两种方式, 一是全量获取,二是增量获取,两种都各有优缺点,前者优点是,每次可以获取全量的最新状态, 逻辑简单,但是缺点很明显,如果请求频次过于频繁,就会有比较大的性能消耗,  如果频次过低就不够实时,但是依旧有比较大的性能消耗,想象一个100节点的集群,1000个deployment,  1000个ReplicaSet,  5000千个pod,  加个每个对象都只占5k,  就接近50MB, 这显然会占用比较多的带宽,这是让人难以接受的,而且数据的时效性不够高也是难以接受的,所以对于一个中大型集群而言,不能使用这种方式。

第二种方式是增量获取更新,这种方式的优点是时效性高占用资源低,但是相较于第一种方式而言,实现起来稍显复杂,复杂度在于两点,一是我们需要有健壮的容错机制,比如出错怎么办? 如果跳过可能导致状态不一致, 比如漏掉一个更新的请求, 那么对应的资源一直得不到正确的处理, 所以我们需要一种重试机制, 二是, 我们需要缓存全量的数据用于快速的检索,  比如定时轮训的检查资源,但我们不可能总是等收到增量更新才开始业务逻辑,所以增量更新的逻辑比较复杂,  并且增量更新不能单独存在, 因为我们需要全量的资源, 所以需要配合第一种方式。

那么怎么平衡这两种获取资源的方式呢?  k8s的选择是,**我全都要!!!**。

e7c2ec1fecd7e3a27b99d143ef9a2bf3.webp

我全要图片

快速入门

一般来说informer会跟workque, controller在一起,这点从k8s的源代码可以很明显的看到,不过为了简单起见,这里只看informer的部分。

      
      package main

import (
 "context"
 "flag"
 "fmt"
 "os"
 "os/signal"
 "path/filepath"
 "syscall"
 "time"

 "k8s.io/client-go/util/homedir"
 "k8s.io/klog/v2"

 v1 "k8s.io/api/core/v1"
 "k8s.io/apimachinery/pkg/fields"
 "k8s.io/client-go/kubernetes"
 "k8s.io/client-go/tools/cache"
 "k8s.io/client-go/tools/clientcmd"
)

func main() {
 var kubeconfig *string
 var master string

 if home := homedir.HomeDir(); home != "" {
  kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube""config"), "(optional) absolute path to the kubeconfig file")
 } else {
  kubeconfig = flag.String("kubeconfig""""absolute path to the kubeconfig file")
 }
 flag.StringVar(&master, "master""""master url")
 flag.Parse()

 config, err := clientcmd.BuildConfigFromFlags(master, *kubeconfig)
 if err != nil {
  klog.Fatal(err)
 }

 clientset, err := kubernetes.NewForConfig(config)
 if err != nil {
  klog.Fatal(err)
 }

 // 1.
 podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())

 // 2.
 _, informer := cache.NewInformer(podListWatcher, &v1.Pod{}, 60*time.Second, cache.ResourceEventHandlerFuncs{
  AddFunc: func(obj interface{}) {
        // 3.
  AddFunc: func(obj interface{}) {
   key, err := cache.MetaNamespaceKeyFunc(obj)
   if err == nil {
    fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "Add: ", key)
   }
  },
  UpdateFunc: func(old interface{}, new interface{}) {
   key, err := cache.MetaNamespaceKeyFunc(new)
   if err == nil {
    fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "Update: ", key)
   }
  },
  DeleteFunc: func(obj interface{}) {
            // 4.
   key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
   if err == nil {
    fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "Delete: ", key)
   }
  },
 })
    // 5. 
 ctx, cancel := context.WithCancel(context.Background())
 defer cancel()
 ch := make(chan os.Signal, 1)
 signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
 go func() {
  <-ch
  klog.Info("Received termination, signaling shutdown")
  cancel()
 }()
    // 6.
 informer.Run(ctx.Done())
}

输出如下:

      
      2023-06-03 14:19:21 Add:  default/example-fcsjwbzzf2
2023-06-03 14:20:21 Update:  default/example-fcsjwbzzf2
2023-06-03 14:21:21 Update:  default/example-fcsjwbzzf2

注意: 每分钟以Update的形式再次调用UpdateFunc

实名吐槽Golang的时间格式化!!!

代码分解如下:

  1. 创建ListWatch对象,用于获取资源最新列表及后续更新
  2. 创建informer对象,传入必要的参数
  3. 注册各种回调函数, 如AddFunc等
  4. 删除的对象和其他对象不同,所以需要不同的方法来获取key
  5. 设置退出信号量,k8s的惯用操作了
  6. 启动informer

通过上面的代码可以知道,创建informer有两件比较重要的事情,一是创建ListWatch,二是注册回调函数。

ListWatch

ListWatch就如名字指明的那样,List,Watch,前者是拉取指定资源的资源列表,比如default命名空间下的所有Pod资源,Watch是在前者拉取完成之后开始监听之后所有的资源变化(前者会得到一个版本号,watch可以借助这个版本号,只获取版本号之后的资源),比如新增,更新,删除等变化。

      
      podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())

func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
    // 1.
 optionsModifier := func(options *metav1.ListOptions) {
  options.FieldSelector = fieldSelector.String()
 }
 return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
}

func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
    // 2.
 listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
  optionsModifier(&options)
  return c.Get().
   Namespace(namespace).
   Resource(resource).
   VersionedParams(&options, metav1.ParameterCodec).
   Do(context.TODO()).
   Get()
 }
    // 3.
 watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
  options.Watch = true
  optionsModifier(&options)
  return c.Get().
   Namespace(namespace).
   Resource(resource).
   VersionedParams(&options, metav1.ParameterCodec).
   Watch(context.TODO())
 }
 return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}

代码分解如下:

  1. 设置Options, Options以函数的形式传入也是k8s一个比较常用的模式了。
  2. 简单的在静态客户端的Get方法上包装一层函数
  3. 简单的在静态客户端的Watch方法上包装一层函数

可以看到ListWatch的内部构造并不复杂,仅仅是将GetWatch方法组合起来而已。

Informer

因为本文主要分析informer,所以会略过其中Store的部分,我们暂且将其作为一个存储的黑盒子即可,以后有文章再详细说明。

      
      _, informer := cache.NewInformer(podListWatcher, &v1.Pod{}, 60*time.Second, cache.ResourceEventHandlerFuncs{
    // 略过代码部分
}

func NewInformer(
 lw ListerWatcher,
 objType runtime.Object,
 resyncPeriod time.Duration,
 h ResourceEventHandler,
)
 (Store, Controller)
 {
 // 1.
 clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
 return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
}
                                 
func newInformer(
 lw ListerWatcher,
 objType runtime.Object,
 resyncPeriod time.Duration,
 h ResourceEventHandler,
 clientState Store,
)
 Controller
 {
 // 2.
 fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
  KnownObjects:          clientState,
  EmitDeltaTypeReplaced: true,
 })

 cfg := &Config{
  Queue:            fifo,
  ListerWatcher:    lw,
  ObjectType:       objType,
  FullResyncPeriod: resyncPeriod,
  RetryOnError:     false,
  // 3.
  Process: func(obj interface{}) error {
   // from oldest to newest
   for _, d := range obj.(Deltas) {
    switch d.Type {
    case Sync, Replaced, Added, Updated:
     if old, exists, err := clientState.Get(d.Object); err == nil && exists {
      if err := clientState.Update(d.Object); err != nil {
       return err
      }
      h.OnUpdate(old, d.Object)
     } else {
      if err := clientState.Add(d.Object); err != nil {
       return err
      }
      h.OnAdd(d.Object)
     }
    case Deleted:
     if err := clientState.Delete(d.Object); err != nil {
      return err
     }
     h.OnDelete(d.Object)
    }
   }
   return nil
  },
 }
 return New(cfg)
}
                                 
func New(c *Config) Controller {
 ctlr := &controller{
  config: *c,
  clock:  &clock.RealClock{},
 }
 return ctlr
}

代码分解如下:

  1. 创建一个Store, 它用来存储informer获取到的资源
  2. 将Store再包装一层(k8s的传统操作了),提供先入先出(fifo)的功能
  3. informer处理的主函数,根据对象类型调用对应的回调函数,以及将对象更新到绑定的Store

这里有一个值得注意的点,informer是一个符合Controller接口的对象,阅读过k8s源代码或者写过operator的对controller应该不会陌生,这是k8s比较重要的对象了,或者说模式。

总的来说,informer的初始化过程还是比较清晰的,主要分为两步,创建队列(fifo),配置处理逻辑(Process),既然初始化不复杂,那么复杂的就是Run方法。

Run

那么看看informer怎么运行的吧

      
      informer.Run(ctx.Done())

func (c *controller) Run(stopCh <-chan struct{}) {
 defer utilruntime.HandleCrash()
 go func() {
  <-stopCh
  c.config.Queue.Close()
 }()
    // 1.
 r := NewReflector(
  c.config.ListerWatcher,
  c.config.ObjectType,
  c.config.Queue,
  c.config.FullResyncPeriod,
 )
    // 2.
 r.ShouldResync = c.config.ShouldResync
 r.WatchListPageSize = c.config.WatchListPageSize
 r.clock = c.clock
 if c.config.WatchErrorHandler != nil {
  r.watchErrorHandler = c.config.WatchErrorHandler
 }
 c.reflectorMutex.Lock()
 c.reflector = r
 c.reflectorMutex.Unlock()
 var wg wait.Group
 // 3.
 wg.StartWithChannel(stopCh, r.Run)
    // 4.
 wait.Until(c.processLoop, time.Second, stopCh)
 wg.Wait()
}

代码分解如下:

  1. 创建Reflector, reflector负责和apiserver通信,不断的将数据同步给informer
  2. 配置Reflector的各项参数
  3. 启动Reflector
  4. 启动informer的主循环

由于processLoop比较简单,我们先看看它的源代码。

      
      func (c *controller) processLoop() {
 for {
  obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
  if err != nil {
   if err == ErrFIFOClosed {
    return
   }
   if c.config.RetryOnError {
    // This is the safe way to re-enqueue.
    c.config.Queue.AddIfNotPresent(obj)
   }
  }
 }
}

代码很简单,应该不需要特别的说明,就是传入之前的Process方法用于处理队列传入的各个对象。

Reflector

Reflector的初始化并不复杂,代码如下

      
      r := NewReflector(
  c.config.ListerWatcher,
  c.config.ObjectType,
  c.config.Queue,
  c.config.FullResyncPeriod,
)

func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
 return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}

func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
 // 省略其他代码
 r.setExpectedType(expectedType)
 return r
}

func (r *Reflector) setExpectedType(expectedType interface{}) {
 r.expectedType = reflect.TypeOf(expectedType)
 if obj, ok := expectedType.(*unstructured.Unstructured); ok {
  gvk := obj.GroupVersionKind()
  r.expectedGVK = &gvk
  r.expectedTypeName = gvk.String()
 }
}

上面的代码唯一值得提的是setExpectedType,k8s的对象总是要知道gvk的。

然后就是Reflector的运行逻辑

      
      
func (r *Reflector) Run(stopCh <-chan struct{}) {
    // 1.
 wait.BackoffUntil(func() {
        // 2.
  if err := r.ListAndWatch(stopCh); err != nil {
   r.watchErrorHandler(r, err)
  }
 }, r.backoffManager, true, stopCh)
}

代码分解如下:

  1. client-go提供的重试帮助函数,只要没有收到终止信号就会不断的重试传入的方法
  2. List And Watch, 获取列表并监听资源更新

重头戏就是ListAndWatch了。

      
      func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
 var resourceVersion string
    // 1.
 options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}

    // 2.
 if err := func() error {
  var list runtime.Object
  var paginatedResult bool
  var err error
  listCh := make(chan struct{}, 1)
  panicCh := make(chan interface{}, 1)
  go func() {
            // 3.
   pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
    return r.listerWatcher.List(opts)
   }))
            
   // 4.
   list, paginatedResult, err = pager.List(context.Background(), options)
   close(listCh)
  }()
  // 5.
  items, err := meta.ExtractList(list)
        // 6.
  if err := r.syncWith(items, resourceVersion); err != nil {
   return fmt.Errorf("unable to sync list result: %v", err)
  }
        // 7.
  r.setLastSyncResourceVersion(resourceVersion)
  return nil
 }(); err != nil {
  return err
 }

 resyncerrc := make(chan error, 1)
 cancelCh := make(chan struct{})
 defer close(cancelCh)
    // 8.
 go func() {
  resyncCh, cleanup := r.resyncChan()
        if r.ShouldResync == nil || r.ShouldResync() {
            klog.V(4).Infof("%s: forcing resync", r.name)
            if err := r.store.Resync(); err != nil {
                resyncerrc <- err
                return
            }
        }
        cleanup()
  resyncCh, cleanup = r.resyncChan()
  }
 }()
 // 9.
 for {
        // 10.
  w, err := r.listerWatcher.Watch(options)
  // 11.
  if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
   return nil
  }
 }
}

func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
 found := make([]interface{}, 0len(items))
 for _, item := range items {
  found = append(found, item)
 }
 return r.store.Replace(found, resourceVersion)
}

代码分解如下:

  1. 设置资源版本号ResourceVersion, k8s提供了一种以版本号过滤的资源的方式,如果是首次,那么是0,如果因为网络等原因重试,就可以增量的获取遗落的资源列表,而不需再次全量的获取一遍
  2. 将List的逻辑放在一个匿名函数中统一处理错误,常见操作了。
  3. 构建一个分页器,分批获取资源列表。
  4. 开始获取,这里的List其实就是调用之前传入的lw.List
  5. 获取列表,这一步会检查列表对象是否合法以及做一定的转换。
  6. 将数据同步到Store,使用它的Replace方法,这可以在上面源代码的最后看到具体操作。
  7. 设置ResourceVersion,如果后续出错,就可以从这个资源版本开始了
  8. resync, 就是将Store里面的数据以Update的事件形式再次传入informer,会触发UpdateFunc回调函数。
  9. 监听的循环
  10. 通过之前传入的lw的Watch方法,获得watch.Interface,这个接口会不断的给出变更对象
  11. 处理上一步传来的事件。

最后就是Reflector的核心方法了。

      
      func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
loop:
 for {
  select {
  case <-stopCh:
   return errorStopRequested
  case err := <-errc:
   return err
        // 1.
  case event, ok := <-w.ResultChan():
   if !ok {
    break loop
   }
            // 2.
   if event.Type == watch.Error {
    return apierrors.FromObject(event.Object)
   }
   if r.expectedType != nil {
    if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
     utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
     continue
    }
   }
   if r.expectedGVK != nil {
    if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
     utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
     continue
    }
   }
            // 3.
   meta, err := meta.Accessor(event.Object)
   newResourceVersion := meta.GetResourceVersion()
   switch event.Type {
   case watch.Added:
    err := r.store.Add(event.Object)
   case watch.Modified:
    err := r.store.Update(event.Object)
   case watch.Deleted:
    err := r.store.Delete(event.Object)
   case watch.Bookmark:
   default:
    utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
   }
   *resourceVersion = newResourceVersion
   r.setLastSyncResourceVersion(newResourceVersion)
   if rvu, ok := r.store.(ResourceVersionUpdater); ok {
    rvu.UpdateResourceVersion(newResourceVersion)
   }
   eventCount++
  }
 }
 return nil
}

代码分解如下:

  1. 获取监听到的事件
  2. 判断事件是否正常,是否符合预期的GVK等
  3. 不同的事件以不同方法更新,这样可以触发不同的回调函数

总的来说,reflector做的事情就是将数据更新到Store里面,而Informer会不断的从Store里面读取数据,当读到数据后就调用对应的回调函数。

总结

Informer是k8s里面非常重要的数据同步机制,理解了Informer就可以很容易找到k8s相关组件的主要业务逻辑了,可以想象的到,业务逻辑一定注册在回调函数中,不过真实的代码要多了一层抽象,因为k8s源代码里回调函数逻辑一般是判断一下就扔进workqueue了。

client-go的代码部分差不多结束了,后面阅读k8s的源代码。

浏览 38
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报