K8s client-go 的四种客户端
共 92358字,需浏览 185分钟
·
2023-09-07 21:04
前面都在讲 kube-apiserver ,后续 Kubernetes 其它组件,都会使用 client-go 库来调用 kube-apiserver 提供的 API 服务进行资源对象的操作。
Kubernetes 源码的 staging/src/k8s.io/client-go 目录中集成了 client-go ,所有组件都是通过 vendor 本地库的方式引入 client-go 库,因此不需要考虑和集群版本的兼容性;但如果是平时我们自己基于 Kubernetes 做二次开发用到 client-go 库时,则需要注意所导入的库和 Kubernetes 版本的对应关系。
对于 Kubernetes 1.27.2 ,client-go 源码的部分目录如下:
根据不同功能,可以将其划分为四种客户端:
- RESTClient:最基础的客户端,仅对 HTTP Request 进行了封装。实现位置在 rest 目录
- ClientSet:基于 RESTClient 实现,封装了 Kubernetes 内置资源(Resource)和版本(Version)的方法。实现位置在 kubernetes 目录
- DynamicClient:动态客户端,基于 RESTClient 实现,封装了 Kubernetes 任意资源(包括 CRD 自定义资源)和版本的方法。实现位置在 dynamic 目录
- DiscoveryClient:发现客户端,基于 RESTClient 实现,用于发现 kube-apiserver 所支持的资源组(Group)、资源版本(Versions)和资源信息(Resources)。实现位置在 discovery 目录
RESTClient
作为最基础的客户端,其接口定义如下:
// k8s.io/client-go/rest/client.go
// Interface is the set of request-builder methods exposed by the most basic
// REST client. Each builder method returns a *Request that is configured
// further and then executed.
type Interface interface {
// GetRateLimiter returns the rate limiter.
GetRateLimiter() flowcontrol.RateLimiter
// Verb starts building a request for an arbitrary HTTP verb (POST, PUT, PATCH, GET, DELETE).
Verb(verb string) *Request
// Post starts building a POST request.
Post() *Request
// Put starts building a PUT request.
Put() *Request
// Patch starts building a PATCH request of the given patch type.
Patch(pt types.PatchType) *Request
// Get starts building a GET request.
Get() *Request
// Delete starts building a DELETE request.
Delete() *Request
// APIVersion returns the API group/version.
APIVersion() schema.GroupVersion
}
RESTClient 的实现很简单,主要提供一个对外的接口调用:
// k8s.io/client-go/rest/client.go
// RESTClient is the basic HTTP client. Request builders created from it
// (see NewRequest) read its base URL, content configuration and HTTP client.
type RESTClient struct {
// base is the root URL; NewRESTClient stores it with a trailing slash and
// with the query string and fragment cleared.
base *url.URL
// versionedAPIPath is the API path handed to NewRESTClient.
versionedAPIPath string
// content holds the content-type settings; NewRequest derives the Accept
// header from it.
content ClientContentConfig
// createBackoffMgr is a factory for the BackoffManager; NewRESTClient sets
// it to readExpBackoffConfig.
createBackoffMgr func() BackoffManager
rateLimiter flowcontrol.RateLimiter
warningHandler WarningHandler
// Client is the HTTP client used to send requests. When it is nil,
// Request.request falls back to http.DefaultClient.
Client *http.Client
}
// NewRESTClient builds a RESTClient rooted at baseURL.
//
// The content type defaults to "application/json" when config leaves it
// empty. The base URL is copied before being normalised (trailing slash
// added, query string and fragment cleared), so the Path, RawQuery and
// Fragment of the caller's URL are left untouched. The returned error is
// always nil in this implementation.
func NewRESTClient(baseURL *url.URL, versionedAPIPath string, config ClientContentConfig, rateLimiter flowcontrol.RateLimiter, client *http.Client) (*RESTClient, error) {
	if config.ContentType == "" {
		config.ContentType = "application/json"
	}

	// Normalise a private copy of the URL.
	root := *baseURL
	if !strings.HasSuffix(root.Path, "/") {
		root.Path += "/"
	}
	root.RawQuery, root.Fragment = "", ""

	rc := &RESTClient{
		base:             &root,
		versionedAPIPath: versionedAPIPath,
		content:          config,
		createBackoffMgr: readExpBackoffConfig,
		rateLimiter:      rateLimiter,
		Client:           client,
	}
	return rc, nil
}
// Verb creates a new request builder bound to this client and sets its HTTP
// verb.
func (c *RESTClient) Verb(verb string) *Request {
	req := NewRequest(c)
	return req.Verb(verb)
}
// Post creates a new request builder bound to this client with the verb set
// to "POST".
func (c *RESTClient) Post() *Request {
	return NewRequest(c).Verb("POST")
}
// 其余请求器的构建方法省略,同 Post
核心的处理全在 Request 请求器上,来到构建请求器的 NewRequest 方法:
// k8s.io/client-go/rest/request.go
// Request is the request builder: it accumulates the pieces of a call
// (namespace, resource, ...) before the request is sent.
type Request struct {
// c is the RESTClient this request was created from.
c *RESTClient
// namespace and resource are set through the Namespace and Resource methods.
namespace string
resource string
// remaining fields omitted in this excerpt
}
// NewRequest creates a request builder bound to the given RESTClient and
// pre-populates its Accept header from the client's content configuration.
func NewRequest(c *RESTClient) *Request {
// ... (elided in this excerpt)
r := &Request{
c: c,
// ... (other fields elided in this excerpt)
}
// Prefer the explicitly configured Accept types; otherwise accept the
// configured content type first and anything else ("*/*") after it.
switch {
case len(c.content.AcceptContentTypes) > 0:
r.SetHeader("Accept", c.content.AcceptContentTypes)
case len(c.content.ContentType) > 0:
r.SetHeader("Accept", c.content.ContentType+", */*")
}
return r
}
// Namespace sets the namespace to access (<resource>/[ns/<namespace>/]<name>).
//
// The call is a no-op when the builder already carries an error. Setting the
// namespace twice, or passing a value that is not a valid path segment,
// records an error on the builder instead of changing it. The builder is
// always returned so calls can be chained.
func (r *Request) Namespace(namespace string) *Request {
	switch {
	case r.err != nil:
		// An earlier builder call already failed; keep that first error.
	case r.namespaceSet:
		r.err = fmt.Errorf("namespace already set to %q, cannot change to %q", r.namespace, namespace)
	default:
		if problems := IsValidPathSegmentName(namespace); len(problems) != 0 {
			r.err = fmt.Errorf("invalid namespace %q: %v", namespace, problems)
			break
		}
		r.namespaceSet = true
		r.namespace = namespace
	}
	return r
}
// Resource sets the resource to access (<resource>/[ns/<namespace>/]<name>).
//
// The call is a no-op when the builder already carries an error. Setting the
// resource twice, or passing a value that is not a valid path segment,
// records an error on the builder instead of changing it. The builder is
// always returned so calls can be chained.
func (r *Request) Resource(resource string) *Request {
	switch {
	case r.err != nil:
		// An earlier builder call already failed; keep that first error.
	case r.resource != "":
		r.err = fmt.Errorf("resource already set to %q, cannot change to %q", r.resource, resource)
	default:
		if problems := IsValidPathSegmentName(resource); len(problems) != 0 {
			r.err = fmt.Errorf("invalid resource %q: %v", resource, problems)
			break
		}
		r.resource = resource
	}
	return r
}
Request 请求器的本质就是对命名空间、资源等 K8s 的概念进行了一个通用的封装,通过调用相应的方法就可以构造出最终的请求 URL 和参数:
// URL assembles the final request URL from the pieces collected on the
// builder: path prefix, namespace, resource, name/subresource/subpath, query
// parameters and timeout. Scheme, host and the other URL fields are copied
// from the client's base URL when one is set.
func (r *Request) URL() *url.URL {
	fullPath := r.pathPrefix
	if r.namespaceSet && r.namespace != "" {
		// Namespace segment.
		fullPath = path.Join(fullPath, "namespaces", r.namespace)
	}
	if r.resource != "" {
		// Resource segment, always lower-cased.
		fullPath = path.Join(fullPath, strings.ToLower(r.resource))
	}
	// path.Join trims trailing slashes, so this join is skipped when there is
	// nothing to append; that keeps a trailing slash on the path built so far
	// for backwards compatibility.
	if r.resourceName != "" || r.subpath != "" || r.subresource != "" {
		fullPath = path.Join(fullPath, r.resourceName, r.subresource, r.subpath)
	}

	var target url.URL
	if base := r.c.base; base != nil {
		target = *base
	}
	target.Path = fullPath

	q := url.Values{}
	for name, vals := range r.params {
		for _, v := range vals {
			q.Add(name, v)
		}
	}
	// The timeout is not stored in params; it is emitted as its own query
	// parameter and overrides any "timeout" value copied above.
	if r.timeout != 0 {
		q.Set("timeout", r.timeout.String())
	}
	target.RawQuery = q.Encode()

	return &target
}
最后调用 Do 方法发起请求,并返回请求的结果:
// Result is the outcome of executing a Request via Do.
type Result struct {
// body is the raw response payload; Do reports its length as the response size metric.
body []byte
warnings []net.WarningHeader
contentType string
// err is set by Do when the request itself fails.
err error
statusCode int
decoder runtime.Decoder
}
// Do executes the request and returns its Result.
//
// A failure reported by the request loop is returned as a Result carrying
// only that error. Otherwise the response is converted with
// transformResponse, and the response size metric is recorded when the
// result has no error or has a non-empty body.
func (r *Request) Do(ctx context.Context) Result {
	var out Result
	capture := func(req *http.Request, resp *http.Response) {
		out = r.transformResponse(resp, req)
	}

	// Send the request; capture runs with the final response.
	if err := r.request(ctx, capture); err != nil {
		return Result{err: err}
	}

	if out.err == nil || len(out.body) > 0 {
		metrics.ResponseSize.Observe(ctx, r.verb, r.URL().Host, float64(len(out.body)))
	}
	return out
}
// request sends the HTTP request, retrying as decided by the retry helper,
// and hands the final request/response pair to fn. fn is not called when the
// response is nil. The returned error is the last attempt's error after
// passing through retry.WrapPreviousError.
func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Response)) error {
// ... (elided in this excerpt)
client := r.c.Client
// Fall back to the default HTTP client when none was configured.
if client == nil {
client = http.DefaultClient
}
// ... (elided in this excerpt)
// Retry handling.
retry := r.retryFn(r.maxRetries)
for {
if err := retry.Before(ctx, r); err != nil {
return retry.WrapPreviousError(err)
}
// Build a fresh http.Request for this attempt.
req, err := r.newHTTPRequest(ctx)
if err != nil {
return err
}
// Send the HTTP request through the client's Do method.
resp, err := client.Do(req)
// The value -1 or a value of 0 with a non-nil Body indicates that the length is unknown.
// https://pkg.go.dev/net/http#Request
if req.ContentLength >= 0 && !(req.Body != nil && req.ContentLength == 0) {
metrics.RequestSize.Observe(ctx, r.verb, r.URL().Host, float64(req.ContentLength))
}
retry.After(ctx, r, resp, err)
// The closure scopes the deferred body cleanup to a single attempt, so
// it runs before the next loop iteration rather than at function exit.
done := func() bool {
defer readAndCloseResponseBody(resp)
// if the server returns an error in err, the response will be nil.
f := func(req *http.Request, resp *http.Response) {
if resp == nil {
return
}
fn(req, resp)
}
if retry.IsNextRetry(ctx, r, req, resp, err, isErrRetryableFunc) {
return false
}
f(req, resp)
return true
}()
if done {
return retry.WrapPreviousError(err)
}
}
}
// newHTTPRequest builds the *http.Request for a single attempt.
//
// The body comes from either r.body or r.bodyBytes; setting both is an
// error. The request is created directly with ctx via
// http.NewRequestWithContext, which avoids the extra shallow copy made by
// NewRequest followed by WithContext and reports a nil ctx as an error
// instead of panicking. The builder's headers are attached to the request.
func (r *Request) newHTTPRequest(ctx context.Context) (*http.Request, error) {
	var body io.Reader
	switch {
	case r.body != nil && r.bodyBytes != nil:
		return nil, fmt.Errorf("cannot set both body and bodyBytes")
	case r.body != nil:
		body = r.body
	case r.bodyBytes != nil:
		// Create a new reader specifically for this request.
		// Giving each request a dedicated reader allows retries to avoid races resetting the request body.
		body = bytes.NewReader(r.bodyBytes)
	}

	// Build the target URL; the local is not named "url" so the net/url
	// package is not shadowed.
	target := r.URL().String()
	req, err := http.NewRequestWithContext(ctx, r.verb, target, body)
	if err != nil {
		return nil, err
	}
	req.Header = r.headers
	return req, nil
}
例如,我们要请求默认命名空间下的 Pod 资源,可以通过如下代码实现:
package main
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
// main lists the pods in the "default" namespace using a bare RESTClient and
// prints each pod's name and phase.
func main() {
	// Build the client configuration from the kubeconfig file.
	cfg, err := clientcmd.BuildConfigFromFlags("", ".kube/config")
	if err != nil {
		panic(err)
	}

	// A RESTClient needs the API path, group/version and serializer to be
	// chosen by hand.
	cfg.APIPath = "api"
	cfg.GroupVersion = &corev1.SchemeGroupVersion
	cfg.NegotiatedSerializer = scheme.Codecs

	client, err := rest.RESTClientFor(cfg)
	if err != nil {
		panic(err)
	}

	// Query the pods of the default namespace.
	podList := &corev1.PodList{}
	listReq := client.Get().Namespace("default").Resource("pods")
	if err := listReq.Do(context.TODO()).Into(podList); err != nil {
		panic(err)
	}

	for i := range podList.Items {
		pod := &podList.Items[i]
		fmt.Printf("%v \t [%v]\n", pod.Name, pod.Status.Phase)
	}
}
// $ go run main.go
// nginx [Pending]
ClientSet
如果清楚了 RESTClient 的使用,ClientSet 实际很简单,首先 ClientSet 对外提供了一层各种版本的接口:
// k8s.io/client-go/kubernetes/clientset.go
// Interface is the ClientSet entry point: besides the discovery client it
// exposes one accessor per built-in API group/version, each returning that
// group/version's typed client interface.
type Interface interface {
Discovery() discovery.DiscoveryInterface
AdmissionregistrationV1() admissionregistrationv1.AdmissionregistrationV1Interface
AdmissionregistrationV1alpha1() admissionregistrationv1alpha1.AdmissionregistrationV1alpha1Interface
AdmissionregistrationV1beta1() admissionregistrationv1beta1.AdmissionregistrationV1beta1Interface
InternalV1alpha1() internalv1alpha1.InternalV1alpha1Interface
AppsV1() appsv1.AppsV1Interface
AppsV1beta1() appsv1beta1.AppsV1beta1Interface
AppsV1beta2() appsv1beta2.AppsV1beta2Interface
AuthenticationV1() authenticationv1.AuthenticationV1Interface
AuthenticationV1alpha1() authenticationv1alpha1.AuthenticationV1alpha1Interface
AuthenticationV1beta1() authenticationv1beta1.AuthenticationV1beta1Interface
AuthorizationV1() authorizationv1.AuthorizationV1Interface
AuthorizationV1beta1() authorizationv1beta1.AuthorizationV1beta1Interface
AutoscalingV1() autoscalingv1.AutoscalingV1Interface
AutoscalingV2() autoscalingv2.AutoscalingV2Interface
AutoscalingV2beta1() autoscalingv2beta1.AutoscalingV2beta1Interface
AutoscalingV2beta2() autoscalingv2beta2.AutoscalingV2beta2Interface
BatchV1() batchv1.BatchV1Interface
BatchV1beta1() batchv1beta1.BatchV1beta1Interface
CertificatesV1() certificatesv1.CertificatesV1Interface
CertificatesV1beta1() certificatesv1beta1.CertificatesV1beta1Interface
CertificatesV1alpha1() certificatesv1alpha1.CertificatesV1alpha1Interface
CoordinationV1beta1() coordinationv1beta1.CoordinationV1beta1Interface
CoordinationV1() coordinationv1.CoordinationV1Interface
CoreV1() corev1.CoreV1Interface
DiscoveryV1() discoveryv1.DiscoveryV1Interface
DiscoveryV1beta1() discoveryv1beta1.DiscoveryV1beta1Interface
EventsV1() eventsv1.EventsV1Interface
EventsV1beta1() eventsv1beta1.EventsV1beta1Interface
ExtensionsV1beta1() extensionsv1beta1.ExtensionsV1beta1Interface
FlowcontrolV1alpha1() flowcontrolv1alpha1.FlowcontrolV1alpha1Interface
FlowcontrolV1beta1() flowcontrolv1beta1.FlowcontrolV1beta1Interface
FlowcontrolV1beta2() flowcontrolv1beta2.FlowcontrolV1beta2Interface
FlowcontrolV1beta3() flowcontrolv1beta3.FlowcontrolV1beta3Interface
NetworkingV1() networkingv1.NetworkingV1Interface
NetworkingV1alpha1() networkingv1alpha1.NetworkingV1alpha1Interface
NetworkingV1beta1() networkingv1beta1.NetworkingV1beta1Interface
NodeV1() nodev1.NodeV1Interface
NodeV1alpha1() nodev1alpha1.NodeV1alpha1Interface
NodeV1beta1() nodev1beta1.NodeV1beta1Interface
PolicyV1() policyv1.PolicyV1Interface
PolicyV1beta1() policyv1beta1.PolicyV1beta1Interface
RbacV1() rbacv1.RbacV1Interface
RbacV1beta1() rbacv1beta1.RbacV1beta1Interface
RbacV1alpha1() rbacv1alpha1.RbacV1alpha1Interface
ResourceV1alpha2() resourcev1alpha2.ResourceV1alpha2Interface
SchedulingV1alpha1() schedulingv1alpha1.SchedulingV1alpha1Interface
SchedulingV1beta1() schedulingv1beta1.SchedulingV1beta1Interface
SchedulingV1() schedulingv1.SchedulingV1Interface
StorageV1beta1() storagev1beta1.StorageV1beta1Interface
StorageV1() storagev1.StorageV1Interface
StorageV1alpha1() storagev1alpha1.StorageV1alpha1Interface
}
以 CoreV1 为例,又继续提供了一层 CoreV1 版本的资源对象接口,其中内置了 RESTClient 客户端:
// k8s.io/client-go/kubernetes/typed/core/v1/core_client.go
// CoreV1Interface is the typed client for the core/v1 group: it exposes the
// underlying REST client and embeds one getter interface per resource kind.
type CoreV1Interface interface {
// RESTClient returns the REST client the typed resource clients are built on.
RESTClient() rest.Interface
ComponentStatusesGetter
ConfigMapsGetter
EndpointsGetter
EventsGetter
LimitRangesGetter
NamespacesGetter
NodesGetter
PersistentVolumesGetter
PersistentVolumeClaimsGetter
PodsGetter
PodTemplatesGetter
ReplicationControllersGetter
ResourceQuotasGetter
SecretsGetter
ServicesGetter
ServiceAccountsGetter
}
以 PodsGetter 接口为例,该接口对 Pod 资源的各种操作进行了封装:
// k8s.io/client-go/kubernetes/typed/core/v1/pod.go
// PodsGetter hands out a PodInterface scoped to the given namespace.
type PodsGetter interface {
Pods(namespace string) PodInterface
}
// PodInterface bundles the operations available on Pod resources: create,
// update, delete, get, list, watch, patch and apply, plus the status and
// ephemeral-container variants and whatever PodExpansion adds.
type PodInterface interface {
Create(ctx context.Context, pod *v1.Pod, opts metav1.CreateOptions) (*v1.Pod, error)
Update(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
UpdateStatus(ctx context.Context, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Pod, error)
List(ctx context.Context, opts metav1.ListOptions) (*v1.PodList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.Pod, err error)
Apply(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
ApplyStatus(ctx context.Context, pod *corev1.PodApplyConfiguration, opts metav1.ApplyOptions) (result *v1.Pod, err error)
UpdateEphemeralContainers(ctx context.Context, podName string, pod *v1.Pod, opts metav1.UpdateOptions) (*v1.Pod, error)
PodExpansion
}
对于 Pod 资源 List 方法的实现:
// pods is the Pod client for one namespace.
type pods struct {
// client is the REST client every operation is sent through.
client rest.Interface
// ns is the namespace passed to Request.Namespace on each call.
ns string
}
// List fetches the pods of the client's namespace that match opts.
//
// opts.TimeoutSeconds, when set, is converted to a duration and applied as
// the request timeout. The result is always a non-nil *v1.PodList, even when
// err is non-nil.
func (c *pods) List(ctx context.Context, opts metav1.ListOptions) (result *v1.PodList, err error) {
	var timeout time.Duration
	if secs := opts.TimeoutSeconds; secs != nil {
		timeout = time.Duration(*secs) * time.Second
	}

	result = &v1.PodList{}
	req := c.client.Get().
		Namespace(c.ns).
		Resource("pods").
		VersionedParams(&opts, scheme.ParameterCodec).
		Timeout(timeout)
	err = req.Do(ctx).Into(result)
	return result, err
}
实际就是我们之前使用 RESTClient 获取 Pod 资源列表的写法。
基于对各种资源操作的封装,现在使用 ClientSet 客户端来获取 Pod 的写法就简单多了:
package main
import (
"context"
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)
// main lists the pods in the "default" namespace through the typed ClientSet
// and prints each pod's name and phase.
func main() {
	// Build the client configuration from the kubeconfig file.
	cfg, err := clientcmd.BuildConfigFromFlags("", ".kube/config")
	if err != nil {
		panic(err)
	}

	// Create the ClientSet.
	cs, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		panic(err)
	}

	// Query the pods of the default namespace.
	podClient := cs.CoreV1().Pods("default")
	podList, err := podClient.List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}

	for i := range podList.Items {
		p := &podList.Items[i]
		fmt.Printf("%v \t [%v]\n", p.Name, p.Status.Phase)
	}
}
// $ go run main.go
// nginx [Pending]
DynamicClient
从 ClientSet 客户端的源码实现就可以看出,它只支持 K8s 内置资源的操作。对于 CRD 自定义资源,就需要使用 DynamicClient 动态客户端。
接口定义如下:
// k8s.io/client-go/dynamic/interface.go
// Interface is the dynamic client entry point: given a group/version/resource
// it returns a client for that resource.
type Interface interface {
Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface
}
// ResourceInterface lists the operations available on a dynamically selected
// resource. Objects are exchanged as *unstructured.Unstructured (or
// *unstructured.UnstructuredList for List) rather than as typed structs.
type ResourceInterface interface {
Create(ctx context.Context, obj *unstructured.Unstructured, options metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error)
Update(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions, subresources ...string) (*unstructured.Unstructured, error)
UpdateStatus(ctx context.Context, obj *unstructured.Unstructured, options metav1.UpdateOptions) (*unstructured.Unstructured, error)
Delete(ctx context.Context, name string, options metav1.DeleteOptions, subresources ...string) error
DeleteCollection(ctx context.Context, options metav1.DeleteOptions, listOptions metav1.ListOptions) error
Get(ctx context.Context, name string, options metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error)
List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, options metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error)
Apply(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions, subresources ...string) (*unstructured.Unstructured, error)
ApplyStatus(ctx context.Context, name string, obj *unstructured.Unstructured, options metav1.ApplyOptions) (*unstructured.Unstructured, error)
}
// NamespaceableResourceInterface is a ResourceInterface that can additionally
// be narrowed to a namespace via Namespace.
type NamespaceableResourceInterface interface {
Namespace(string) ResourceInterface
ResourceInterface
}
DynamicClient 客户端的实现,同样内置 RESTClient 客户端:
// k8s.io/client-go/dynamic/simple.go
// DynamicClient is the dynamic client; it wraps a REST client through which
// all requests are sent.
type DynamicClient struct {
client rest.Interface
}
// Resource returns a client for the given group/version/resource. The
// returned client has no namespace set.
func (c *DynamicClient) Resource(resource schema.GroupVersionResource) NamespaceableResourceInterface {
	rc := dynamicResourceClient{client: c, resource: resource}
	return &rc
}
具体的资源操作实现在 dynamicResourceClient ,以 List 方法为例:
// dynamicResourceClient performs the operations for one resource selected
// through DynamicClient.Resource.
type dynamicResourceClient struct {
// client is the owning DynamicClient; its REST client sends the requests.
client *DynamicClient
// namespace is validated by List before any request is made.
namespace string
// resource is the group/version/resource this client operates on.
resource schema.GroupVersionResource
}
// List fetches the objects of the client's resource that match opts and
// returns them as an UnstructuredList.
//
// The namespace is validated first. The raw response bytes are decoded with
// the unstructured JSON scheme; if decoding yields a single Unstructured
// object rather than a list, it is converted with ToList.
func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
	if err := validateNamespaceWithOptionalName(c.namespace); err != nil {
		return nil, err
	}

	// Send the GET through the wrapped REST client.
	req := c.client.client.Get().
		AbsPath(c.makeURLSegments("")...).
		SpecificallyVersionedParams(&opts, dynamicParameterCodec, versionV1)
	resp := req.Do(ctx)
	if err := resp.Error(); err != nil {
		return nil, err
	}

	// Take the payload as raw bytes: its structure is not known in advance.
	raw, err := resp.Raw()
	if err != nil {
		return nil, err
	}
	decoded, err := runtime.Decode(unstructured.UnstructuredJSONScheme, raw)
	if err != nil {
		return nil, err
	}

	if asList, ok := decoded.(*unstructured.UnstructuredList); ok {
		return asList, nil
	}
	single := decoded.(*unstructured.Unstructured)
	converted, err := single.ToList()
	if err != nil {
		return nil, err
	}
	return converted, nil
}
DynamicClient 客户端本质上也是调用 RESTClient 客户端,只是最后返回的请求结果是一个 Unstructured 或 UnstructuredList 类型(无法预知的数据结构),这也是为什么被称为动态客户端。
// UnstructuredList is a list of objects whose structure is not known at
// compile time.
type UnstructuredList struct {
// Object holds the list-level content as a generic map.
Object map[string]interface{}
// Items is a list of unstructured objects.
Items []Unstructured `json:"items"`
}
// Unstructured is a single object whose structure is not known at compile
// time; its content is kept as a generic map.
type Unstructured struct {
// Object is a JSON compatible map with string, float, int, bool, []interface{}, or
// map[string]interface{}
// children.
Object map[string]interface{}
}
DynamicClient 可以处理 ClientSet 无法处理的 CRD 资源,既是优点也是缺点,动态就意味着缺乏类型安全,返回的请求结果也需要通过反射才能转换成实际的结构体类型,处理起来就略微麻烦了点:
package main
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/clientcmd"
)
// main lists the pods in the "default" namespace through the DynamicClient,
// converts the unstructured result into a typed PodList and prints each
// pod's name and phase.
func main() {
	// Build the client configuration from the kubeconfig file.
	cfg, err := clientcmd.BuildConfigFromFlags("", ".kube/config")
	if err != nil {
		panic(err)
	}

	// Create the DynamicClient.
	client, err := dynamic.NewForConfig(cfg)
	if err != nil {
		panic(err.Error())
	}

	// Query the pods of the default namespace; the resource is addressed by
	// its group/version/resource triple.
	podsGVR := schema.GroupVersionResource{
		Group:    "",
		Version:  "v1",
		Resource: "pods",
	}
	rawList, err := client.Resource(podsGVR).Namespace("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err.Error())
	}

	// Convert the unstructured content into the typed corev1.PodList.
	podList := &corev1.PodList{}
	err = runtime.DefaultUnstructuredConverter.FromUnstructured(rawList.UnstructuredContent(), podList)
	if err != nil {
		panic(err.Error())
	}

	for i := range podList.Items {
		p := &podList.Items[i]
		fmt.Printf("%v \t [%v]\n", p.Name, p.Status.Phase)
	}
}
// $ go run main.go
// nginx [Pending]
DiscoveryClient
在上面示例中,请求某个资源时需要指定 GVR,即资源组(Group)、资源版本(Version)和资源(Resource)。例如 Pod 资源对应的 GVR 为:
schema.GroupVersionResource{
Group: "",
Version: "v1",
Resource: "pods",
}
在 K8s 中有很多的资源对象,我们不可能全部记住,除了翻阅官方文档,还可以通过 DiscoveryClient 客户端获取,我们熟悉的 kubectl api-resources 命令实际也是通过该客户端获取的:
$ kubectl api-resources
NAME SHORTNAMES APIVERSION NAMESPACED KIND
bindings v1 true Binding
componentstatuses cs v1 false ComponentStatus
configmaps cm v1 true ConfigMap
endpoints ep v1 true Endpoints
events ev v1 true Event
limitranges limits v1 true LimitRange
namespaces ns v1 false Namespace
nodes no v1 false Node
persistentvolumeclaims pvc v1 true PersistentVolumeClaim
persistentvolumes pv v1 false PersistentVolume
pods po v1 true Pod
podtemplates v1 true PodTemplate
replicationcontrollers rc v1 true ReplicationController
resourcequotas quota v1 true ResourceQuota
secrets v1 true Secret
serviceaccounts sa v1 true ServiceAccount
services svc v1 true Service
mutatingwebhookconfigurations admissionregistration.k8s.io/v1 false MutatingWebhookConfiguration
validatingwebhookconfigurations admissionregistration.k8s.io/v1 false ValidatingWebhookConfiguration
customresourcedefinitions crd,crds apiextensions.k8s.io/v1 false CustomResourceDefinition
apiservices apiregistration.k8s.io/v1 false APIService
controllerrevisions apps/v1 true ControllerRevision
daemonsets ds apps/v1 true DaemonSet
deployments deploy apps/v1 true Deployment
replicasets rs apps/v1 true ReplicaSet
statefulsets sts apps/v1 true StatefulSet
tokenreviews authentication.k8s.io/v1 false TokenReview
localsubjectaccessreviews authorization.k8s.io/v1 true LocalSubjectAccessReview
selfsubjectaccessreviews authorization.k8s.io/v1 false SelfSubjectAccessReview
selfsubjectrulesreviews authorization.k8s.io/v1 false SelfSubjectRulesReview
subjectaccessreviews authorization.k8s.io/v1 false SubjectAccessReview
horizontalpodautoscalers hpa autoscaling/v2 true HorizontalPodAutoscaler
cronjobs cj batch/v1 true CronJob
jobs batch/v1 true Job
certificatesigningrequests csr certificates.k8s.io/v1 false CertificateSigningRequest
leases coordination.k8s.io/v1 true Lease
endpointslices discovery.k8s.io/v1 true EndpointSlice
events ev events.k8s.io/v1 true Event
flowschemas flowcontrol.apiserver.k8s.io/v1beta3 false FlowSchema
prioritylevelconfigurations flowcontrol.apiserver.k8s.io/v1beta3 false PriorityLevelConfiguration
ingressclasses networking.k8s.io/v1 false IngressClass
ingresses ing networking.k8s.io/v1 true Ingress
networkpolicies netpol networking.k8s.io/v1 true NetworkPolicy
runtimeclasses node.k8s.io/v1 false RuntimeClass
poddisruptionbudgets pdb policy/v1 true PodDisruptionBudget
clusterrolebindings rbac.authorization.k8s.io/v1 false ClusterRoleBinding
clusterroles rbac.authorization.k8s.io/v1 false ClusterRole
rolebindings rbac.authorization.k8s.io/v1 true RoleBinding
roles rbac.authorization.k8s.io/v1 true Role
priorityclasses pc scheduling.k8s.io/v1 false PriorityClass
csidrivers storage.k8s.io/v1 false CSIDriver
csinodes storage.k8s.io/v1 false CSINode
csistoragecapacities storage.k8s.io/v1 true CSIStorageCapacity
storageclasses sc storage.k8s.io/v1 false StorageClass
volumeattachments storage.k8s.io/v1 false VolumeAttachment
以下是 DiscoveryClient 客户端的接口定义,可以分为聚合发现和缓存发现两种,同样都内置了 RESTClient 客户端:
// k8s.io/client-go/discovery/discovery_client.go
// AggregatedDiscoveryInterface is the aggregated-discovery flavour of the
// discovery client: on top of DiscoveryInterface it can return the API
// groups together with (possibly) their resources, plus the group/versions
// that failed, from one call.
type AggregatedDiscoveryInterface interface {
DiscoveryInterface
GroupsAndMaybeResources() (*metav1.APIGroupList, map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error, error)
}
// CachedDiscoveryInterface is the cached flavour of the discovery client.
type CachedDiscoveryInterface interface {
DiscoveryInterface
// NOTE(review): presumably reports whether the cached data is still
// current — the implementation is not part of this excerpt; confirm.
Fresh() bool
// NOTE(review): presumably discards the cached data — confirm against the
// implementation.
Invalidate()
}
// DiscoveryInterface is the common discovery client surface: access to the
// underlying REST client plus the group, resource, version and OpenAPI
// lookups defined by the embedded interfaces.
type DiscoveryInterface interface {
// RESTClient returns the REST client the discovery calls are sent through.
RESTClient() restclient.Interface
ServerGroupsInterface
ServerResourcesInterface
ServerVersionInterface
OpenAPISchemaInterface
OpenAPIV3SchemaInterface
// Returns copy of current discovery client that will only
// receive the legacy discovery format, or pointer to current
// discovery client if it does not support legacy-only discovery.
WithLegacy() DiscoveryInterface
}
// ServerGroupsInterface looks up the API groups.
type ServerGroupsInterface interface {
// ServerGroups returns the API groups.
ServerGroups() (*metav1.APIGroupList, error)
}
// ServerResourcesInterface looks up the resources served by the API server.
type ServerResourcesInterface interface {
// ServerResourcesForGroupVersion returns the resources of the given group/version.
ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error)
// ServerGroupsAndResources returns all API groups and all resources.
ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error)
// ServerPreferredResources returns all resources at their preferred version.
ServerPreferredResources() ([]*metav1.APIResourceList, error)
// ServerPreferredNamespacedResources returns all namespaced resources at
// their preferred version.
ServerPreferredNamespacedResources() ([]*metav1.APIResourceList, error)
}
// ServerVersionInterface looks up version information, returned as a
// *version.Info.
type ServerVersionInterface interface {
ServerVersion() (*version.Info, error)
}
// OpenAPISchemaInterface fetches the OpenAPI v2 document.
type OpenAPISchemaInterface interface {
OpenAPISchema() (*openapi_v2.Document, error)
}
// OpenAPIV3SchemaInterface hands out an OpenAPI v3 client.
type OpenAPIV3SchemaInterface interface {
OpenAPIV3() openapi.Client
}
先看聚合发现客户端的 ServerGroupsAndResources 方法实现,即最基本的 DiscoveryClient :
// DiscoveryClient is the basic discovery client; it sends its requests
// through the wrapped REST client.
type DiscoveryClient struct {
restClient restclient.Interface
LegacyPrefix string
// Forces the client to request only "unaggregated" (legacy) discovery.
UseLegacyDiscovery bool
}
// Compile-time check that *DiscoveryClient implements AggregatedDiscoveryInterface.
var _ AggregatedDiscoveryInterface = &DiscoveryClient{}
// ServerGroupsAndResources returns all API groups and resources by running
// the package-level ServerGroupsAndResources against this client, wrapped in
// withRetries with defaultRetries.
func (d *DiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
	fetch := func() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
		return ServerGroupsAndResources(d)
	}
	return withRetries(defaultRetries, fetch)
}
// ServerGroupsAndResources returns the API groups and resources known to d.
//
// When d supports aggregated discovery, groups and resources come from a
// single GroupsAndMaybeResources call; otherwise only the groups are fetched
// here. If resources were obtained they are returned immediately, together
// with an ErrGroupDiscoveryFailed when some group/versions failed. The
// remainder of the function is elided in this excerpt.
func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
var sgs *metav1.APIGroupList
var resources []*metav1.APIResourceList
var failedGVs map[schema.GroupVersion]error
var err error
if ad, ok := d.(AggregatedDiscoveryInterface); ok {
// d is an aggregated discovery client.
var resourcesByGV map[schema.GroupVersion]*metav1.APIResourceList
// Fetch groups and (possibly) resources in one call.
sgs, resourcesByGV, failedGVs, err = ad.GroupsAndMaybeResources()
for _, resourceList := range resourcesByGV {
// Flatten the per-group/version map into the result slice.
resources = append(resources, resourceList)
}
} else {
sgs, err = d.ServerGroups()
}
if sgs == nil {
return nil, nil, err
}
resultGroups := []*metav1.APIGroup{}
for i := range sgs.Groups {
resultGroups = append(resultGroups, &sgs.Groups[i])
}
if resources != nil {
var ferr error
if len(failedGVs) > 0 {
ferr = &ErrGroupDiscoveryFailed{Groups: failedGVs}
}
// Resources were obtained, so return them right away.
return resultGroups, resources, ferr
}
// No resources were obtained; processing continues below. According to the
// accompanying article that part is mainly the cached-discovery logic, and
// it is skipped here.
// ... (elided in this excerpt)
}
对于 DiscoveryClient 聚合发现的主要实现在 GroupsAndMaybeResources 方法:
// GroupsAndMaybeResources downloads discovery data from both /api and /apis
// and merges the two.
//
// Groups from /apis are appended to those from /api. Resources are returned
// only when both endpoints supplied them; if only /api did, the resource map
// is dropped (nil). Failed group/versions from /apis are merged into those
// from /api. An error from either download aborts with all-nil results.
func (d *DiscoveryClient) GroupsAndMaybeResources() (
	*metav1.APIGroupList,
	map[schema.GroupVersion]*metav1.APIResourceList,
	map[schema.GroupVersion]error,
	error) {
	// Legacy groups and resources from /api.
	groups, resources, failedGVs, err := d.downloadLegacy()
	if err != nil {
		return nil, nil, nil, err
	}
	// Groups and (possibly) resources from /apis.
	apiGroups, apiResources, failedApisGVs, aerr := d.downloadAPIs()
	if aerr != nil {
		return nil, nil, nil, aerr
	}

	// Append the /apis groups to the legacy groups.
	groups.Groups = append(groups.Groups, apiGroups.Groups...)

	// Keep resources only when both endpoints returned them.
	switch {
	case resources == nil:
		// Nothing from /api; stays nil.
	case apiResources == nil:
		resources = nil
	default:
		for gv, list := range apiResources {
			resources[gv] = list
		}
	}

	// Merge the failed group/versions of /apis into those of /api.
	for gv, gvErr := range failedApisGVs {
		failedGVs[gv] = gvErr
	}
	return groups, resources, failedGVs, nil
}
// downloadLegacy fetches discovery data from the legacy /api endpoint.
//
// The Accept header asks for the aggregated formats unless UseLegacyDiscovery
// restricts it to AcceptV1. A not-found response is tolerated and yields
// empty, non-nil results. Depending on the response content type the body is
// decoded either as an aggregated discovery list (groups, resources and
// failed group/versions) or as unaggregated API versions, in which case a
// single group is returned and the resource map is nil.
func (d *DiscoveryClient) downloadLegacy() (
	*metav1.APIGroupList,
	map[schema.GroupVersion]*metav1.APIResourceList,
	map[schema.GroupVersion]error,
	error) {
	accept := acceptDiscoveryFormats
	if d.UseLegacyDiscovery {
		accept = AcceptV1
	}

	// Send the request through the REST client.
	var contentType string
	req := d.restClient.Get().AbsPath("/api").SetHeader("Accept", accept)
	payload, err := req.Do(context.TODO()).ContentType(&contentType).Raw()
	if err != nil {
		// Tolerate 404, since aggregated api servers can return it.
		if errors.IsNotFound(err) {
			// Empty structures and no error.
			return &metav1.APIGroupList{},
				map[schema.GroupVersion]*metav1.APIResourceList{},
				map[schema.GroupVersion]error{},
				nil
		}
		return nil, nil, nil, err
	}

	// Aggregated discovery: groups and resources arrive together.
	if isV2Beta1ContentType(contentType) {
		var aggregated apidiscovery.APIGroupDiscoveryList
		if err := json.Unmarshal(payload, &aggregated); err != nil {
			return nil, nil, nil, err
		}
		groupList, resourcesByGV, failedGVs := SplitGroupsAndResources(aggregated)
		return groupList, resourcesByGV, failedGVs, nil
	}

	// Default is unaggregated discovery v1.
	var versions metav1.APIVersions
	if err := json.Unmarshal(payload, &versions); err != nil {
		return nil, nil, nil, err
	}
	var group metav1.APIGroup
	if len(versions.Versions) != 0 {
		group = apiVersionsToAPIGroup(&versions)
	}
	groupList := &metav1.APIGroupList{Groups: []metav1.APIGroup{group}}
	return groupList, nil, map[schema.GroupVersion]error{}, nil
}
// downloadAPIs fetches discovery data from the /apis endpoint.
//
// The Accept header asks for the aggregated formats unless UseLegacyDiscovery
// restricts it to AcceptV1. Any request error is returned as-is. Depending on
// the response content type the body is decoded either as an aggregated
// discovery list (groups, resources and failed group/versions) or as a plain
// API group list, in which case the resource map is nil.
func (d *DiscoveryClient) downloadAPIs() (
	*metav1.APIGroupList,
	map[schema.GroupVersion]*metav1.APIResourceList,
	map[schema.GroupVersion]error,
	error) {
	accept := acceptDiscoveryFormats
	if d.UseLegacyDiscovery {
		accept = AcceptV1
	}

	// Send the request through the REST client.
	var contentType string
	req := d.restClient.Get().AbsPath("/apis").SetHeader("Accept", accept)
	payload, err := req.Do(context.TODO()).ContentType(&contentType).Raw()
	if err != nil {
		return nil, nil, nil, err
	}

	// Aggregated discovery: groups and resources arrive together.
	if isV2Beta1ContentType(contentType) {
		var aggregated apidiscovery.APIGroupDiscoveryList
		if err := json.Unmarshal(payload, &aggregated); err != nil {
			return nil, nil, nil, err
		}
		groupList, resourcesByGV, failedGVs := SplitGroupsAndResources(aggregated)
		return groupList, resourcesByGV, failedGVs, nil
	}

	// Default is unaggregated discovery v1.
	groupList := &metav1.APIGroupList{}
	if err := json.Unmarshal(payload, groupList); err != nil {
		return nil, nil, nil, err
	}
	return groupList, nil, map[schema.GroupVersion]error{}, nil
}
可以看到 DiscoveryClient 客户端就是通过 RESTClient 客户端调用 /api 和 /apis 获取到所有的资源信息聚合后返回。
使用示例如下:
package main
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/tools/clientcmd"
)
// main prints the group, version and name of every resource reported by the
// DiscoveryClient.
func main() {
	// Build the client configuration from the kubeconfig file.
	cfg, err := clientcmd.BuildConfigFromFlags("", ".kube/config")
	if err != nil {
		panic(err)
	}

	// Create the DiscoveryClient.
	client, err := discovery.NewDiscoveryClientForConfig(cfg)
	if err != nil {
		panic(err.Error())
	}

	_, resourceLists, err := client.ServerGroupsAndResources()
	if err != nil {
		panic(err.Error())
	}

	for _, list := range resourceLists {
		// Each list carries its group/version as a single string.
		gv, err := schema.ParseGroupVersion(list.GroupVersion)
		if err != nil {
			panic(err.Error())
		}
		for i := range list.APIResources {
			fmt.Printf("[G]%v [V]%v [R]%v\n", gv.Group, gv.Version, list.APIResources[i].Name)
		}
	}
}
// $ go run main.go
// [G]node.k8s.io [V]v1 [R]runtimeclasses
// [G]authorization.k8s.io [V]v1 [R]localsubjectaccessreviews
// [G]authorization.k8s.io [V]v1 [R]selfsubjectaccessreviews
// [G]authorization.k8s.io [V]v1 [R]selfsubjectrulesreviews
// [G]authorization.k8s.io [V]v1 [R]subjectaccessreviews
// [G]discovery.k8s.io [V]v1 [R]endpointslices
// [G]storage.k8s.io [V]v1 [R]csidrivers
// [G]storage.k8s.io [V]v1 [R]csinodes
// [G]storage.k8s.io [V]v1 [R]csistoragecapacities
// [G]storage.k8s.io [V]v1 [R]storageclasses
// [G]storage.k8s.io [V]v1 [R]volumeattachments
// [G]storage.k8s.io [V]v1 [R]volumeattachments/status
// [G]coordination.k8s.io [V]v1 [R]leases
// [G]autoscaling [V]v1 [R]horizontalpodautoscalers
// [G]autoscaling [V]v1 [R]horizontalpodautoscalers/status
// [G]authentication.k8s.io [V]v1 [R]tokenreviews
// [G]networking.k8s.io [V]v1 [R]ingressclasses
// [G]networking.k8s.io [V]v1 [R]ingresses
// [G]networking.k8s.io [V]v1 [R]ingresses/status
// [G]networking.k8s.io [V]v1 [R]networkpolicies
// [G]networking.k8s.io [V]v1 [R]networkpolicies/status
// [G]admissionregistration.k8s.io [V]v1 [R]mutatingwebhookconfigurations
// [G]admissionregistration.k8s.io [V]v1 [R]validatingwebhookconfigurations
// [G]flowcontrol.apiserver.k8s.io [V]v1beta3 [R]flowschemas
// [G]flowcontrol.apiserver.k8s.io [V]v1beta3 [R]flowschemas/status
// [G]flowcontrol.apiserver.k8s.io [V]v1beta3 [R]prioritylevelconfigurations
// [G]flowcontrol.apiserver.k8s.io [V]v1beta3 [R]prioritylevelconfigurations/status
// [G]certificates.k8s.io [V]v1 [R]certificatesigningrequests
// [G]certificates.k8s.io [V]v1 [R]certificatesigningrequests/approval
// [G]certificates.k8s.io [V]v1 [R]certificatesigningrequests/status
// [G]events.k8s.io [V]v1 [R]events
// [G]apiregistration.k8s.io [V]v1 [R]apiservices
// [G]apiregistration.k8s.io [V]v1 [R]apiservices/status
// [G] [V]v1 [R]bindings
// [G] [V]v1 [R]componentstatuses
// [G] [V]v1 [R]configmaps
// [G] [V]v1 [R]endpoints
// [G] [V]v1 [R]events
// [G] [V]v1 [R]limitranges
// [G] [V]v1 [R]namespaces
// [G] [V]v1 [R]namespaces/finalize
// [G] [V]v1 [R]namespaces/status
// [G] [V]v1 [R]nodes
// [G] [V]v1 [R]nodes/proxy
// [G] [V]v1 [R]nodes/status
// [G] [V]v1 [R]persistentvolumeclaims
// [G] [V]v1 [R]persistentvolumeclaims/status
// [G] [V]v1 [R]persistentvolumes
// [G] [V]v1 [R]persistentvolumes/status
// [G] [V]v1 [R]pods
// [G] [V]v1 [R]pods/attach
// [G] [V]v1 [R]pods/binding
// [G] [V]v1 [R]pods/ephemeralcontainers
// [G] [V]v1 [R]pods/eviction
// [G] [V]v1 [R]pods/exec
// [G] [V]v1 [R]pods/log
// [G] [V]v1 [R]pods/portforward
// [G] [V]v1 [R]pods/proxy
// [G] [V]v1 [R]pods/status
// [G] [V]v1 [R]podtemplates
// [G] [V]v1 [R]replicationcontrollers
// [G] [V]v1 [R]replicationcontrollers/scale
// [G] [V]v1 [R]replicationcontrollers/status
// [G] [V]v1 [R]resourcequotas
// [G] [V]v1 [R]resourcequotas/status
// [G] [V]v1 [R]secrets
// [G] [V]v1 [R]serviceaccounts
// [G] [V]v1 [R]serviceaccounts/token
// [G] [V]v1 [R]services
// [G] [V]v1 [R]services/proxy
// [G] [V]v1 [R]services/status
// [G]apps [V]v1 [R]controllerrevisions
// [G]apps [V]v1 [R]daemonsets
// [G]apps [V]v1 [R]daemonsets/status
// [G]apps [V]v1 [R]deployments
// [G]apps [V]v1 [R]deployments/scale
// [G]apps [V]v1 [R]deployments/status
// [G]apps [V]v1 [R]replicasets
// [G]apps [V]v1 [R]replicasets/scale
// [G]apps [V]v1 [R]replicasets/status
// [G]apps [V]v1 [R]statefulsets
// [G]apps [V]v1 [R]statefulsets/scale
// [G]apps [V]v1 [R]statefulsets/status
// [G]scheduling.k8s.io [V]v1 [R]priorityclasses
// [G]flowcontrol.apiserver.k8s.io [V]v1beta2 [R]flowschemas
// [G]flowcontrol.apiserver.k8s.io [V]v1beta2 [R]flowschemas/status
// [G]flowcontrol.apiserver.k8s.io [V]v1beta2 [R]prioritylevelconfigurations
// [G]flowcontrol.apiserver.k8s.io [V]v1beta2 [R]prioritylevelconfigurations/status
// [G]policy [V]v1 [R]poddisruptionbudgets
// [G]policy [V]v1 [R]poddisruptionbudgets/status
// [G]rbac.authorization.k8s.io [V]v1 [R]clusterrolebindings
// [G]rbac.authorization.k8s.io [V]v1 [R]clusterroles
// [G]rbac.authorization.k8s.io [V]v1 [R]rolebindings
// [G]rbac.authorization.k8s.io [V]v1 [R]roles
// [G]batch [V]v1 [R]cronjobs
// [G]batch [V]v1 [R]cronjobs/status
// [G]batch [V]v1 [R]jobs
// [G]batch [V]v1 [R]jobs/status
// [G]autoscaling [V]v2 [R]horizontalpodautoscalers
// [G]autoscaling [V]v2 [R]horizontalpodautoscalers/status
// [G]apiextensions.k8s.io [V]v1 [R]customresourcedefinitions
// [G]apiextensions.k8s.io [V]v1 [R]customresourcedefinitions/status
对于缓存发现客户端,又分为基于内存(memCacheClient)和基于磁盘(CachedDiscoveryClient)两种,其中 CachedDiscoveryClient 实际上套用了 memCacheClient ,而 memCacheClient 则套用了聚合发现 DiscoveryClient :
// k8s.io/client-go/discovery/cached/disk/cached_discovery.go
// CachedDiscoveryClient is the disk-backed caching discovery client
// (excerpt: the remaining fields are omitted).
type CachedDiscoveryClient struct {
// Wrapped discovery client; NewCachedDiscoveryClientForConfig passes a memCacheClient here.
delegate discovery.DiscoveryInterface
// Directory on disk under which the cache files are stored.
cacheDirectory string
// How long a cache file stays valid, counted from its modification time.
ttl time.Duration
// ... remaining fields omitted
}
// NewCachedDiscoveryClientForConfig builds a disk-backed caching discovery
// client from config.
//
// When httpCacheDir is non-empty, config is first copied with
// restclient.CopyConfig and the copy is wrapped with a caching round tripper
// rooted at httpCacheDir. The returned client is layered as
// disk cache -> in-memory cache -> plain DiscoveryClient.
func NewCachedDiscoveryClientForConfig(config *restclient.Config, discoveryCacheDir, httpCacheDir string, ttl time.Duration) (*CachedDiscoveryClient, error) {
	if httpCacheDir != "" {
		withHTTPCache := func(rt http.RoundTripper) http.RoundTripper {
			return newCacheRoundTripper(httpCacheDir, rt)
		}
		config = restclient.CopyConfig(config)
		config.Wrap(withHTTPCache)
	}

	// Innermost layer: the plain (aggregated) DiscoveryClient.
	base, err := discovery.NewDiscoveryClientForConfig(config)
	if err != nil {
		return nil, err
	}

	// Middle layer: in-memory cache; outer layer: on-disk cache.
	inMemory := memory.NewMemCacheClient(base)
	return newCachedDiscoveryClient(inMemory, discoveryCacheDir, ttl), nil
}
// newCachedDiscoveryClient wraps delegate in a CachedDiscoveryClient that
// stores its cache files under cacheDirectory and treats them as valid for ttl.
// The client starts out fresh, with an empty set of files written by this process.
func newCachedDiscoveryClient(delegate discovery.DiscoveryInterface, cacheDirectory string, ttl time.Duration) *CachedDiscoveryClient {
	client := &CachedDiscoveryClient{
		ourFiles: make(map[string]struct{}),
		fresh:    true,
	}
	client.delegate = delegate
	client.cacheDirectory = cacheDirectory
	client.ttl = ttl
	return client
}
CachedDiscoveryClient 客户端的 ServerGroupsAndResources 方法实现:
// k8s.io/client-go/discovery/cached/disk/cached_discovery.go
// ServerGroupsAndResources returns whatever the package-level
// discovery.ServerGroupsAndResources helper produces when given d as the client.
func (d *CachedDiscoveryClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
// This is the same discovery.ServerGroupsAndResources helper that the aggregated DiscoveryClient calls.
return discovery.ServerGroupsAndResources(d)
}
看到之前的 discovery.ServerGroupsAndResources 方法:
// k8s.io/client-go/discovery/discovery_client.go
// ServerGroupsAndResources (excerpt): shows only how the group list is obtained;
// the aggregated branch and the rest of the function are omitted here.
func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
var sgs *metav1.APIGroupList
var resources []*metav1.APIResourceList
var failedGVs map[schema.GroupVersion]error
var err error
if ad, ok := d.(AggregatedDiscoveryInterface); ok {
// Aggregated discovery client path; covered above, omitted ...
} else {
// For the cached discovery client, call its ServerGroups method.
sgs, err = d.ServerGroups()
}
// ... continued later
}
这里的 d 是 CachedDiscoveryClient ,看到其 ServerGroups 方法:
// k8s.io/client-go/discovery/cached/disk/cached_discovery.go
// ServerGroups returns the API group list, preferring the on-disk cache file
// "servergroups.json" under d.cacheDirectory.
//
// If the file cannot be read (getCachedFile returns an error) or cannot be
// decoded, the request falls through to d.delegate (the memCacheClient). A
// non-empty live result is written back to disk; a failed write is only logged.
func (d *CachedDiscoveryClient) ServerGroups() (*metav1.APIGroupList, error) {
	cachePath := filepath.Join(d.cacheDirectory, "servergroups.json")

	// Fast path: serve from the disk cache when it is readable and decodable.
	if raw, readErr := d.getCachedFile(cachePath); readErr == nil {
		fromDisk := new(metav1.APIGroupList)
		decodeErr := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), raw, fromDisk)
		if decodeErr == nil {
			klog.V(10).Infof("returning cached discovery info from %v", cachePath)
			return fromDisk, nil
		}
	}

	// Slow path: ask the delegate and decide whether the answer is worth caching.
	fromDelegate, err := d.delegate.ServerGroups()
	switch {
	case err != nil:
		klog.V(3).Infof("skipped caching discovery info due to %v", err)
		return fromDelegate, err
	case fromDelegate == nil || len(fromDelegate.Groups) == 0:
		klog.V(3).Infof("skipped caching discovery info, no groups found")
		return fromDelegate, nil
	}

	// Persist the result for later calls; the live data is returned regardless.
	if writeErr := d.writeCachedFile(cachePath, fromDelegate); writeErr != nil {
		klog.V(1).Infof("failed to write cache to %v due to %v", cachePath, writeErr)
	}
	return fromDelegate, nil
}
查询缓存文件和写入缓存文件的方式很简单,实际就是对文件的一个读写,以及 ttl 的判断,直接贴出:
// getCachedFile returns the contents of the cache file filename.
//
// It returns an error when the cache has been invalidated and the file was not
// written by this process, when the file cannot be opened, stat'ed or read, or
// when its modification time plus d.ttl lies in the past. On success d.fresh is
// cleared unless the file was written by this process.
func (d *CachedDiscoveryClient) getCachedFile(filename string) ([]byte, error) {
	// Once invalidated, only files this process wrote itself are trusted.
	d.mutex.Lock()
	_, writtenByUs := d.ourFiles[filename]
	rejected := d.invalidated && !writtenByUs
	d.mutex.Unlock()
	if rejected {
		return nil, errors.New("cache invalidated")
	}

	f, err := os.Open(filename)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	info, err := f.Stat()
	if err != nil {
		return nil, err
	}
	now := time.Now()
	if now.After(info.ModTime().Add(d.ttl)) {
		return nil, errors.New("cache expired")
	}

	// The file exists and is still within its ttl: read it.
	data, err := io.ReadAll(f)
	if err != nil {
		return nil, err
	}

	d.mutex.Lock()
	d.fresh = d.fresh && writtenByUs
	d.mutex.Unlock()
	return data, nil
}
// writeCachedFile encodes obj and stores it at filename.
//
// The data is written to a temporary file in the same directory and then
// renamed over filename, so readers never observe a partially written cache
// file. On a successful rename, filename is recorded in d.ourFiles.
//
// Any error from creating the directory, encoding, writing, chmod, close or
// rename is returned. The temporary file handle is closed on every path so a
// failed write or chmod does not leak a file descriptor.
func (d *CachedDiscoveryClient) writeCachedFile(filename string, obj runtime.Object) error {
	dir := filepath.Dir(filename)
	if err := os.MkdirAll(dir, 0750); err != nil {
		return err
	}
	bytes, err := runtime.Encode(scheme.Codecs.LegacyCodec(), obj)
	if err != nil {
		return err
	}
	f, err := os.CreateTemp(dir, filepath.Base(filename)+".")
	if err != nil {
		return err
	}
	name := f.Name()
	// Best-effort cleanup of the temp file; once the rename below has
	// succeeded the temp name no longer exists and this is a harmless no-op.
	defer os.Remove(name)
	if _, err := f.Write(bytes); err != nil {
		// Close before the deferred Remove runs; the write error is the one to report.
		_ = f.Close()
		return err
	}
	if err := os.Chmod(name, 0660); err != nil {
		// Same as above: release the handle, report the chmod error.
		_ = f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	// atomic rename
	d.mutex.Lock()
	defer d.mutex.Unlock()
	err = os.Rename(name, filename)
	if err == nil {
		d.ourFiles[filename] = struct{}{}
	}
	return err
}
重点在于:CachedDiscoveryClient 在缓存文件中查询不到数据,或者缓存已过期时,会调用 memCacheClient 的 ServerGroups 方法:
// k8s.io/client-go/discovery/cached/memory/memcache.go
// cacheEntry is one cached resource-information entry: either a resource list
// or the error recorded in its place.
// Other code initializes it positionally, so the field order must not change.
type cacheEntry struct {
resourceList *metav1.APIResourceList
err error
}
// memCacheClient is the in-memory caching discovery client.
type memCacheClient struct {
// Wrapped discovery client; in the flow described here it is the basic aggregated DiscoveryClient.
delegate discovery.DiscoveryInterface
// Guards the cached state; GroupsAndMaybeResources holds it while reading or refreshing.
lock sync.RWMutex
// Cached resource information; refreshLocked keys it by the GroupVersion string.
groupToServerResources map[string]*cacheEntry
// Cached API group list.
groupList *metav1.APIGroupList
// When false, GroupsAndMaybeResources refreshes the cache before answering.
cacheValid bool
openapiClient openapi.Client
// Set to true by refreshLocked after the aggregated discovery path succeeded.
receivedAggregatedDiscovery bool
}
// ServerGroups returns the API group list obtained through
// GroupsAndMaybeResources, discarding the resource and failure maps.
// On error the returned group list is nil.
func (d *memCacheClient) ServerGroups() (*metav1.APIGroupList, error) {
	groupList, _, _, err := d.GroupsAndMaybeResources()
	if err == nil {
		return groupList, nil
	}
	return nil, err
}
// GroupsAndMaybeResources returns the cached group list together with a resource
// map and the failed group/versions, refreshing the cache first if it is not valid.
// Excerpt: the code that builds resourcesMap and failedGVs is omitted.
func (d *memCacheClient) GroupsAndMaybeResources() (*metav1.APIGroupList, map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error, error) {
// Take the write lock for the whole call because the cache may be refreshed below.
d.lock.Lock()
defer d.lock.Unlock()
if !d.cacheValid {
// The cache is not valid: refresh the in-memory cache before answering.
if err := d.refreshLocked(); err != nil {
return nil, nil, nil, err
}
}
// Build the values to return from the in-memory cache (omitted).
return d.groupList, resourcesMap, failedGVs, nil
}
// refreshLocked repopulates the in-memory cache from the delegate.
// Its visible caller, GroupsAndMaybeResources, invokes it while holding d.lock.
// Excerpt: the non-aggregated tail of the function is omitted.
func (d *memCacheClient) refreshLocked() error {
var gl *metav1.APIGroupList
var err error
// Here the delegate is the basic aggregated DiscoveryClient.
if ad, ok := d.delegate.(discovery.AggregatedDiscoveryInterface); ok {
var resources map[schema.GroupVersion]*metav1.APIResourceList
var failedGVs map[schema.GroupVersion]error
// Call the DiscoveryClient's GroupsAndMaybeResources method (covered at the
// beginning): it uses the RESTClient to query /api and /apis and returns the
// aggregated resource information.
gl, resources, failedGVs, err = ad.GroupsAndMaybeResources()
if resources != nil && err == nil {
// Cache the resources.
d.groupToServerResources = map[string]*cacheEntry{}
d.groupList = gl
for gv, resources := range resources {
d.groupToServerResources[gv.String()] = &cacheEntry{resources, nil}
}
// Cache GroupVersion discovery errors
for gv, err := range failedGVs {
d.groupToServerResources[gv.String()] = &cacheEntry{nil, err}
}
d.receivedAggregatedDiscovery = true
d.cacheValid = true
return nil
}
} else {
// The delegate does not support aggregated discovery: fetch only the group list.
gl, err = d.delegate.ServerGroups()
}
// ... (remainder omitted)
return nil
}
memCacheClient 实际上就是调用 DiscoveryClient ,然后将数据缓存在内存中。
总结一下:CachedDiscoveryClient 首先从本地磁盘的缓存文件中读取资源组信息;如果文件不存在或者已经过期,就转而调用 memCacheClient,从内存缓存中获取;如果内存缓存尚未建立或者已经失效,memCacheClient 再通过 DiscoveryClient 向 kube-apiserver 发起请求获取,并把结果缓存下来。
回到 discovery.ServerGroupsAndResources 方法:
// k8s.io/client-go/discovery/discovery_client.go
// ServerGroupsAndResources (continued excerpt): once the group list sgs is
// known, it fetches the resources of every group/version and returns them in
// discovery order. The aggregated branch is omitted.
func ServerGroupsAndResources(d DiscoveryInterface) ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
var sgs *metav1.APIGroupList
var resources []*metav1.APIResourceList
var failedGVs map[schema.GroupVersion]error
var err error
if ad, ok := d.(AggregatedDiscoveryInterface); ok {
// Aggregated discovery client path; covered above, omitted ...
} else {
// For the cached discovery client, call its ServerGroups method to obtain the group list sgs.
sgs, err = d.ServerGroups()
}
if sgs == nil {
return nil, nil, err
}
// Call fetchGroupVersionResources with the group list sgs to fetch the resource information.
groupVersionResources, failedGroups := fetchGroupVersionResources(d, sgs)
// order results by group/version discovery order
result := []*metav1.APIResourceList{}
for _, apiGroup := range sgs.Groups {
for _, version := range apiGroup.Versions {
gv := schema.GroupVersion{Group: apiGroup.Name, Version: version.Version}
if resources, ok := groupVersionResources[gv]; ok {
result = append(result, resources)
}
}
}
// NOTE(review): resultGroups is not declared anywhere in this excerpt; presumably
// its construction was elided together with the aggregated branch — verify
// against the upstream source.
if len(failedGroups) == 0 {
return resultGroups, result, nil
}
// Some group/versions failed: return the partial results together with the failures.
return resultGroups, result, &ErrGroupDiscoveryFailed{Groups: failedGroups}
}
看到最终的 fetchGroupVersionResources 方法:
// k8s.io/client-go/discovery/discovery_client.go
// fetchGroupVersionResources queries d for the resources of every group/version
// listed in apiGroups, one goroutine per group/version, and waits for all of
// them to finish.
//
// It returns the resource lists keyed by group/version and, separately, the
// errors keyed by group/version. A group/version can appear in both maps when
// the lookup returned a list together with an error.
func fetchGroupVersionResources(d DiscoveryInterface, apiGroups *metav1.APIGroupList) (map[schema.GroupVersion]*metav1.APIResourceList, map[schema.GroupVersion]error) {
	var (
		mu        sync.Mutex
		pending   sync.WaitGroup
		resources = map[schema.GroupVersion]*metav1.APIResourceList{}
		failures  = map[schema.GroupVersion]error{}
	)

	// lookup fetches a single group/version and records the outcome under mu.
	lookup := func(gv schema.GroupVersion) {
		defer pending.Done()
		defer utilruntime.HandleCrash()

		// With a CachedDiscoveryClient this goes through its disk cache first.
		list, err := d.ServerResourcesForGroupVersion(gv.String())

		mu.Lock()
		defer mu.Unlock()
		if err != nil {
			// TODO: maybe restrict this to NotFound errors
			failures[gv] = err
		}
		if list != nil {
			// A fallback list may be returned even alongside an error.
			resources[gv] = list
		}
	}

	for i := range apiGroups.Groups {
		group := &apiGroups.Groups[i]
		for _, v := range group.Versions {
			pending.Add(1)
			go lookup(schema.GroupVersion{Group: group.Name, Version: v.Version})
		}
	}

	// Block until every concurrent lookup has reported.
	pending.Wait()
	return resources, failures
}
fetchGroupVersionResources 方法开启了并发去调用 ServerResourcesForGroupVersion 方法根据资源组版本来查询资源信息:
// k8s.io/client-go/discovery/cached/disk/cached_discovery.go
// ServerResourcesForGroupVersion returns the resource list of groupVersion,
// preferring the on-disk cache file
// <cacheDirectory>/<groupVersion>/serverresources.json.
//
// A cache read or decode failure is never fatal: the request simply falls
// through to d.delegate (the memCacheClient). A non-empty live result is
// written back to disk; a failed write is only logged.
func (d *CachedDiscoveryClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) {
	cachePath := filepath.Join(d.cacheDirectory, groupVersion, "serverresources.json")

	// Fast path: serve from the disk cache when it is readable and decodable.
	if raw, readErr := d.getCachedFile(cachePath); readErr == nil {
		fromDisk := new(metav1.APIResourceList)
		decodeErr := runtime.DecodeInto(scheme.Codecs.UniversalDecoder(), raw, fromDisk)
		if decodeErr == nil {
			klog.V(10).Infof("returning cached discovery info from %v", cachePath)
			return fromDisk, nil
		}
	}

	// Slow path: ask the delegate and decide whether the answer is worth caching.
	fromDelegate, err := d.delegate.ServerResourcesForGroupVersion(groupVersion)
	switch {
	case err != nil:
		klog.V(3).Infof("skipped caching discovery info due to %v", err)
		return fromDelegate, err
	case fromDelegate == nil || len(fromDelegate.APIResources) == 0:
		klog.V(3).Infof("skipped caching discovery info, no resources found")
		return fromDelegate, nil
	}

	// Persist the result for later calls; the live data is returned regardless.
	if writeErr := d.writeCachedFile(cachePath, fromDelegate); writeErr != nil {
		klog.V(1).Infof("failed to write cache to %v due to %v", cachePath, writeErr)
	}
	return fromDelegate, nil
}
ServerResourcesForGroupVersion 方法的逻辑与 ServerGroups 方法基本一致,只是缓存文件换成了 <cacheDirectory>/<groupVersion>/serverresources.json,这里不再重复讲解,感兴趣的读者可以自行阅读源码。
最后,基于缓存的发现客户端的使用示例如下:
package main
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery/cached/disk"
"k8s.io/client-go/tools/clientcmd"
"time"
)
// main lists every group/version/resource reported by the cluster, using the
// disk-cached discovery client. Any error aborts the program with a panic.
func main() {
	// Build the client configuration from the kubeconfig file.
	restConfig, err := clientcmd.BuildConfigFromFlags("", ".kube/config")
	if err != nil {
		panic(err)
	}

	// Uncached alternative:
	//discoveryClient, err := discovery.NewDiscoveryClientForConfig(config)

	// Cached variant: discovery data under .kube/cache, HTTP responses under
	// .kube/http-cache, entries valid for ten minutes.
	const cacheTTL = 10 * time.Minute
	cachedClient, err := disk.NewCachedDiscoveryClientForConfig(restConfig, ".kube/cache", ".kube/http-cache", cacheTTL)
	if err != nil {
		panic(err.Error())
	}

	_, resourceLists, err := cachedClient.ServerGroupsAndResources()
	if err != nil {
		panic(err.Error())
	}

	for _, list := range resourceLists {
		gv, parseErr := schema.ParseGroupVersion(list.GroupVersion)
		if parseErr != nil {
			panic(parseErr.Error())
		}
		for i := range list.APIResources {
			fmt.Printf("[G]%v [V]%v [R]%v\n", gv.Group, gv.Version, list.APIResources[i].Name)
		}
	}
}
本地缓存目录如下:
本回完,下一回介绍 client-go 的 Informer 机制。