kubernetes client-go源码阅读10之Informer
只要读k8s源代码一定会读informer的代码的,因为informer相当优秀,大多数分布式项目(比如OpenStack)在解决组件间通信的问题时都会选择如kafka,rabbitmaq之类的消息队列,但是k8s不走寻常路,选择了自己解决,解决的方案是informer。
假设我们没有informer,那么我们应该如何从api server获取数据?
一般而言,我们有两种方式, 一是全量获取,二是增量获取,两种都各有优缺点,前者优点是,每次可以获取全量的最新状态, 逻辑简单,但是缺点很明显,如果请求频次过于频繁,就会有比较大的性能消耗, 如果频次过低就不够实时,但是依旧有比较大的性能消耗,想象一个100节点的集群,1000个deployment, 1000个ReplicaSet, 5000千个pod, 加个每个对象都只占5k, 就接近50MB, 这显然会占用比较多的带宽,这是让人难以接受的,而且数据的时效性不够高也是难以接受的,所以对于一个中大型集群而言,不能使用这种方式。
第二种方式是增量获取更新,这种方式的优点是时效性高占用资源低,但是相较于第一种方式而言,实现起来稍显复杂,复杂度在于两点,一是我们需要有健壮的容错机制,比如出错怎么办? 如果跳过可能导致状态不一致, 比如漏掉一个更新的请求, 那么对应的资源一直得不到正确的处理, 所以我们需要一种重试机制, 二是, 我们需要缓存全量的数据用于快速的检索, 比如定时轮训的检查资源,但我们不可能总是等收到增量更新才开始业务逻辑,所以增量更新的逻辑比较复杂, 并且增量更新不能单独存在, 因为我们需要全量的资源, 所以需要配合第一种方式。
那么怎么平衡这两种获取资源的方式呢? k8s的选择是,**我全都要!!!**。
我全要图片
快速入门
一般来说informer会跟workque, controller在一起,这点从k8s的源代码可以很明显的看到,不过为了简单起见,这里只看informer的部分。
package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
"k8s.io/client-go/util/homedir"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)
func main() {
var kubeconfig *string
var master string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.StringVar(&master, "master", "", "master url")
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags(master, *kubeconfig)
if err != nil {
klog.Fatal(err)
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
klog.Fatal(err)
}
// 1.
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
// 2.
_, informer := cache.NewInformer(podListWatcher, &v1.Pod{}, 60*time.Second, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// 3.
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "Add: ", key)
}
},
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "Update: ", key)
}
},
DeleteFunc: func(obj interface{}) {
// 4.
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
fmt.Println(time.Now().Format("2006-01-02 15:04:05"), "Delete: ", key)
}
},
})
// 5.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)
go func() {
<-ch
klog.Info("Received termination, signaling shutdown")
cancel()
}()
// 6.
informer.Run(ctx.Done())
}
输出如下:
2023-06-03 14:19:21 Add: default/example-fcsjwbzzf2
2023-06-03 14:20:21 Update: default/example-fcsjwbzzf2
2023-06-03 14:21:21 Update: default/example-fcsjwbzzf2
注意: 每分钟以Update的形式再次调用UpdateFunc
实名吐槽Golang的时间格式化!!!
代码分解如下:
- 创建
ListWatch
对象,用于获取资源最新列表及后续更新 - 创建informer对象,传入必要的参数
- 注册各种回调函数, 如AddFunc等
- 删除的对象和其他对象不同,所以需要不同的方法来获取key
- 设置退出信号量,k8s的惯用操作了
- 启动informer
通过上面的代码可以知道,创建informer有两件比较重要的事情,一是创建ListWatch
,二是注册回调函数。
ListWatch
ListWatch就如名字指明的那样,List,Watch,前者是拉取指定资源的资源列表,比如default命名空间下的所有Pod资源,Watch是在前者拉取完成之后开始监听之后所有的资源变化(前者会得到一个版本号,watch可以借助这个版本号,只获取版本号之后的资源),比如新增,更新,删除等变化。
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
func NewListWatchFromClient(c Getter, resource string, namespace string, fieldSelector fields.Selector) *ListWatch {
// 1.
optionsModifier := func(options *metav1.ListOptions) {
options.FieldSelector = fieldSelector.String()
}
return NewFilteredListWatchFromClient(c, resource, namespace, optionsModifier)
}
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
// 2.
listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Do(context.TODO()).
Get()
}
// 3.
watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
options.Watch = true
optionsModifier(&options)
return c.Get().
Namespace(namespace).
Resource(resource).
VersionedParams(&options, metav1.ParameterCodec).
Watch(context.TODO())
}
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
代码分解如下:
- 设置Options, Options以函数的形式传入也是k8s一个比较常用的模式了。
- 简单的在静态客户端的
Get
方法上包装一层函数 - 简单的在静态客户端的
Watch
方法上包装一层函数
可以看到ListWatch的内部构造并不复杂,仅仅是将Get
和Watch
方法组合起来而已。
Informer
因为本文主要分析informer,所以会略过其中Store
的部分,我们暂且将其作为一个存储的黑盒子即可,以后有文章再详细说明。
_, informer := cache.NewInformer(podListWatcher, &v1.Pod{}, 60*time.Second, cache.ResourceEventHandlerFuncs{
// 略过代码部分
}
func NewInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
) (Store, Controller) {
// 1.
clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
}
func newInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
clientState Store,
) Controller {
// 2.
fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
KnownObjects: clientState,
EmitDeltaTypeReplaced: true,
})
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
// 3.
Process: func(obj interface{}) error {
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Replaced, Added, Updated:
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil {
return err
}
h.OnUpdate(old, d.Object)
} else {
if err := clientState.Add(d.Object); err != nil {
return err
}
h.OnAdd(d.Object)
}
case Deleted:
if err := clientState.Delete(d.Object); err != nil {
return err
}
h.OnDelete(d.Object)
}
}
return nil
},
}
return New(cfg)
}
func New(c *Config) Controller {
ctlr := &controller{
config: *c,
clock: &clock.RealClock{},
}
return ctlr
}
代码分解如下:
- 创建一个Store, 它用来存储informer获取到的资源
- 将Store再包装一层(k8s的传统操作了),提供先入先出(fifo)的功能
- informer处理的主函数,根据对象类型调用对应的回调函数,以及将对象更新到绑定的Store
这里有一个值得注意的点,informer是一个符合Controller接口的对象,阅读过k8s源代码或者写过operator的对controller应该不会陌生,这是k8s比较重要的对象了,或者说模式。
总的来说,informer的初始化过程还是比较清晰的,主要分为两步,创建队列(fifo),配置处理逻辑(Process),既然初始化不复杂,那么复杂的就是Run
方法。
Run
那么看看informer怎么运行的吧
informer.Run(ctx.Done())
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
// 1.
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
// 2.
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.clock = c.clock
if c.config.WatchErrorHandler != nil {
r.watchErrorHandler = c.config.WatchErrorHandler
}
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
// 3.
wg.StartWithChannel(stopCh, r.Run)
// 4.
wait.Until(c.processLoop, time.Second, stopCh)
wg.Wait()
}
代码分解如下:
- 创建Reflector, reflector负责和apiserver通信,不断的将数据同步给informer
- 配置Reflector的各项参数
- 启动Reflector
- 启动informer的主循环
由于processLoop比较简单,我们先看看它的源代码。
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
代码很简单,应该不需要特别的说明,就是传入之前的Process
方法用于处理队列传入的各个对象。
Reflector
Reflector的初始化并不复杂,代码如下
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
// 省略其他代码
r.setExpectedType(expectedType)
return r
}
func (r *Reflector) setExpectedType(expectedType interface{}) {
r.expectedType = reflect.TypeOf(expectedType)
if obj, ok := expectedType.(*unstructured.Unstructured); ok {
gvk := obj.GroupVersionKind()
r.expectedGVK = &gvk
r.expectedTypeName = gvk.String()
}
}
上面的代码唯一值得提的是setExpectedType
,k8s的对象总是要知道gvk的。
然后就是Reflector的运行逻辑
func (r *Reflector) Run(stopCh <-chan struct{}) {
// 1.
wait.BackoffUntil(func() {
// 2.
if err := r.ListAndWatch(stopCh); err != nil {
r.watchErrorHandler(r, err)
}
}, r.backoffManager, true, stopCh)
}
代码分解如下:
- client-go提供的重试帮助函数,只要没有收到终止信号就会不断的重试传入的方法
- List And Watch, 获取列表并监听资源更新
重头戏就是ListAndWatch了。
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
var resourceVersion string
// 1.
options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
// 2.
if err := func() error {
var list runtime.Object
var paginatedResult bool
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
// 3.
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
// 4.
list, paginatedResult, err = pager.List(context.Background(), options)
close(listCh)
}()
// 5.
items, err := meta.ExtractList(list)
// 6.
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("unable to sync list result: %v", err)
}
// 7.
r.setLastSyncResourceVersion(resourceVersion)
return nil
}(); err != nil {
return err
}
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
// 8.
go func() {
resyncCh, cleanup := r.resyncChan()
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
// 9.
for {
// 10.
w, err := r.listerWatcher.Watch(options)
// 11.
if err := r.watchHandler(start, w, &resourceVersion, resyncerrc, stopCh); err != nil {
return nil
}
}
}
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
return r.store.Replace(found, resourceVersion)
}
代码分解如下:
- 设置资源版本号
ResourceVersion
, k8s提供了一种以版本号过滤的资源的方式,如果是首次,那么是0,如果因为网络等原因重试,就可以增量的获取遗落的资源列表,而不需再次全量的获取一遍 - 将List的逻辑放在一个匿名函数中统一处理错误,常见操作了。
- 构建一个分页器,分批获取资源列表。
- 开始获取,这里的List其实就是调用之前传入的
lw.List
。 - 获取列表,这一步会检查列表对象是否合法以及做一定的转换。
- 将数据同步到Store,使用它的
Replace
方法,这可以在上面源代码的最后看到具体操作。 - 设置
ResourceVersion
,如果后续出错,就可以从这个资源版本开始了 - resync, 就是将Store里面的数据以Update的事件形式再次传入informer,会触发UpdateFunc回调函数。
- 监听的循环
- 通过之前传入的lw的Watch方法,获得
watch.Interface
,这个接口会不断的给出变更对象 - 处理上一步传来的事件。
最后就是Reflector的核心方法了。
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
loop:
for {
select {
case <-stopCh:
return errorStopRequested
case err := <-errc:
return err
// 1.
case event, ok := <-w.ResultChan():
if !ok {
break loop
}
// 2.
if event.Type == watch.Error {
return apierrors.FromObject(event.Object)
}
if r.expectedType != nil {
if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
continue
}
}
if r.expectedGVK != nil {
if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
continue
}
}
// 3.
meta, err := meta.Accessor(event.Object)
newResourceVersion := meta.GetResourceVersion()
switch event.Type {
case watch.Added:
err := r.store.Add(event.Object)
case watch.Modified:
err := r.store.Update(event.Object)
case watch.Deleted:
err := r.store.Delete(event.Object)
case watch.Bookmark:
default:
utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
}
*resourceVersion = newResourceVersion
r.setLastSyncResourceVersion(newResourceVersion)
if rvu, ok := r.store.(ResourceVersionUpdater); ok {
rvu.UpdateResourceVersion(newResourceVersion)
}
eventCount++
}
}
return nil
}
代码分解如下:
- 获取监听到的事件
- 判断事件是否正常,是否符合预期的GVK等
- 不同的事件以不同方法更新,这样可以触发不同的回调函数
总的来说,reflector做的事情就是将数据更新到Store里面,而Informer会不断的从Store里面读取数据,当读到数据后就调用对应的回调函数。
总结
Informer是k8s里面非常重要的数据同步机制,理解了Informer就可以很容易找到k8s相关组件的主要业务逻辑了,可以想象的到,业务逻辑一定注册在回调函数中,不过真实的代码要多了一层抽象,因为k8s源代码里回调函数逻辑一般是判断一下就扔进workqueue了。
client-go的代码部分差不多结束了,后面阅读k8s的源代码。