KubeAPIServer 的存储接口实现

k8s技术圈

共 15655字,需浏览 32分钟

 · 2023-06-21

上回讲到 KubeAPIServer 在核心路由注册过程中,主要分成两步,第一步创建 RESTStorage 将后端存储与资源进行绑定,第二步才进行路由注册。

路由注册已经讲了,回过头来看看 RESTStorage 的创建:

// pkg/controlplane/instance.go

func (m *Instance) InstallLegacyAPI(c *completedConfig, restOptionsGetter generic.RESTOptionsGetter) error {
 // 主要是为 LegacyAPI 中各个资源创建 RESTStorage
 // 并将各种资源和对应的后端存储(etcd)的操作绑定
 // 传递 generic.RESTOptionsGetter 类型参数
 // generic.RESTOptionsGetter 是存储接口的实现(先有个印象,稍后再看)
 legacyRESTStorage, apiGroupInfo, err := legacyRESTStorageProvider.NewLegacyRESTStorage(c.ExtraConfig.APIResourceConfigSource, restOptionsGetter)
 if err != nil {
  return fmt.Errorf("error building core storage: %v", err)
 }

 // ...
 // 路由注册
 return nil
}

跳到 NewLegacyRESTStorage 方法,先为每个资源的 RESTStorage 初始化:

// pkg/registry/core/rest/storage_core.go
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
 // ...
 restStorage := LegacyRESTStorage{}

 // 每个资源的初始化都传递了 generic.RESTOptionsGetter 类型参数,即存储接口的实现

 // PodTemplate 资源的 RESTStorage 初始化
 podTemplateStorage, err := podtemplatestore.NewREST(restOptionsGetter)

 // Event 资源的 RESTStorage 初始化
 eventStorage, err := eventstore.NewREST(restOptionsGetter, uint64(c.EventTTL.Seconds()))

 // LimitRange 资源的 RESTStorage 初始化
 limitRangeStorage, err := limitrangestore.NewREST(restOptionsGetter)

 // ResourceQuota 资源的 RESTStorage 初始化
 resourceQuotaStorage, resourceQuotaStatusStorage, err := resourcequotastore.NewREST(restOptionsGetter)

 // Secret 资源的 RESTStorage 初始化
 secretStorage, err := secretstore.NewREST(restOptionsGetter)

 // PersistentVolume 资源的 RESTStorage 初始化
 persistentVolumeStorage, persistentVolumeStatusStorage, err := pvstore.NewREST(restOptionsGetter)

 // PersistentVolumeClaim 资源的 RESTStorage 初始化
 persistentVolumeClaimStorage, persistentVolumeClaimStatusStorage, err := pvcstore.NewREST(restOptionsGetter)

 // ConfigMap 资源的 RESTStorage 初始化
 configMapStorage, err := configmapstore.NewREST(restOptionsGetter)

 // Namespace 资源的 RESTStorage 初始化
 namespaceStorage, namespaceStatusStorage, namespaceFinalizeStorage, err := namespacestore.NewREST(restOptionsGetter)

 // Endpoints 资源的 RESTStorage 初始化
 endpointsStorage, err := endpointsstore.NewREST(restOptionsGetter)

 // Node 资源的 RESTStorage 初始化
 nodeStorage, err := nodestore.NewStorage(restOptionsGetter, c.KubeletClientConfig, c.ProxyTransport)

 // Pod 资源的 RESTStorage 初始化
 podStorage, err := podstore.NewStorage(
  restOptionsGetter,
  nodeStorage.KubeletConnectionInfo,
  c.ProxyTransport,
  podDisruptionClient,
 )

 // ServiceAccount 资源的 RESTStorage 初始化
 var serviceAccountStorage *serviceaccountstore.REST
 if c.ServiceAccountIssuer != nil {
  serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, c.ServiceAccountIssuer, c.APIAudiences, c.ServiceAccountMaxExpiration, podStorage.Pod.Store, secretStorage.Store, c.ExtendExpiration)
 } else {
  serviceAccountStorage, err = serviceaccountstore.NewREST(restOptionsGetter, nilnil0nilnilfalse)
 }


 // ......
}

再将资源和对应的 RESTStorage 进行绑定:

// pkg/registry/core/rest/storage_core.go
func (c LegacyRESTStorageProvider) NewLegacyRESTStorage(apiResourceConfigSource serverstorage.APIResourceConfigSource, restOptionsGetter generic.RESTOptionsGetter) (LegacyRESTStorage, genericapiserver.APIGroupInfo, error) {
 // ......

 // 利用 map 来保存资源的 http path 和对应的 RESTStorage
 storage := map[string]rest.Storage{}
 if resource := "pods"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
  storage[resource] = podStorage.Pod

  // 对于 Pod 资源,有很多细分的 path 及其 RESTStorage
  storage[resource+"/attach"] = podStorage.Attach
  storage[resource+"/status"] = podStorage.Status
  storage[resource+"/log"] = podStorage.Log
  storage[resource+"/exec"] = podStorage.Exec
  storage[resource+"/portforward"] = podStorage.PortForward
  storage[resource+"/proxy"] = podStorage.Proxy
  storage[resource+"/binding"] = podStorage.Binding
  if podStorage.Eviction != nil {
   storage[resource+"/eviction"] = podStorage.Eviction
  }
  storage[resource+"/ephemeralcontainers"] = podStorage.EphemeralContainers

 }
 if resource := "bindings"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
  storage[resource] = podStorage.LegacyBinding
 }

 if resource := "podtemplates"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
  storage[resource] = podTemplateStorage
 }

 if resource := "replicationcontrollers"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
  storage[resource] = controllerStorage.Controller
  storage[resource+"/status"] = controllerStorage.Status
  if legacyscheme.Scheme.IsVersionRegistered(schema.GroupVersion{Group: "autoscaling", Version: "v1"}) {
   storage[resource+"/scale"] = controllerStorage.Scale
  }
 }

 if resource := "services"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
  storage[resource] = serviceRESTStorage
  storage[resource+"/proxy"] = serviceRESTProxy
  storage[resource+"/status"] = serviceStatusStorage
 }

 if resource := "endpoints"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
  storage[resource] = endpointsStorage
 }

 if resource := "nodes"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
  storage[resource] = nodeStorage.Node
  storage[resource+"/proxy"] = nodeStorage.Proxy
  storage[resource+"/status"] = nodeStorage.Status
 }

 if resource := "events"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
  storage[resource] = eventStorage
 }

 if resource := "limitranges"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
  storage[resource] = limitRangeStorage
 }

 if resource := "resourcequotas"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
  storage[resource] = resourceQuotaStorage
  storage[resource+"/status"] = resourceQuotaStatusStorage
 }

 if resource := "namespaces"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
  storage[resource] = namespaceStorage
  storage[resource+"/status"] = namespaceStatusStorage
  storage[resource+"/finalize"] = namespaceFinalizeStorage
 }

 if resource := "secrets"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
  storage[resource] = secretStorage
 }

 if resource := "serviceaccounts"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
  storage[resource] = serviceAccountStorage
  if serviceAccountStorage.Token != nil {
   storage[resource+"/token"] = serviceAccountStorage.Token
  }
 }

 if resource := "persistentvolumes"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
  storage[resource] = persistentVolumeStorage
  storage[resource+"/status"] = persistentVolumeStatusStorage
 }

 if resource := "persistentvolumeclaims"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
  storage[resource] = persistentVolumeClaimStorage
  storage[resource+"/status"] = persistentVolumeClaimStatusStorage
 }

 if resource := "configmaps"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
  storage[resource] = configMapStorage
 }

 if resource := "componentstatuses"; apiResourceConfigSource.ResourceEnabled(corev1.SchemeGroupVersion.WithResource(resource)) {
  storage[resource] = componentstatus.NewStorage(componentStatusStorage{c.StorageFactory}.serversToValidate)
 }

 if len(storage) > 0 {
  apiGroupInfo.VersionedResourcesStorageMap["v1"] = storage
 }

 return restStorage, apiGroupInfo, nil
}

存储了所有资源路径和对应 RESTStorage 的 map 结构最后会保存在 apiGroupInfo 中并返回给后续的路由注册使用(见上回)。这样后续的 handler 就可以通过对应的 RESTStorage 来操作 etcd 存取数据,同时这种做法还可以划分每个资源对 Storage 操作的权限,保障数据的安全性。

继续以 Pod 资源的 RESTStorage 初始化为例,来到 podstore.NewStorage 方法:

// pkg/registry/core/pod/storage/storage.go
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) (PodStorage, error) {

 // 创建 Store 实例
 store := &genericregistry.Store{
  NewFunc:                   func() runtime.Object { return &api.Pod{} },
  NewListFunc:               func() runtime.Object { return &api.PodList{} },
  PredicateFunc:             registrypod.MatchPod,
  DefaultQualifiedResource:  api.Resource("pods"),
  SingularQualifiedResource: api.Resource("pod"),

  CreateStrategy:      registrypod.Strategy,
  UpdateStrategy:      registrypod.Strategy,
  DeleteStrategy:      registrypod.Strategy,
  ResetFieldsStrategy: registrypod.Strategy,
  ReturnDeletedObject: true,

  TableConvertor: printerstorage.TableConvertor{TableGenerator: printers.NewTableGenerator().With(printersinternal.AddHandlers)},
 }
 // Store 配置
 options := &generic.StoreOptions{
  // RESTOptions 就是传入的存储接口的实现
  RESTOptions: optsGetter,
  AttrFunc:    registrypod.GetAttrs,
  TriggerFunc: map[string]storage.IndexerFunc{"spec.nodeName": registrypod.NodeNameTriggerFunc},
  Indexers:    registrypod.Indexers(),
 }
 // 初始化 Store 实例,传递 options 参数
 if err := store.CompleteWithOptions(options); err != nil {
  return PodStorage{}, err
 }

 // 给 Pod 资源的路径绑定上 Store 实例,返回 PodStorage 对象
 statusStore := *store
 statusStore.UpdateStrategy = registrypod.StatusStrategy
 statusStore.ResetFieldsStrategy = registrypod.StatusStrategy
 ephemeralContainersStore := *store
 ephemeralContainersStore.UpdateStrategy = registrypod.EphemeralContainersStrategy

 bindingREST := &BindingREST{store: store}
 return PodStorage{
  Pod:                 &REST{store, proxyTransport},
  Binding:             &BindingREST{store: store},
  LegacyBinding:       &LegacyBindingREST{bindingREST},
  Eviction:            newEvictionStorage(&statusStore, podDisruptionBudgetClient),
  Status:              &StatusREST{store: &statusStore},
  EphemeralContainers: &EphemeralContainersREST{store: &ephemeralContainersStore},
  Log:                 &podrest.LogREST{Store: store, KubeletConn: k},
  Proxy:               &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
  Exec:                &podrest.ExecREST{Store: store, KubeletConn: k},
  Attach:              &podrest.AttachREST{Store: store, KubeletConn: k},
  PortForward:         &podrest.PortForwardREST{Store: store, KubeletConn: k},
 }, nil
}

跳到初始化 Store 实例的 store.CompleteWithOptions 方法:

// k8s.io/apiserver/pkg/registry/generic/registry/store.go
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
 // ...

 // RESTOptions 是存储接口的实现
 // 调用 RESTOptions 的 GetRESTOptions 方法获取对应的存储实现的选项 opts
 opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
 if err != nil {
  return err
 }

 // 将 opts 中的值赋给 e 即 Store 实例,完成对 Store 实例的初始化
 if e.DeleteCollectionWorkers == 0 {
  e.DeleteCollectionWorkers = opts.DeleteCollectionWorkers
 }

 e.EnableGarbageCollection = opts.EnableGarbageCollection
 // ...

 if e.Storage.Storage == nil {
  e.Storage.Codec = opts.StorageConfig.Codec
  var err error
  e.Storage.Storage, e.DestroyFunc, err = opts.Decorator(
   opts.StorageConfig,
   prefix,
   keyFunc,
   e.NewFunc,
   e.NewListFunc,
   attrFunc,
   options.TriggerFunc,
   options.Indexers,
  )
  if err != nil {
   return err
  }
  e.StorageVersioner = opts.StorageConfig.EncodeVersioner

  if opts.CountMetricPollPeriod > 0 {
   stopFunc := e.startObservingCount(opts.CountMetricPollPeriod, opts.StorageObjectCountTracker)
   previousDestroy := e.DestroyFunc
   var once sync.Once
   e.DestroyFunc = func() {
    once.Do(func() {
     stopFunc()
     if previousDestroy != nil {
      previousDestroy()
     }
    })
   }
  }
 }

 return nil
}

到这里,Pod 资源的 RESTStorage 初始化完成。

接下来就开始看看一直提到的存储接口的实现 generic.RESTOptionsGetter ,它是一个接口类型:

// k8s.io/apiserver/pkg/registry/generic/options.go

type RESTOptionsGetter interface {
 GetRESTOptions(resource schema.GroupResource) (RESTOptions, error)
}

要找到 KubeAPIServer 对应的实现,我们要回到最开始为 KubeAPIServer 创建配置的地方,即第 2 回中创建服务调用链的地方:

// cmd/kube-apiserver/app/server.go
func CreateServerChain(completedOptions completedServerRunOptions) (*aggregatorapiserver.APIAggregator, error) {
 // 为 KubeAPIServer 创建配置
 kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
 if err != nil {
  return nil, err
 }

 // ...
 return aggregatorServer, nil
}

// cmd/kube-apiserver/app/server.go
func CreateKubeAPIServerConfig(s completedServerRunOptions) (
 *controlplane.Config,
 aggregatorapiserver.ServiceResolver,
 []admission.PluginInitializer,
 error,
)
 {
 proxyTransport := CreateProxyTransport()
 // 跳到 buildGenericConfig
 genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)

 // ...
 return config, serviceResolver, pluginInitializers, nil
}

// cmd/kube-apiserver/app/server.go
func buildGenericConfig(
 s *options.ServerRunOptions,
 proxyTransport *http.Transport,
)
 (
 genericConfig *genericapiserver.Config,
 versionedInformers clientgoinformers.SharedInformerFactory,
 serviceResolver aggregatorapiserver.ServiceResolver,
 pluginInitializers []admission.PluginInitializer,
 admissionPostStartHook genericapiserver.PostStartHookFunc,
 storageFactory *serverstorage.DefaultStorageFactory,
 lastErr error,
)
 {
 // ...

 // RESTOptionsGetter 的初始化
 if lastErr = s.Etcd.ApplyWithStorageFactoryTo(storageFactory, genericConfig); lastErr != nil {
  return
 }

 // ...

 return
}

// k8s.io/apiserver/pkg/server/options/etcd.go
func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
 //...

 // RESTOptionsGetter 的实现是 StorageFactoryRestOptionsFactory
 c.RESTOptionsGetter = &StorageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
 return nil
}

总结下完整的调用链:

存储接口的实现:

CreateServerChain 创建服务调用链 → CreateKubeAPIServerConfig 创建 kubeAPIServerConfig 配置 → buildGenericConfig 生成 kubeAPIServerConfig 配置中的 genericConfig 配置 → ApplyWithStorageFactoryTo 初始化 genericConfig 配置的 RESTOptionsGetter

存储接口的使用:

CreateServerChain 创建服务调用链 → CreateKubeAPIServer 初始化服务传入 kubeAPIServerConfig 配置 → kubeAPIServerConfig.Complete().New(delegateAPIServer)InstallLegacyAPI 核心路由注册传入 kubeAPIServerConfig.GenericConfig.RESTOptionsGetter 存储实现

回到刚才, 得知 RESTOptionsGetter 使用了 StorageFactoryRestOptionsFactory 来实现存储接口:

// k8s.io/apiserver/pkg/server/options/etcd.go

// RESTOptionsGetter 接口的一个实现
type StorageFactoryRestOptionsFactory struct {
 Options        EtcdOptions
 StorageFactory serverstorage.StorageFactory
}

func (f *StorageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
 storageConfig, err := f.StorageFactory.NewConfig(resource)
 if err != nil {
  return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
 }

 ret := generic.RESTOptions{
  StorageConfig:             storageConfig,
  Decorator:                 generic.UndecoratedStorage,
  DeleteCollectionWorkers:   f.Options.DeleteCollectionWorkers,
  EnableGarbageCollection:   f.Options.EnableGarbageCollection,
  ResourcePrefix:            f.StorageFactory.ResourcePrefix(resource),
  CountMetricPollPeriod:     f.Options.StorageConfig.CountMetricPollPeriod,
  StorageObjectCountTracker: f.Options.StorageConfig.StorageObjectCountTracker,
 }

 if f.Options.EnableWatchCache {
  sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
  if err != nil {
   return generic.RESTOptions{}, err
  }
  size, ok := sizes[resource]
  if ok && size > 0 {
   klog.Warningf("Dropping watch-cache-size for %v - watchCache size is now dynamic", resource)
  }
  if ok && size <= 0 {
   klog.V(3).InfoS("Not using watch cache""resource", resource)
   // 不使用 watch cache 的 Storage 实现
   ret.Decorator = generic.UndecoratedStorage
  } else {
   klog.V(3).InfoS("Using watch cache""resource", resource)
   // 使用 watch cache 的 Storage 实现
   ret.Decorator = genericregistry.StorageWithCacher()
  }
 }

 return ret, nil
}

不论是否使用 watch cache 的 Storage 实现,都会调用 generic.NewRawStorage 方法:

func UndecoratedStorage(
 config *storagebackend.ConfigForResource,
 resourcePrefix string,
 keyFunc func(obj runtime.Object)
 (string, error),

 newFunc func() runtime.Object,
 newListFunc func() runtime.Object,
 getAttrsFunc storage.AttrFunc,
 trigger storage.IndexerFuncs,
 indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {
 return NewRawStorage(config, newFunc)
}

func StorageWithCacher() generic.StorageDecorator {
 return func(
  storageConfig *storagebackend.ConfigForResource,
  resourcePrefix string,
  keyFunc func(obj runtime.Object)
 (string, error),

  newFunc func() runtime.Object,
  newListFunc func() runtime.Object,
  getAttrsFunc storage.AttrFunc,
  triggerFuncs storage.IndexerFuncs,
  indexers *cache.Indexers) (storage.Interface, factory.DestroyFunc, error) {

  s, d, err := generic.NewRawStorage(storageConfig, newFunc)
  //...

  return cacher, destroyFunc, nil
 }
}

看到 NewRawStorage 方法:

// k8s.io/apiserver/pkg/registry/generic/storage_decorator.go
func NewRawStorage(config *storagebackend.ConfigForResource, newFunc func() runtime.Object(storage.Interface, factory.DestroyFunc, error) {
 return factory.Create(*config, newFunc)
}

// k8s.io/apiserver/pkg/storage/storagebackend/factory/factory.go
func Create(c storagebackend.ConfigForResource, newFunc func() runtime.Object(storage.Interface, DestroyFunc, error) {
 switch c.Type {
 case storagebackend.StorageTypeETCD2:
  return nilnil, fmt.Errorf("%s is no longer a supported storage backend", c.Type)
 case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
  // 只支持 etcd V3
  return newETCD3Storage(c, newFunc)
 default:
  return nilnil, fmt.Errorf("unknown storage type: %s", c.Type)
 }
}

来到终点站 newETCD3Storage 方法:

// k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go

func newETCD3Storage(c storagebackend.ConfigForResource, newFunc func() runtime.Object(storage.Interface, DestroyFunc, error) {
 // ...

 // 创建 etcd V3 的 client
 client, err := newETCD3Client(c.Transport)
 if err != nil {
  stopCompactor()
  return nilnil, err
 }

 // ...
 return etcd3.New(client, c.Codec, newFunc, c.Prefix, c.GroupResource, transformer, c.Paging, c.LeaseManagerConfig), destroyFunc, nil
}

var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, error) {
 // ...

 cfg := clientv3.Config{
  DialTimeout:          dialTimeout,
  DialKeepAliveTime:    keepaliveTime,
  DialKeepAliveTimeout: keepaliveTimeout,
  DialOptions:          dialOptions,
  Endpoints:            c.ServerList,
  TLS:                  tlsConfig,
  Logger:               etcd3ClientLogger,
 }

 // 调用 etcd 官方库 go.etcd.io/etcd/client/v3
 return clientv3.New(cfg)
}

至此,etcd 的初始化便完成。

浏览 19
点赞
评论
收藏
分享

手机扫一扫分享

举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

举报