K8s client-go 的四种客户端

k8s技术圈

共 92358字,需浏览 185分钟

 ·

2023-09-07 21:04

前面都在讲 kube-apiserver ,后续 Kubernetes 其它组件,都会使用 client-go 库来调用 kube-apiserver 提供的 API 服务进行资源对象的操作。

Kubernetes 的源码 staging/src/k8s.io/client-go 中集成了 client-go ,所有组件都是调用 vendor 本地库的方式引入 client-go 库,不需要考虑和集群版本的兼容性,但如果是平时我们自己基于 Kubernetes 做二次开发用到 client-go 库时,则需要注意下所导入的库和 Kubernetes 版本的对应关系。

对于 Kubernetes 1.27.2 ,client-go 源码的部分目录如下:

根据不同功能,可以将其划分为四种客户端:

  • RESTClient:最基础的客户端,仅对 HTTP Request 进行了封装。实现位置在 rest 目录
  • ClientSet:基于 RESTClient 实现,封装了 Kubernetes 内置资源(Resource)和版本(Version)的方法。实现位置在 kubernetes 目录
  • DynamicClient:动态客户端,基于 RESTClient 实现,封装了 Kubernetes 任意资源(包括 CRD 自定义资源)和版本的方法。实现位置在 dynamic 目录
  • DiscoveryClient:发现客户端,基于 RESTClient 实现,用于发现 kube-apiserver 所支持的资源组(Group)、资源版本(Versions)和资源信息(Resources)。实现位置在 discovery 目录

RESTClient

作为最基础的客户端,其接口定义如下:

// k8s.io/client-go/rest/client.go

type Interface interface {
 // 返回速率限制器
 GetRateLimiter() flowcontrol.RateLimiter
 // 构建 (POST, Put, Patch, Get, DELETE) 请求器
 Verb(verb string) *Request
 // 构建 POST 请求器
 Post() *Request
 // 构建 Put 请求器
 Put() *Request
 // 构建 Patch 请求器
 Patch(pt types.PatchType) *Request
 // 构建 Get 请求器
 Get() *Request
 // 构建 Delete 请求器
 Delete() *Request
 // 返回 API 版本
 APIVersion() schema.GroupVersion
}

RESTClient 的实现很简单,主要提供一个对外的接口调用:

// k8s.io/client-go/rest/client.go

type RESTClient struct {
 base *url.URL
 versionedAPIPath string
 content ClientContentConfig
 createBackoffMgr func() BackoffManager
 rateLimiter flowcontrol.RateLimiter
 warningHandler WarningHandler
 // http 客户端
 Client *http.Client
}

func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientContentConfig, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
 if len(config.ContentType) == 0 {
  config.ContentType = "application/json"
 }

 base := *baseURL
 if !strings.HasSuffix(base.Path, "/") {
  base.Path += "/"
 }
 base.RawQuery = ""
 base.Fragment = ""

 return &RESTClient{
  base:             &base,
  versionedAPIPath: versionedAPIPath,
  content:          config,
  createBackoffMgr: readExpBackoffConfig,
  rateLimiter:      rateLimiter,

  Client: client,
 }, nil
}

func (c *RESTClient) Verb(verb string) *Request {
 // 构建请求器
 return NewRequest(c).Verb(verb)
}

func (c *RESTClient) Post() *Request {
 return c.Verb("POST")
}

// 其余请求器的构建方法省略,同 Post

核心的处理全在 Request 请求器上,来到构建请求器的 NewRequest 方法:

// k8s.io/client-go/rest/request.go

type Request struct {
 c *RESTClient
 namespace    string
 resource     string
 // 省略
}

// 传入 RESTClient 参数
func NewRequest(c *RESTClient) *Request {
 // ...
 r := &Request{
  c:              c,
  // ...
 }

 switch {
 case len(c.content.AcceptContentTypes) > 0:
  r.SetHeader("Accept", c.content.AcceptContentTypes)
 case len(c.content.ContentType) > 0:
  r.SetHeader("Accept", c.content.ContentType+", */*")
 }
 return r
}

// 设置要访问的命名空间 (<resource>/[ns/<namespace>/]<name>)
func (r *Request) Namespace(namespace string) *Request {
 if r.err != nil {
  return r
 }
 if r.namespaceSet {
  r.err = fmt.Errorf("namespace already set to %q, cannot change to %q", r.namespace, namespace)
  return r
 }
 if msgs := IsValidPathSegmentName(namespace); len(msgs) != 0 {
  r.err = fmt.Errorf("invalid namespace %q: %v", namespace, msgs)
  return r
 }
 r.namespaceSet = true
 r.namespace = namespace
 return r
}

// 设置要访问的资源 (<resource>/[ns/<namespace>/]<name>)
func (r *Request) Resource(resource string) *Request {
 if r.err != nil {
  return r
 }
 if len(r.resource) != 0 {
  r.err = fmt.Errorf("resource already set to %q, cannot change to %q", r.resource, resource)
  return r
 }
 if msgs := IsValidPathSegmentName(resource); len(msgs) != 0 {
  r.err = fmt.Errorf("invalid resource %q: %v", resource, msgs)
  return r
 }
 r.resource = resource
 return r
}

Request 请求器的本质就是对命名空间、资源等 K8s 的概念进行了一个通用的封装,通过调用相应的方法就可以构造出最终的请求 URL 和参数:

func (r *Request) URL() *url.URL {
 p := r.pathPrefix
 if r.namespaceSet && len(r.namespace) > 0 {
  // 命名空间
  p = path.Join(p, "namespaces", r.namespace)
 }
 if len(r.resource) != 0 {
  // 资源
  p = path.Join(p, strings.ToLower(r.resource))
 }
 // Join trims trailing slashes, so preserve r.pathPrefix's trailing slash for backwards compatibility if nothing was changed
 if len(r.resourceName) != 0 || len(r.subpath) != 0 || len(r.subresource) != 0 {
  p = path.Join(p, r.resourceName, r.subresource, r.subpath)
 }

 finalURL := &url.URL{}
 if r.c.base != nil {
  *finalURL = *r.c.base
 }
 finalURL.Path = p

 query := url.Values{}
 for key, values := range r.params {
  for _, value := range values {
   query.Add(key, value)
  }
 }

 // timeout is handled specially here.
 if r.timeout != 0 {
  // 参数
  query.Set("timeout", r.timeout.String())
 }
 finalURL.RawQuery = query.Encode()
 return finalURL
}

最后调用 Do 方法发起请求,并返回请求的结果:

// 请求结果
type Result struct {
 body        []byte
 warnings    []net.WarningHeader
 contentType string
 err         error
 statusCode  int

 decoder runtime.Decoder
}

func (r *Request) Do(ctx context.Context) Result {
 var result Result
 // 发起请求
 err := r.request(ctx, func(req *http.Request, resp *http.Response) {
  result = r.transformResponse(resp, req)
 })
 if err != nil {
  return Result{err: err}
 }
 if result.err == nil || len(result.body) > 0 {
  metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(result.body)))
 }
 return result
}

func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)error {
 // ...

 client := r.c.Client
 // 如果 http 客户端没有初始化,则使用默认的 http 客户端
 if client == nil {
  client = http.DefaultClient
 }

 // ...

 // 重试机制
 retry := r.retryFn(r.maxRetries)
 for {
  if err := retry.Before(ctx, r); err != nil {
   return retry.WrapPreviousError(err)
  }
  // 构造 http.Request 对象
  req, err := r.newHTTPRequest(ctx)
  if err != nil {
   return err
  }
  // 调用 http 客户端的 Do 方法发起 HTTP 请求
  resp, err := client.Do(req)
  // The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.
  // https://pkg.go.dev/net/http#Request
  if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
   metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
  }
  retry.After(ctx, r, resp, err)

  done := func() bool {
   defer readAndCloseResponseBody(resp)

   // if the server returns an error in err, the response will be nil.
   f := func(req *http.Request, resp *http.Response) {
    if resp == nil {
     return
    }
    fn(req, resp)
   }

   if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
    return false
   }

   f(req, resp)
   return true
  }()
  if done {
   return retry.WrapPreviousError(err)
  }
 }
}

func (r *Request) newHTTPRequest(ctx context.Context) (*http.Request, error) {
 var body io.Reader
 switch {
 case r.body != nil && r.bodyBytes != nil:
  return nil, fmt.Errorf("cannot set both body and bodyBytes")
 case r.body != nil:
  body = r.body
 case r.bodyBytes != nil:
  // Create a new reader specifically for this request.
  // Giving each request a dedicated reader allows retries to avoid races resetting the request body.
  body = bytes.NewReader(r.bodyBytes)
 }

 // 构造请求 url
 url := r.URL().String()
 // 初始化 http.Request 对象
 req, err := http.NewRequest(r.verb, url, body)
 if err != nil {
  return nil, err
 }
 req = req.WithContext(ctx)
 req.Header = r.headers
 return req, nil
}

例如,我们要请求默认命名空间下的 Pod 资源,可以通过如下代码实现:

package main

import (
 "context"
 "fmt"

 corev1 "k8s.io/api/core/v1"
 "k8s.io/client-go/kubernetes/scheme"
 "k8s.io/client-go/rest"
 "k8s.io/client-go/tools/clientcmd"
)

func main() {
 // 通过 kubeconfig 生成配置
 config, err := clientcmd.BuildConfigFromFlags(""".kube/config")
 if err != nil {
  panic(err)
 }

 config.APIPath = "api"
 // 手动指定版本
 config.GroupVersion = &corev1.SchemeGroupVersion
 config.NegotiatedSerializer = scheme.Codecs

 // 创建 RESTClient
 restClient, err := rest.RESTClientFor(config)
 if err != nil {
  panic(err)
 }

 // 查询 default 的 pod
 result := &corev1.PodList{}
 if err := restClient.
  Get().
  Namespace("default").
  Resource("pods").
  Do(context.TODO()).
  Into(result); err != nil {
  panic(err)
 }

 for _, item := range result.Items {
  fmt.Printf("%v \t [%v]\n", item.Name, item.Status.Phase)
 }
}

// $ go run main.go
// nginx    [Pending]

ClientSet

如果清楚了 RESTClient 的使用,ClientSet 实际很简单,首先 ClientSet 对外提供了一层各种版本的接口:

// k8s.io/client-go/kubernetes/clientset.go

type Interface interface {
 Discovery() discovery.DiscoveryInterface
 AdmissionregistrationV1() admissionregistrationv1.AdmissionregistrationV1Interface
 AdmissionregistrationV1alpha1() admissionregistrationv1alpha1.AdmissionregistrationV1alpha1Interface
 AdmissionregistrationV1beta1() admissionregistrationv1beta1.AdmissionregistrationV1beta1Interface
 InternalV1alpha1() internalv1alpha1.InternalV1alpha1Interface
 AppsV1() appsv1.AppsV1Interface
 AppsV1beta1() appsv1beta1.AppsV1beta1Interface
 AppsV1beta2() appsv1beta2.AppsV1beta2Interface
 AuthenticationV1() authenticationv1.AuthenticationV1Interface
 AuthenticationV1alpha1() authenticationv1alpha1.AuthenticationV1alpha1Interface
 AuthenticationV1beta1() authenticationv1beta1.AuthenticationV1beta1Interface
 AuthorizationV1() authorizationv1.AuthorizationV1Interface
 AuthorizationV1beta1() authorizationv1beta1.AuthorizationV1beta1Interface
 AutoscalingV1() autoscalingv1.AutoscalingV1Interface
 AutoscalingV2() autoscalingv2.AutoscalingV2Interface
 AutoscalingV2beta1() autoscalingv2beta1.AutoscalingV2beta1Interface
 AutoscalingV2beta2() autoscalingv2beta2.AutoscalingV2beta2Interface
 BatchV1() batchv1.BatchV1Interface
 BatchV1beta1() batchv1beta1.BatchV1beta1Interface
 CertificatesV1() certificatesv1.CertificatesV1Interface
 CertificatesV1beta1() certificatesv1beta1.CertificatesV1beta1Interface
 CertificatesV1alpha1() certificatesv1alpha1.CertificatesV1alpha1Interface
 CoordinationV1beta1() coordinationv1beta1.CoordinationV1beta1Interface
 CoordinationV1() coordinationv1.CoordinationV1Interface
 CoreV1() corev1.CoreV1Interface
 DiscoveryV1() discoveryv1.DiscoveryV1Interface
 DiscoveryV1beta1() discoveryv1beta1.DiscoveryV1beta1Interface
 EventsV1() eventsv1.EventsV1Interface
 EventsV1beta1() eventsv1beta1.EventsV1beta1Interface
 ExtensionsV1beta1() extensionsv1beta1.ExtensionsV1beta1Interface
 FlowcontrolV1alpha1() flowcontrolv1alpha1.FlowcontrolV1alpha1Interface
 FlowcontrolV1beta1() flowcontrolv1beta1.FlowcontrolV1beta1Interface
 FlowcontrolV1beta2() flowcontrolv1beta2.FlowcontrolV1beta2Interface
 FlowcontrolV1beta3() flowcontrolv1beta3.FlowcontrolV1beta3Interface
 NetworkingV1() networkingv1.NetworkingV1Interface
 NetworkingV1alpha1() networkingv1alpha1.NetworkingV1alpha1Interface
 NetworkingV1beta1() networkingv1beta1.NetworkingV1beta1Interface
 NodeV1() nodev1.NodeV1Interface
 NodeV1alpha1() nodev1alpha1.NodeV1alpha1Interface
 NodeV1beta1() nodev1beta1.NodeV1beta1Interface
 PolicyV1() policyv1.PolicyV1Interface
 PolicyV1beta1() policyv1beta1.PolicyV1beta1Interface
 RbacV1() rbacv1.RbacV1Interface
 RbacV1beta1() rbacv1beta1.RbacV1beta1Interface
 RbacV1alpha1() rbacv1alpha1.RbacV1alpha1Interface
 ResourceV1alpha2() resourcev1alpha2.ResourceV1alpha2Interface
 SchedulingV1alpha1() schedulingv1alpha1.SchedulingV1alpha1Interface
 SchedulingV1beta1() schedulingv1beta1.SchedulingV1beta1Interface
 SchedulingV1() schedulingv1.SchedulingV1Interface
 StorageV1beta1() storagev1beta1.StorageV1beta1Interface
 StorageV1() storagev1.StorageV1Interface
 StorageV1alpha1() storagev1alpha1.StorageV1alpha1Interface
}

以 CoreV1 为例,又继续提供了一层 CoreV1 版本的资源对象接口,其中内置了 RESTClient 客户端:

// k8s.io/client-go/kubernetes/typed/core/v1/core_client.go

type CoreV1Interface interface {
 RESTClient() rest.Interface
 ComponentStatusesGetter
 ConfigMapsGetter
 EndpointsGetter
 EventsGetter
 LimitRangesGetter
 NamespacesGetter
 NodesGetter
 PersistentVolumesGetter
 PersistentVolumeClaimsGetter
 PodsGetter
 PodTemplatesGetter
 ReplicationControllersGetter
 ResourceQuotasGetter
 SecretsGetter
 ServicesGetter
 ServiceAccountsGetter
}

以 PodsGetter 接口为例,该接口对 Pod 资源的各种操作进行了封装:

// k8s.io/client-go/kubernetes/typed/core/v1/pod.go

type PodsGetter interface {
 Pods(namespace string) PodInterface
}

type PodInterface interface {
 Create(ctx context.Context, pod *v1.Pod, opts metav1.CreateOptions) (*v1.Pod, error)
 Update(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
 UpdateStatus(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
 Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
 DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
 Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Pod, error)
 List(ctx context.Context, opts metav1.ListOptions) (*v1.PodList, error)
 Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
 Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Pod, err error)
 Apply(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
 ApplyStatus(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
 UpdateEphemeralContainers(ctx context.Context, podName string, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)

 PodExpansion
}

对于 Pod 资源 List 方法的实现:

type pods struct {
 client rest.Interface
 ns     string
}

func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
 var timeout time.Duration
 if opts.TimeoutSeconds != nil {
  timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
 }
 result = &v1.PodList{}
 err = c.client.Get().
  Namespace(c.ns).
  Resource("pods").
  VersionedParams(&opts, scheme.ParameterCodec).
  Timeout(timeout).
  Do(ctx).
  Into(result)
 return
}

实际就是我们之前使用 RESTClient 获取 Pod 资源列表的写法。

基于对各种资源操作的封装,现在使用 ClientSet 客户端来获取 Pod 的写法就简单多了:

package main

import (
 "context"
 "fmt"

 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/client-go/kubernetes"

 "k8s.io/client-go/tools/clientcmd"
)

func main() {
 // 通过 kubeconfig 生成配置
 config, err := clientcmd.BuildConfigFromFlags(""".kube/config")
 if err != nil {
  panic(err)
 }

 // 创建 ClientSet
 clientSet, err := kubernetes.NewForConfig(config)
 if err != nil {
  panic(err)
 }

 // 查询 default 的 pod
 pods, err := clientSet.
  CoreV1().
  Pods("default").
  List(context.TODO(), metav1.ListOptions{})
 if err != nil {
  panic(err)
 }

 for _, pod := range pods.Items {
  fmt.Printf("%v \t [%v]\n", pod.Name, pod.Status.Phase)
 }
}

// $ go run main.go
// nginx    [Pending]

DynamicClient

从 ClientSet 客户端的源码实现就可以看出,它只支持 K8s 内置资源的操作。对于 CRD 自定义资源,就需要使用 DynamicClient 动态客户端。

接口定义如下:

// k8s.io/client-go/dynamic/interface.go

type Interface interface {
 Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface
}

type ResourceInterface interface {
 Create(ctx context.Context, obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error)
 Update(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions, subresources ...string) (*unstructured.Unstructured, error)
 UpdateStatus(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions) (*unstructured.Unstructured, error)
 Delete(ctx context.Context, name string, options metav1.DeleteOptions, subresources ...string) error
 DeleteCollection(ctx context.Context, options metav1.DeleteOptions, listOptions metav1.ListOptions) error
 Get(ctx context.Context, name string, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error)
 List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error)
 Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
 Patch(ctx context.Context, name string, pt types.PatchType, data []byte, options metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error)
 Apply(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions, subresources ...string) (*unstructured.Unstructured, error)
 ApplyStatus(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions) (*unstructured.Unstructured, error)
}

type NamespaceableResourceInterface interface {
 Namespace(string) ResourceInterface
 ResourceInterface
}

DynamicClient 客户端的实现,同样内置 RESTClient 客户端:

// k8s.io/client-go/dynamic/simple.go

type DynamicClient struct {
 client rest.Interface
}

func (c *DynamicClient) Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface {
 return &dynamicResourceClient{client: c, resource: resource}
}

具体的资源操作实现在 dynamicResourceClient ,以 List 方法为例:

type dynamicResourceClient struct {
 client    *DynamicClient
 namespace string
 resource  schema.GroupVersionResource
}

func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
 if err := validateNamespaceWithOptionalName(c.namespace); err != nil {
  return nil, err
 }
 // 调用 RESTClient 客户端
 result := c.client.client.Get().AbsPath(c.makeURLSegments("")...).SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1).Do(ctx)
 if err := result.Error(); err != nil {
  return nil, err
 }
 // 返回 []byte 类型的结果
 retBytes, err := result.Raw()
 if err != nil {
  return nil, err
 }
 // 将结果解码到 UnstructuredList 类型(无法预知的数据结构)
 uncastObj, err := runtime.Decode(unstructured.UnstructuredJSONScheme, retBytes)
 if err != nil {
  return nil, err
 }
 if list, ok := uncastObj.(*unstructured.UnstructuredList); ok {
  return list, nil
 }

 list, err := uncastObj.(*unstructured.Unstructured).ToList()
 if err != nil {
  return nil, err
 }
 return list, nil
}

DynamicClient 客户端本质上也是调用 RESTClient 客户端,只是最后返回的请求结果是一个 Unstructured 或 UnstructuredList 类型(无法预知的数据结构),这也是为什么被称为动态客户端。

type UnstructuredList struct {
 Object map[string]interface{}

 // Items is a list of unstructured objects.
 Items []Unstructured `json:"items"`
}

type Unstructured struct {
 // Object is a JSON compatible map with string, float, int, bool, []interface{}, or
 // map[string]interface{}
 // children.
 Object map[string]interface{}
}

DynamicClient 可以处理 ClientSet 无法处理的 CRD 资源,既是优点也是缺点,动态就意味着缺乏类型安全,返回的请求结果也需要通过反射才能转换成实际的结构体类型,处理起来就略微麻烦了点:

package main

import (
 "context"
 "fmt"
 corev1 "k8s.io/api/core/v1"
 "k8s.io/apimachinery/pkg/runtime"
 "k8s.io/apimachinery/pkg/runtime/schema"
 "k8s.io/client-go/dynamic"

 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 "k8s.io/client-go/tools/clientcmd"
)

func main() {
 // 通过 kubeconfig 生成配置
 config, err := clientcmd.BuildConfigFromFlags(""".kube/config")
 if err != nil {
  panic(err)
 }

 // 创建 DynamicClient
 dynamicClient, err := dynamic.NewForConfig(config)
 if err != nil {
  panic(err.Error())
 }

 // 查询 default 的 pod
 unstructuredList, err := dynamicClient.Resource(schema.GroupVersionResource{
  Group:    "",
  Version:  "v1",
  Resource: "pods",
 }).Namespace("default").List(context.TODO(), metav1.ListOptions{})
 if err != nil {
  panic(err.Error())
 }

 // 使用反射将 unstructuredList 的数据转成对应的结构体类型 corev1.PodList
 pods := &corev1.PodList{}
 if err = runtime.DefaultUnstructuredConverter.FromUnstructured(
  unstructuredList.UnstructuredContent(),
  pods,
 ); err != nil {
  panic(err.Error())
 }

 for _, pod := range pods.Items {
  fmt.Printf("%v \t [%v]\n", pod.Name, pod.Status.Phase)
 }
}

// $ go run main.go
// nginx    [Pending]

DiscoveryClient

在上面示例中,请求某个资源时需要指定 GVR :资源组(Group)、资源版本(Versions)和资源信息(Resources)。例如 Pod 资源对应的 GVR 为:

schema.GroupVersionResource{
  Group:    "",
  Version:  "v1",
  Resource: "pods",
}

在 K8s 中有很多的资源对象,我们不可能全部记住,除了翻阅官方文档,还可以通过 DiscoveryClient 客户端获取,我们熟悉的 kubectl api-resources 命令实际也是通过该客户端获取的:

$ kubectl api-resources
NAME                              SHORTNAMES   APIVERSION                             NAMESPACED   KIND
bindings                                       v1                                     true         Binding
componentstatuses                 cs           v1                                     false        ComponentStatus
configmaps                        cm           v1                                     true         ConfigMap
endpoints                         ep           v1                                     true         Endpoints
events                            ev           v1                                     true         Event
limitranges                       limits       v1                                     true         LimitRange
namespaces                        ns           v1                                     false        Namespace
nodes                             no           v1                                     false        Node
persistentvolumeclaims            pvc          v1                                     true         PersistentVolumeClaim
persistentvolumes                 pv           v1                                     false        PersistentVolume
pods                              po           v1                                     true         Pod
podtemplates                                   v1                                     true         PodTemplate
replicationcontrollers            rc           v1                                     true         ReplicationController
resourcequotas                    quota        v1                                     true         ResourceQuota
secrets                                        v1                                     true         Secret
serviceaccounts                   sa           v1                                     true         ServiceAccount
services                          svc          v1                                     true         Service
mutatingwebhookconfigurations                  admissionregistration.k8s.io/v1        false        MutatingWebhookConfiguration
validatingwebhookconfigurations                admissionregistration.k8s.io/v1        false        ValidatingWebhookConfiguration
customresourcedefinitions         crd,crds     apiextensions.k8s.io/v1                false        CustomResourceDefinition
apiservices                                    apiregistration.k8s.io/v1              false        APIService
controllerrevisions                            apps/v1                                true         ControllerRevision
daemonsets                        ds           apps/v1                                true         DaemonSet
deployments                       deploy       apps/v1                                true         Deployment
replicasets                       rs           apps/v1                                true         ReplicaSet
statefulsets                      sts          apps/v1                                true         StatefulSet
tokenreviews                                   authentication.k8s.io/v1               false        TokenReview
localsubjectaccessreviews                      authorization.k8s.io/v1                true         LocalSubjectAccessReview
selfsubjectaccessreviews                       authorization.k8s.io/v1                false        SelfSubjectAccessReview
selfsubjectrulesreviews                        authorization.k8s.io/v1                false        SelfSubjectRulesReview
subjectaccessreviews                           authorization.k8s.io/v1                false        SubjectAccessReview
horizontalpodautoscalers          hpa          autoscaling/v2                         true         HorizontalPodAutoscaler
cronjobs                          cj           batch/v1                               true         CronJob
jobs                                           batch/v1                               true         Job
certificatesigningrequests        csr          certificates.k8s.io/v1                 false        CertificateSigningRequest
leases                                         coordination.k8s.io/v1                 true         Lease
endpointslices                                 discovery.k8s.io/v1                    true         EndpointSlice
events                            ev           events.k8s.io/v1                       true         Event
flowschemas                                    flowcontrol.apiserver.k8s.io/v1beta3   false        FlowSchema
prioritylevelconfigurations                    flowcontrol.apiserver.k8s.io/v1beta3   false        PriorityLevelConfiguration
ingressclasses                                 networking.k8s.io/v1                   false        IngressClass
ingresses                         ing          networking.k8s.io/v1                   true         Ingress
networkpolicies                   netpol       networking.k8s.io/v1                   true         NetworkPolicy
runtimeclasses                                 node.k8s.io/v1                         false        RuntimeClass
poddisruptionbudgets              pdb          policy/v1                              true         PodDisruptionBudget
clusterrolebindings                            rbac.authorization.k8s.io/v1           false        ClusterRoleBinding
clusterroles                                   rbac.authorization.k8s.io/v1           false        ClusterRole
rolebindings                                   rbac.authorization.k8s.io/v1           true         RoleBinding
roles                                          rbac.authorization.k8s.io/v1           true         Role
priorityclasses                   pc           scheduling.k8s.io/v1                   false        PriorityClass
csidrivers                                     storage.k8s.io/v1                      false        CSIDriver
csinodes                                       storage.k8s.io/v1                      false        CSINode
csistoragecapacities                           storage.k8s.io/v1                      true         CSIStorageCapacity
storageclasses                    sc           storage.k8s.io/v1                      false        StorageClass
volumeattachments                              storage.k8s.io/v1                      false        VolumeAttachment

以下是 DiscoveryClient 客户端的接口定义,可以分为聚合发现和缓存发现两种,同样都内置了 RESTClient 客户端:

// k8s.io/client-go/discovery/discovery_client.go

// 聚合发现
type AggregatedDiscoveryInterface interface {
 DiscoveryInterface
 GroupsAndMaybeResources() (*metav1.APIGroupList, map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error, error)
}

// 缓存发现
type CachedDiscoveryInterface interface {
 DiscoveryInterface
 Fresh() bool
 Invalidate()
}

type DiscoveryInterface interface {
 RESTClient() restclient.Interface
 ServerGroupsInterface
 ServerResourcesInterface
 ServerVersionInterface
 OpenAPISchemaInterface
 OpenAPIV3SchemaInterface
 // Returns copy of current discovery client that will only
 // receive the legacy discovery format, or pointer to current
 // discovery client if it does not support legacy-only discovery.
 WithLegacy() DiscoveryInterface
}

type ServerGroupsInterface interface {
 // 返回资源组
 ServerGroups() (*metav1.APIGroupList, error)
}

type ServerResourcesInterface interface {
  // 返回指定资源组和版本的资源信息
 ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error)
 // 返回所有资源组和资源信息
 ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error)
 // 返回所有资源信息(首选版本)
 ServerPreferredResources() ([]*metav1.APIResourceList, error)
 // 返回所有支持命名空间的资源信息(首选版本)
 ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error)
}

type ServerVersionInterface interface {
 ServerVersion() (*version.Info, error)
}

type OpenAPISchemaInterface interface {
 OpenAPISchema() (*openapi_v2.Document, error)
}

type OpenAPIV3SchemaInterface interface {
 OpenAPIV3() openapi.Client
}

先看聚合发现客户端的 ServerGroupsAndResources 方法实现,即最基本的 DiscoveryClient :

type DiscoveryClient struct {
 restClient restclient.Interface

 LegacyPrefix string
 // Forces the client to request only "unaggregated" (legacy) discovery.
 UseLegacyDiscovery bool
}

var _ AggregatedDiscoveryInterface = &DiscoveryClient{}

func (d *DiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
 return withRetries(defaultRetries, func() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
  // 调用 ServerGroupsAndResources 方法
  return ServerGroupsAndResources(d)
 })
}

func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
 var sgs *metav1.APIGroupList
 var resources []*metav1.APIResourceList
 var failedGVs map[schema.GroupVersion]error
 var err error

 if ad, ok := d.(AggregatedDiscoveryInterface); ok {
  // 如果是聚合发现客户端
  var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
  // 调用 GroupsAndMaybeResources 方法
  sgs, resourcesByGV, failedGVs, err = ad.GroupsAndMaybeResources()
  for _, resourceList := range resourcesByGV {
   // 资源信息
   resources = append(resources, resourceList)
  }
 } else {
  sgs, err = d.ServerGroups()
 }

 if sgs == nil {
  return nilnil, err
 }
 resultGroups := []*metav1.APIGroup{}
 for i := range sgs.Groups {
  resultGroups = append(resultGroups, &sgs.Groups[i])
 }
 if resources != nil {
  var ferr error
  if len(failedGVs) > 0 {
   ferr = &ErrGroupDiscoveryFailed{Groups: failedGVs}
  }
  // 如果资源信息不为空,则返回
  return resultGroups, resources, ferr
 }

 // 如果没有查询到资源信息,继续往下处理,此时主要为缓存发现的逻辑,暂时略过
 // ...
}

对于 DiscoveryClient 聚合发现的主要实现在 GroupsAndMaybeResources 方法:

func (d *DiscoveryClient) GroupsAndMaybeResources() (
 *metav1.APIGroupList,
 map[schema.GroupVersion]*metav1.APIResourceList,
 map[schema.GroupVersion]error,
 error)
 {
 // 首先从 /api 下载遗留的组和资源
 groups, resources, failedGVs, err := d.downloadLegacy()
 if err != nil {
  return nilnilnil, err
 }
 // 从 /apis 下载发现的组和(可能的)资源
 apiGroups, apiResources, failedApisGVs, aerr := d.downloadAPIs()
 if aerr != nil {
  return nilnilnil, aerr
 }
 // 将从 /apis 下载的组合并到遗留的组中
 for _, group := range apiGroups.Groups {
  groups.Groups = append(groups.Groups, group)
 }
 // 仅当两个端点都返回资源时,才返回资源
 if resources != nil && apiResources != nil {
  for gv, resourceList := range apiResources {
   resources[gv] = resourceList
  }
 } else if resources != nil {
  resources = nil
 }
 // 将从 /api 和 /apis 下载的失败的 GroupVersion 合并
 for gv, err := range failedApisGVs {
  failedGVs[gv] = err
 }
 return groups, resources, failedGVs, err
}

func (d *DiscoveryClient) downloadLegacy() (
 *metav1.APIGroupList,
 map[schema.GroupVersion]*metav1.APIResourceList,
 map[schema.GroupVersion]error,
 error)
 {
 accept := acceptDiscoveryFormats
 if d.UseLegacyDiscovery {
  accept = AcceptV1
 }
 var responseContentType string
 // 使用 RESTClient 客户端发起请求
 body, err := d.restClient.Get().
  AbsPath("/api").
  SetHeader("Accept", accept).
  Do(context.TODO()).
  ContentType(&responseContentType).
  Raw()
 apiGroupList := &metav1.APIGroupList{}
 failedGVs := map[schema.GroupVersion]error{}
 if err != nil {
  // Tolerate 404, since aggregated api servers can return it.
  if errors.IsNotFound(err) {
   // Return empty structures and no error.
   emptyGVMap := map[schema.GroupVersion]*metav1.APIResourceList{}
   return apiGroupList, emptyGVMap, failedGVs, nil
  } else {
   return nilnilnil, err
  }
 }

 var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
 // Switch on content-type server responded with: aggregated or unaggregated.
 switch {
 case isV2Beta1ContentType(responseContentType):
  var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList
  err = json.Unmarshal(body, &aggregatedDiscovery)
  if err != nil {
   return nilnilnil, err
  }
  apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery)
 default:
  // Default is unaggregated discovery v1.
  var v metav1.APIVersions
  err = json.Unmarshal(body, &v)
  if err != nil {
   return nilnilnil, err
  }
  apiGroup := metav1.APIGroup{}
  if len(v.Versions) != 0 {
   apiGroup = apiVersionsToAPIGroup(&v)
  }
  apiGroupList.Groups = []metav1.APIGroup{apiGroup}
 }

 return apiGroupList, resourcesByGV, failedGVs, nil
}

func (d *DiscoveryClient) downloadAPIs() (
 *metav1.APIGroupList,
 map[schema.GroupVersion]*metav1.APIResourceList,
 map[schema.GroupVersion]error,
 error)
 {
 accept := acceptDiscoveryFormats
 if d.UseLegacyDiscovery {
  accept = AcceptV1
 }
 var responseContentType string
 // 使用 RESTClient 客户端发起请求
 body, err := d.restClient.Get().
  AbsPath("/apis").
  SetHeader("Accept", accept).
  Do(context.TODO()).
  ContentType(&responseContentType).
  Raw()
 if err != nil {
  return nilnilnil, err
 }

 apiGroupList := &metav1.APIGroupList{}
 failedGVs := map[schema.GroupVersion]error{}
 var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
 // Switch on content-type server responded with: aggregated or unaggregated.
 switch {
 case isV2Beta1ContentType(responseContentType):
  var aggregatedDiscovery apidiscovery.APIGroupDiscoveryList
  err = json.Unmarshal(body, &aggregatedDiscovery)
  if err != nil {
   return nilnilnil, err
  }
  apiGroupList, resourcesByGV, failedGVs = SplitGroupsAndResources(aggregatedDiscovery)
 default:
  // Default is unaggregated discovery v1.
  err = json.Unmarshal(body, apiGroupList)
  if err != nil {
   return nilnilnil, err
  }
 }

 return apiGroupList, resourcesByGV, failedGVs, nil
}

可以看到 DiscoveryClient 客户端就是通过 RESTClient 客户端调用 /api 和 /apis 获取到所有的资源信息聚合后返回。

使用示例如下:

package main

import (
 "fmt"
 "k8s.io/apimachinery/pkg/runtime/schema"
 "k8s.io/client-go/discovery"
 "k8s.io/client-go/tools/clientcmd"
)

func main() {
 // 通过 kubeconfig 生成配置
 config, err := clientcmd.BuildConfigFromFlags(""".kube/config")
 if err != nil {
  panic(err)
 }

 // 创建 DiscoveryClient
 discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
 if err != nil {
  panic(err.Error())
 }

 _, apiResources, err := discoveryClient.ServerGroupsAndResources()
 if err != nil {
  panic(err.Error())
 }

 for _, apiResource := range apiResources {
  gv, err := schema.ParseGroupVersion(apiResource.GroupVersion)
  if err != nil {
   panic(err.Error())
  }
  for _, resource := range apiResource.APIResources {
   fmt.Printf("[G]%v [V]%v [R]%v\n", gv.Group, gv.Version, resource.Name)
  }
 }
}

// $ go run main.go
//  [G]node.k8s.io [V]v1 [R]runtimeclasses
//  [G]authorization.k8s.io [V]v1 [R]localsubjectaccessreviews
//  [G]authorization.k8s.io [V]v1 [R]selfsubjectaccessreviews
//  [G]authorization.k8s.io [V]v1 [R]selfsubjectrulesreviews
//  [G]authorization.k8s.io [V]v1 [R]subjectaccessreviews
//  [G]discovery.k8s.io [V]v1 [R]endpointslices
//  [G]storage.k8s.io [V]v1 [R]csidrivers
//  [G]storage.k8s.io [V]v1 [R]csinodes
//  [G]storage.k8s.io [V]v1 [R]csistoragecapacities
//  [G]storage.k8s.io [V]v1 [R]storageclasses
//  [G]storage.k8s.io [V]v1 [R]volumeattachments
//  [G]storage.k8s.io [V]v1 [R]volumeattachments/status
//  [G]coordination.k8s.io [V]v1 [R]leases
//  [G]autoscaling [V]v1 [R]horizontalpodautoscalers
//  [G]autoscaling [V]v1 [R]horizontalpodautoscalers/status
//  [G]authentication.k8s.io [V]v1 [R]tokenreviews
//  [G]networking.k8s.io [V]v1 [R]ingressclasses
//  [G]networking.k8s.io [V]v1 [R]ingresses
//  [G]networking.k8s.io [V]v1 [R]ingresses/status
//  [G]networking.k8s.io [V]v1 [R]networkpolicies
//  [G]networking.k8s.io [V]v1 [R]networkpolicies/status
//  [G]admissionregistration.k8s.io [V]v1 [R]mutatingwebhookconfigurations
//  [G]admissionregistration.k8s.io [V]v1 [R]validatingwebhookconfigurations
//  [G]flowcontrol.apiserver.k8s.io [V]v1beta3 [R]flowschemas
//  [G]flowcontrol.apiserver.k8s.io [V]v1beta3 [R]flowschemas/status
//  [G]flowcontrol.apiserver.k8s.io [V]v1beta3 [R]prioritylevelconfigurations
//  [G]flowcontrol.apiserver.k8s.io [V]v1beta3 [R]prioritylevelconfigurations/status
//  [G]certificates.k8s.io [V]v1 [R]certificatesigningrequests
//  [G]certificates.k8s.io [V]v1 [R]certificatesigningrequests/approval
//  [G]certificates.k8s.io [V]v1 [R]certificatesigningrequests/status
//  [G]events.k8s.io [V]v1 [R]events
//  [G]apiregistration.k8s.io [V]v1 [R]apiservices
//  [G]apiregistration.k8s.io [V]v1 [R]apiservices/status
//  [G] [V]v1 [R]bindings
//  [G] [V]v1 [R]componentstatuses
//  [G] [V]v1 [R]configmaps
//  [G] [V]v1 [R]endpoints
//  [G] [V]v1 [R]events
//  [G] [V]v1 [R]limitranges
//  [G] [V]v1 [R]namespaces
//  [G] [V]v1 [R]namespaces/finalize
//  [G] [V]v1 [R]namespaces/status
//  [G] [V]v1 [R]nodes
//  [G] [V]v1 [R]nodes/proxy
//  [G] [V]v1 [R]nodes/status
//  [G] [V]v1 [R]persistentvolumeclaims
//  [G] [V]v1 [R]persistentvolumeclaims/status
//  [G] [V]v1 [R]persistentvolumes
//  [G] [V]v1 [R]persistentvolumes/status
//  [G] [V]v1 [R]pods
//  [G] [V]v1 [R]pods/attach
//  [G] [V]v1 [R]pods/binding
//  [G] [V]v1 [R]pods/ephemeralcontainers
//  [G] [V]v1 [R]pods/eviction
//  [G] [V]v1 [R]pods/exec
//  [G] [V]v1 [R]pods/log
//  [G] [V]v1 [R]pods/portforward
//  [G] [V]v1 [R]pods/proxy
//  [G] [V]v1 [R]pods/status
//  [G] [V]v1 [R]podtemplates
//  [G] [V]v1 [R]replicationcontrollers
//  [G] [V]v1 [R]replicationcontrollers/scale
//  [G] [V]v1 [R]replicationcontrollers/status
//  [G] [V]v1 [R]resourcequotas
//  [G] [V]v1 [R]resourcequotas/status
//  [G] [V]v1 [R]secrets
//  [G] [V]v1 [R]serviceaccounts
//  [G] [V]v1 [R]serviceaccounts/token
//  [G] [V]v1 [R]services
//  [G] [V]v1 [R]services/proxy
//  [G] [V]v1 [R]services/status
//  [G]apps [V]v1 [R]controllerrevisions
//  [G]apps [V]v1 [R]daemonsets
//  [G]apps [V]v1 [R]daemonsets/status
//  [G]apps [V]v1 [R]deployments
//  [G]apps [V]v1 [R]deployments/scale
//  [G]apps [V]v1 [R]deployments/status
//  [G]apps [V]v1 [R]replicasets
//  [G]apps [V]v1 [R]replicasets/scale
//  [G]apps [V]v1 [R]replicasets/status
//  [G]apps [V]v1 [R]statefulsets
//  [G]apps [V]v1 [R]statefulsets/scale
//  [G]apps [V]v1 [R]statefulsets/status
//  [G]scheduling.k8s.io [V]v1 [R]priorityclasses
//  [G]flowcontrol.apiserver.k8s.io [V]v1beta2 [R]flowschemas
//  [G]flowcontrol.apiserver.k8s.io [V]v1beta2 [R]flowschemas/status
//  [G]flowcontrol.apiserver.k8s.io [V]v1beta2 [R]prioritylevelconfigurations
//  [G]flowcontrol.apiserver.k8s.io [V]v1beta2 [R]prioritylevelconfigurations/status
//  [G]policy [V]v1 [R]poddisruptionbudgets
//  [G]policy [V]v1 [R]poddisruptionbudgets/status
//  [G]rbac.authorization.k8s.io [V]v1 [R]clusterrolebindings
//  [G]rbac.authorization.k8s.io [V]v1 [R]clusterroles
//  [G]rbac.authorization.k8s.io [V]v1 [R]rolebindings
//  [G]rbac.authorization.k8s.io [V]v1 [R]roles
//  [G]batch [V]v1 [R]cronjobs
//  [G]batch [V]v1 [R]cronjobs/status
//  [G]batch [V]v1 [R]jobs
//  [G]batch [V]v1 [R]jobs/status
//  [G]autoscaling [V]v2 [R]horizontalpodautoscalers
//  [G]autoscaling [V]v2 [R]horizontalpodautoscalers/status
//  [G]apiextensions.k8s.io [V]v1 [R]customresourcedefinitions
//  [G]apiextensions.k8s.io [V]v1 [R]customresourcedefinitions/status

对于缓存发现客户端,又分为基于内存(memCacheClient)和基于磁盘(CachedDiscoveryClient)两种,其中 CachedDiscoveryClient 实际上套用了 memCacheClient ,而 memCacheClient 则套用了聚合发现 DiscoveryClient :

// k8s.io/client-go/discovery/cached/disk/cached_discovery.go

type CachedDiscoveryClient struct {
 // 内部套的缓存发现客户端,这里实际是 memCacheClient
 delegate discovery.DiscoveryInterface
 // 缓存的磁盘路径
 cacheDirectory string
 // 缓存有效期
 ttl time.Duration
 // ...省略
}

func NewCachedDiscoveryClientForConfig(config *restclient.Config, discoveryCacheDir, httpCacheDir string, ttl time.Duration) (*CachedDiscoveryClient, error) {
 if len(httpCacheDir) > 0 {
  config = restclient.CopyConfig(config)
  config.Wrap(func(rt http.RoundTripper) http.RoundTripper {
   return newCacheRoundTripper(httpCacheDir, rt)
  })
 }

 // 聚合发现 DiscoveryClient
 discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)
 if err != nil {
  return nil, err
 }

 // 构建基于磁盘的缓存发现,其中套了一层基于内存的,基于内存的里面套了一层基本的 DiscoveryClient
 return newCachedDiscoveryClient(memory.NewMemCacheClient(discoveryClient), discoveryCacheDir, ttl), nil
}

func newCachedDiscoveryClient(delegate discovery.DiscoveryInterface, cacheDirectory string, ttl time.Duration) *CachedDiscoveryClient {
 return &CachedDiscoveryClient{
  delegate:       delegate,
  cacheDirectory: cacheDirectory,
  ttl:            ttl,
  ourFiles:       map[string]struct{}{},
  fresh:          true,
 }
}

CachedDiscoveryClient 客户端的 ServerGroupsAndResources 方法实现:

// k8s.io/client-go/discovery/cached/disk/cached_discovery.go

func (d *CachedDiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
 // 实际上调用的就是之前的聚合发现 DiscoveryClient 所调用的 discovery.ServerGroupsAndResources 方法
 return discovery.ServerGroupsAndResources(d)
}

看到之前的 discovery.ServerGroupsAndResources 方法:

// k8s.io/client-go/discovery/discovery_client.go

func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
 var sgs *metav1.APIGroupList
 var resources []*metav1.APIResourceList
 var failedGVs map[schema.GroupVersion]error
 var err error

 if ad, ok := d.(AggregatedDiscoveryInterface); ok {
  // 聚合发现客户端,上面讲过了,省略 ...
 } else {
  // 对于缓存发现客户端,调用其 ServerGroups 方法
  sgs, err = d.ServerGroups()
 }

 // ... 稍后继续
}

这里的 d 是 CachedDiscoveryClient ,看到其 ServerGroups 方法:

// k8s.io/client-go/discovery/cached/disk/cached_discovery.go

func (d *CachedDiscoveryClient) ServerGroups() (*metav1.APIGroupList, error) {
 // 从本地磁盘文件中查询缓存数据
 filename := filepath.Join(d.cacheDirectory, "servergroups.json")
 cachedBytes, err := d.getCachedFile(filename)
 if err == nil {
  // 从缓存中查询成功,解码后返回资源组
  cachedGroups := &metav1.APIGroupList{}
  if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), cachedBytes, cachedGroups); err == nil {
   klog.V(10).Infof("returning cached discovery info from %v", filename)
   return cachedGroups, nil
  }
 }

 // 未能从缓存中查询到,或缓存已过期,则调用内部 delegate 的 ServerGroups 方法
 // 这里 delegate 是 memCacheClient
 liveGroups, err := d.delegate.ServerGroups()
 if err != nil {
  klog.V(3).Infof("skipped caching discovery info due to %v", err)
  return liveGroups, err
 }
 if liveGroups == nil || len(liveGroups.Groups) == 0 {
  klog.V(3).Infof("skipped caching discovery info, no groups found")
  return liveGroups, err
 }

 // 将数据写入到本地磁盘缓存,以供下次查询时使用
 if err := d.writeCachedFile(filename, liveGroups); err != nil {
  klog.V(1).Infof("failed to write cache to %v due to %v", filename, err)
 }

 return liveGroups, nil
}

查询缓存文件和写入缓存文件的方式很简单,实际就是对文件的一个读写,以及 ttl 的判断,直接贴出:

func (d *CachedDiscoveryClient) getCachedFile(filename string) ([]byte, error) {
 // after invalidation ignore cache files not created by this process
 d.mutex.Lock()
 _, ourFile := d.ourFiles[filename]
 if d.invalidated && !ourFile {
  d.mutex.Unlock()
  return nil, errors.New("cache invalidated")
 }
 d.mutex.Unlock()

 file, err := os.Open(filename)
 if err != nil {
  return nil, err
 }
 defer file.Close()

 fileInfo, err := file.Stat()
 if err != nil {
  return nil, err
 }

 if time.Now().After(fileInfo.ModTime().Add(d.ttl)) {
  return nil, errors.New("cache expired")
 }

 // the cache is present and its valid.  Try to read and use it.
 cachedBytes, err := io.ReadAll(file)
 if err != nil {
  return nil, err
 }

 d.mutex.Lock()
 defer d.mutex.Unlock()
 d.fresh = d.fresh && ourFile

 return cachedBytes, nil
}

func (d *CachedDiscoveryClient) writeCachedFile(filename string, obj runtime.Object) error {
 if err := os.MkdirAll(filepath.Dir(filename), 0750); err != nil {
  return err
 }

 bytes, err := runtime.Encode(scheme.Codecs.LegacyCodec(), obj)
 if err != nil {
  return err
 }

 f, err := os.CreateTemp(filepath.Dir(filename), filepath.Base(filename)+".")
 if err != nil {
  return err
 }
 defer os.Remove(f.Name())
 _, err = f.Write(bytes)
 if err != nil {
  return err
 }

 err = os.Chmod(f.Name(), 0660)
 if err != nil {
  return err
 }

 name := f.Name()
 err = f.Close()
 if err != nil {
  return err
 }

 // atomic rename
 d.mutex.Lock()
 defer d.mutex.Unlock()
 err = os.Rename(name, filename)
 if err == nil {
  d.ourFiles[filename] = struct{}{}
 }
 return err
}

主要是 CachedDiscoveryClient 在缓存文件中查询不到数据或者缓存过期后,会调用 memCacheClient 的 ServerGroups 方法:

// k8s.io/client-go/discovery/cached/memory/memcache.go

// 资源信息缓存实体
type cacheEntry struct {
 resourceList *metav1.APIResourceList
 err          error
}

type memCacheClient struct {
 // 内部套的发现客户端,这里是最基本的聚合发现 DiscoveryClient
 delegate discovery.DiscoveryInterface

 lock                        sync.RWMutex
 // 资源信息缓存
 groupToServerResources      map[string]*cacheEntry
 // 资源组缓存
 groupList                   *metav1.APIGroupList
 cacheValid                  bool
 openapiClient               openapi.Client
 receivedAggregatedDiscovery bool
}

func (d *memCacheClient) ServerGroups() (*metav1.APIGroupList, error) {
 // 调用 GroupsAndMaybeResources 方法
 groups, _, _, err := d.GroupsAndMaybeResources()
 if err != nil {
  return nil, err
 }
 return groups, nil
}

func (d *memCacheClient) GroupsAndMaybeResources() (*metav1.APIGroupList, map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error, error) {
 d.lock.Lock()
 defer d.lock.Unlock()

 if !d.cacheValid {
  // 刷新内存的缓存状态
  if err := d.refreshLocked(); err != nil {
   return nilnilnil, err
  }
 }
 // 从内存中构造出需要返回的数据结构,省略
 return d.groupList, resourcesMap, failedGVs, nil
}

func (d *memCacheClient) refreshLocked() error {
 var gl *metav1.APIGroupList
 var err error

 // delegate 是最基本的聚合发现 DiscoveryClient
 if ad, ok := d.delegate.(discovery.AggregatedDiscoveryInterface); ok {
  var resources map[schema.GroupVersion]*metav1.APIResourceList
  var failedGVs map[schema.GroupVersion]error
  // 调用 DiscoveryClient 的 GroupsAndMaybeResources 方法
  // 最开始的时候讲过了
  // 通过 RESTClient 客户端调用 /api 和 /apis 获取到所有的资源信息聚合后返回
  gl, resources, failedGVs, err = ad.GroupsAndMaybeResources()
  if resources != nil && err == nil {
   // Cache the resources.
   d.groupToServerResources = map[string]*cacheEntry{}
   d.groupList = gl
   for gv, resources := range resources {
    d.groupToServerResources[gv.String()] = &cacheEntry{resources, nil}
   }
   // Cache GroupVersion discovery errors
   for gv, err := range failedGVs {
    d.groupToServerResources[gv.String()] = &cacheEntry{nil, err}
   }
   d.receivedAggregatedDiscovery = true
   d.cacheValid = true
   return nil
  }
 } else {
  gl, err = d.delegate.ServerGroups()
 }
 // ...
 return nil
}

memCacheClient 实际上就是调用 DiscoveryClient ,然后将数据缓存在内存中。

总结一下就是,CachedDiscoveryClient 首先从本地磁盘文件中取缓存的资源组信息,如果取不到或者过期了,就去 memCacheClient 的内存中取,内存中也取不到或者被刷新了,就调用 DiscoveryClient 向 kube-apiserver 发起请求获取。

回到 discovery.ServerGroupsAndResources 方法:

// k8s.io/client-go/discovery/discovery_client.go

func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
 var sgs *metav1.APIGroupList
 var resources []*metav1.APIResourceList
 var failedGVs map[schema.GroupVersion]error
 var err error

 if ad, ok := d.(AggregatedDiscoveryInterface); ok {
  // 聚合发现客户端,上面讲过了,省略 ...
 } else {
  // 对于缓存发现客户端,调用其 ServerGroups 方法,得到资源组 sgs
  sgs, err = d.ServerGroups()
 }

 if sgs == nil {
  return nilnil, err
 }
 // 调用 fetchGroupVersionResources 传入资源组 sgs 获取资源信息
 groupVersionResources, failedGroups := fetchGroupVersionResources(d, sgs)

 // order results by group/version discovery order
 result := []*metav1.APIResourceList{}
 for _, apiGroup := range sgs.Groups {
  for _, version := range apiGroup.Versions {
   gv := schema.GroupVersion{Group: apiGroup.Name, Version: version.Version}
   if resources, ok := groupVersionResources[gv]; ok {
    result = append(result, resources)
   }
  }
 }

 if len(failedGroups) == 0 {
  return resultGroups, result, nil
 }

 return resultGroups, result, &ErrGroupDiscoveryFailed{Groups: failedGroups}
}

看到最终的 fetchGroupVersionResources 方法:

// k8s.io/client-go/discovery/discovery_client.go

func fetchGroupVersionResources(d DiscoveryInterface, apiGroups *metav1.APIGroupList) (map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error) {
 groupVersionResources := make(map[schema.GroupVersion]*metav1.APIResourceList)
 failedGroups := make(map[schema.GroupVersion]error)

 wg := &sync.WaitGroup{}
 resultLock := &sync.Mutex{}
 // 循环资源组
 for _, apiGroup := range apiGroups.Groups {
  // 循环资源版本
  for _, version := range apiGroup.Versions {
   // 根据资源组和资源版本去获取资源信息
   groupVersion := schema.GroupVersion{Group: apiGroup.Name, Version: version.Version}
   wg.Add(1)
   go func() {
    defer wg.Done()
    defer utilruntime.HandleCrash()

    // 调用 CachedDiscoveryClient 的 ServerResourcesForGroupVersion 方法
    apiResourceList, err := d.ServerResourcesForGroupVersion(groupVersion.String())

    // lock to record results
    resultLock.Lock()
    defer resultLock.Unlock()

    if err != nil {
     // TODO: maybe restrict this to NotFound errors
     failedGroups[groupVersion] = err
    }
    if apiResourceList != nil {
     // even in case of error, some fallback might have been returned
     groupVersionResources[groupVersion] = apiResourceList
    }
   }()
  }
 }
 // 等待所有并发查询完成
 wg.Wait()

 return groupVersionResources, failedGroups
}

fetchGroupVersionResources 方法开启了并发去调用 ServerResourcesForGroupVersion 方法根据资源组版本来查询资源信息:

// k8s.io/client-go/discovery/cached/disk/cached_discovery.go

func (d *CachedDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
 // 从本地磁盘缓存文件中查询资源信息
 filename := filepath.Join(d.cacheDirectory, groupVersion, "serverresources.json")
 cachedBytes, err := d.getCachedFile(filename)
 // don't fail on errors, we either don't have a file or won't be able to run the cached check. Either way we can fallback.
 if err == nil {
  cachedResources := &metav1.APIResourceList{}
  if err := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), cachedBytes, cachedResources); err == nil {
   klog.V(10).Infof("returning cached discovery info from %v", filename)
   return cachedResources, nil
  }
 }

 // 如果缓存没有或者过期了,则调用 memCacheClient 的 ServerResourcesForGroupVersion 方法
 liveResources, err := d.delegate.ServerResourcesForGroupVersion(groupVersion)
 if err != nil {
  klog.V(3).Infof("skipped caching discovery info due to %v", err)
  return liveResources, err
 }
 if liveResources == nil || len(liveResources.APIResources) == 0 {
  klog.V(3).Infof("skipped caching discovery info, no resources found")
  return liveResources, err
 }

 // 将缓存写入本地磁盘缓存
 if err := d.writeCachedFile(filename, liveResources); err != nil {
  klog.V(1).Infof("failed to write cache to %v due to %v", filename, err)
 }

 return liveResources, nil
}

ServerResourcesForGroupVersion 方法和 ServerGroups 方法的逻辑是完全一样的,这里不重复了。感兴趣自己去看。

最后,基于缓存的发现客户端的使用示例如下:

package main

import (
 "fmt"
 "k8s.io/apimachinery/pkg/runtime/schema"
 "k8s.io/client-go/discovery/cached/disk"
 "k8s.io/client-go/tools/clientcmd"
 "time"
)

func main() {
 // 通过 kubeconfig 生成配置
 config, err := clientcmd.BuildConfigFromFlags(""".kube/config")
 if err != nil {
  panic(err)
 }

 // 创建 DiscoveryClient
 //discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)

 // 创建带缓存的 DiscoveryClient
 discoveryClient, err := disk.NewCachedDiscoveryClientForConfig(config, ".kube/cache"".kube/http-cache", time.Minute*10)
 if err != nil {
  panic(err.Error())
 }

 _, apiResources, err := discoveryClient.ServerGroupsAndResources()
 if err != nil {
  panic(err.Error())
 }

 for _, apiResource := range apiResources {
  gv, err := schema.ParseGroupVersion(apiResource.GroupVersion)
  if err != nil {
   panic(err.Error())
  }
  for _, resource := range apiResource.APIResources {
   fmt.Printf("[G]%v [V]%v [R]%v\n", gv.Group, gv.Version, resource.Name)
  }
 }
}

本地缓存目录如下:

本回完,下一回介绍 client-go 的 Informer 机制。

浏览 1916
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报