使用 Clientset 获取 Kubernetes 资源对象
?
本节主要讲解 Kubernetes 核心的资源类型 Scheme 的定义以及如何使用 Clientset 来获取集群资源对象。
介绍
当我们操作资源和 apiserver 进行通信的时候,需要根据资源对象类型的 Group、Version、Kind 以及规范定义、编解码等内容构成 Scheme 类型,然后 Clientset 对象就可以来访问和操作这些资源类型了,Scheme 的定义主要在 api 子项目之中,源码仓库地址: https://github.com/kubernetes/api ,被同步到 Kubernetes 源码的 staging/src/k8s.io/api 之下。
主要就是各种资源对象的原始结构体定义,比如查看 apps/v1 目录下面的定义:
$ tree staging/src/k8s.io/api/apps/v1staging/src/k8s.io/api/apps/v1├── BUILD├── doc.go├── generated.pb.go├── generated.proto├── register.go├── types.go├── types_swagger_doc_generated.go└── zz_generated.deepcopy.go0 directories, 8 files
types.go 文件
其中 types.go 文件里面就是 apps/v1 这个 GroupVersion 下面所有的资源对象的定义,有 Deployment、DaemonSet、StatefulSet、ReplicaSet 等几个资源对象,比如 Deployment 的结构体定义如下所示:

由 TypeMeta、ObjectMeta、DeploymentSpec 以及 DeploymentStatus 4个属性组成,和我们使用 YAML 文件定义的 Deployment 资源对象也是对应的。
apiVersion: apps/v1kind: Deploymentmetadata:name: nginx-deploynamespace: defaultspec:selector:matchLabels:app: nginxtemplate:metadata:labels:app: nginxspec:containers:name: nginximage: nginxports:containerPort: 80
其中 apiVersion 与 kind 就是 TypeMeta 属性,metadata 属性就是 ObjectMeta,spec 属性就是 DeploymentSpec,当资源部署过后也会包含一个 status 的属性,也就是 DeploymentStatus ,这样就完整的描述了一个资源对象的模型。
zz_generated.deepcopy.go 文件
上面定义的规范在 Kubernetes 中称为资源类型 Scheme,此外zz_generated.deepcopy.go 文件是由 deepcopy-gen 工具创建的定义各资源类型 DeepCopyObject() 方法的文件,所有注册到 Scheme 的资源类型都要实现 runtime.Object 接口:
// staging/src/k8s.io/apimachinery/pkg/runtime/interface.gotype Object interface {GetObjectKind() schema.ObjectKindDeepCopyObject() Object}
而所有的资源类型都包含一个 TypeMeta 类型,而该类型实现了 GetObjectKind() 方法,所以各资源类型只需要实现 DeepCopyObject() 方法即可:
// staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/meta.gofunc (obj *TypeMeta) GetObjectKind() schema.ObjectKind { return obj }
各个资源类型的 DeepCopyObject() 方法也不是手动定义,而是使用 deepcopy-gen 工具命令统一自动生成的,该工具会读取 types.go 文件中的 +k8s:deepcopy-gen 注释,以 Deployment 为例:
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object// Deployment enables declarative updates for Pods and ReplicaSets.type Deployment struct {......}
然后将自动生成的代码保存到 zz_generated.deepcopy.go 文件中。
register.go 文件
register.go 文件的主要作用是定义 AddToScheme 函数,将各种资源类型注册到 Clientset 使用的  Scheme 对象中去,由于每个资源自动生成了 DeepCopyObject() 方法,这样资源就实现了 runtime.Object 接口,所以可以注册到 Scheme 中去了。
// staging/src/k8s.io/api/apps/v1/register.govar (// TODO: move SchemeBuilder with zz_generated.deepcopy.go to k8s.io/api.// localSchemeBuilder and AddToScheme will stay in k8s.io/kubernetes.SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)localSchemeBuilder = &SchemeBuilder// 对外暴露的 AddToScheme 方法用于注册该 Group/Verion 下的所有资源类型AddToScheme = localSchemeBuilder.AddToScheme)// staging/src/k8s.io/client-go/kubernetes/scheme/register.go// 新建一个 Scheme,将各类资源对象都添加到该 Schemevar Scheme = runtime.NewScheme()// 为 Scheme 中的所有类型创建一个编解码工厂var Codecs = serializer.NewCodecFactory(Scheme)// 为 Scheme 中的所有类型创建一个参数编解码工厂var ParameterCodec = runtime.NewParameterCodec(Scheme)// 将各 k8s.io/api// 目录下资源类型的 AddToScheme() 方法注册到 SchemeBuilder 中 var localSchemeBuilder = runtime.SchemeBuilder{......appsv1.AddToScheme,appsv1beta1.AddToScheme,appsv1beta2.AddToScheme,......}var AddToScheme = localSchemeBuilder.AddToSchemefunc init() {v1.AddToGroupVersion(Scheme, schema.GroupVersion{Version: "v1"})// 调用 SchemeBuilder 中各资源对象的 AddToScheme() 方法,将它们注册到到 Scheme 对象utilruntime.Must(AddToScheme(Scheme))}
将各类资源类型注册到全局的 Scheme 对象中,这样 Clientset 就可以识别和使用它们了,那么我们应该如何使用 Clientset 呢?
示例
首先我们来看下如何通过 Clientset 来获取资源对象,我们这里来创建一个 Clientset 对象,然后通过该对象来获取默认命名空间之下的 Deployments 列表,代码如下所示:
package mainimport ("flag""fmt""os""path/filepath"metav1 "k8s.io/apimachinery/pkg/apis/meta/v1""k8s.io/client-go/kubernetes""k8s.io/client-go/rest""k8s.io/client-go/tools/clientcmd")func main() {var err errorvar config *rest.Configvar kubeconfig *stringif home := homeDir(); home != "" {kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")} else {kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")}flag.Parse()// 使用 ServiceAccount 创建集群配置(InCluster模式)if config, err = rest.InClusterConfig(); err != nil {// 使用 KubeConfig 文件创建集群配置if config, err = clientcmd.BuildConfigFromFlags("", *kubeconfig); err != nil {panic(err.Error())}}// 创建 clientsetclientset, err := kubernetes.NewForConfig(config)if err != nil {panic(err.Error())}// 使用 clientsent 获取 Deploymentsdeployments, err := clientset.AppsV1().Deployments("default").List(metav1.ListOptions{})if err != nil {panic(err)}for idx, deploy := range deployments.Items {fmt.Printf("%d -> %s\n", idx+1, deploy.Name)}}func homeDir() string {if h := os.Getenv("HOME"); h != "" {return h}return os.Getenv("USERPROFILE") // windows}
上面的代码运行可以获得 default 命名空间之下的 Deployments:
go run main.go1 -> details-v12 -> el-gitlab-listener3 -> nginx4 -> productpage-v15 -> ratings-v16 -> reviews-v17 -> reviews-v28 -> reviews-v3
这是一个非常典型的访问 Kubernetes 集群资源的方式,通过 client-go 提供的 Clientset 对象来获取资源数据,主要有以下三个步骤:
使用 kubeconfig 文件或者 ServiceAccount(InCluster 模式)来创建访问 Kubernetes API 的 Restful 配置参数,也就是代码中的
rest.Config对象
使用 rest.Config 参数创建 Clientset 对象,这一步非常简单,直接调用
kubernetes.NewForConfig(config)即可初始化
然后是 Clientset 对象的方法去获取各个 Group 下面的对应资源对象进行 CRUD 操作
Clientset 对象
上面我们了解了如何使用 Clientset 对象来获取集群资源,接下来我们来分析下 Clientset 对象的实现。
上面我们使用的 Clientset 实际上是对各种资源类型的 Clientset 的一次封装:
// staging/src/k8s.io/client-go/kubernetes/clientset.go// NewForConfig 使用给定的 config 创建一个新的 Clientsetfunc NewForConfig(c *rest.Config) (*Clientset, error) {configShallowCopy := *cif configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)}var cs Clientsetvar err errorcs.admissionregistrationV1beta1, err = admissionregistrationv1beta1.NewForConfig(&configShallowCopy)if err != nil {return nil, err}// 将其他 Group 和版本的资源的 RESTClient 封装到全局的 Clientset 对象中cs.appsV1, err = appsv1.NewForConfig(&configShallowCopy)if err != nil {return nil, err}......cs.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(&configShallowCopy)if err != nil {return nil, err}return &cs, nil}
上面的 NewForConfig 函数里面就是将其他的各种资源的 RESTClient 封装到了全局的 Clientset 中,这样当我们需要访问某个资源的时候只需要使用 Clientset 里面包装的属性即可,比如 clientset.CoreV1() 就是访问 Core 这个 Group 下面 v1 这个版本的 RESTClient。这些局部的 RESTClient 都定义在 staging/src/k8s.io/client-go/typed/ 文件中,比如 staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/apps_client.go 这个文件中就是定义的 apps 这个 Group 下面的 v1 版本的 RESTClient,这里同样以 Deployment 为例:
// staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/apps_client.go// NewForConfig 根据 rest.Config 创建一个 AppsV1Clientfunc NewForConfig(c *rest.Config) (*AppsV1Client, error) {config := *c// 为 rest.Config 设置资源对象默认的参数if err := setConfigDefaults(&config); err != nil {return nil, err}// 实例化 AppsV1Client 的 RestClientclient, err := rest.RESTClientFor(&config)if err != nil {return nil, err}return &AppsV1Client{client}, nil}func setConfigDefaults(config *rest.Config) error {// 资源对象的 GroupVersiongv := v1.SchemeGroupVersionconfig.GroupVersion = &gv// 资源对象的 root pathconfig.APIPath = "/apis"// 使用注册的资源类型 Scheme 对请求和响应进行编解码,Scheme 就是前文中分析的资源类型的规范config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs}if config.UserAgent == "" {config.UserAgent = rest.DefaultKubernetesUserAgent()}return nil}func (c *AppsV1Client) Deployments(namespace string) DeploymentInterface {return newDeployments(c, namespace)}// staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/deployment.go// deployments 实现了 DeploymentInterface 接口type deployments struct {client rest.Interfacens string}// newDeployments 实例化 deployments 对象func newDeployments(c *AppsV1Client, namespace string) *deployments {return &deployments{client: c.RESTClient(),ns: namespace,}}
通过上面代码我们就可以很清晰的知道可以通过 clientset.AppsV1().Deployments("default")来获取一个 deployments 对象,然后该对象下面定义了 deployments 对象的 CRUD 操作,比如我们调用的 List() 函数:
// staging/src/k8s.io/client-go/kubernetes/typed/apps/v1/deployment.gofunc (c *deployments) List(opts metav1.ListOptions) (result *v1.DeploymentList, err error) {var timeout time.Durationif opts.TimeoutSeconds != nil {timeout = time.Duration(*opts.TimeoutSeconds) * time.Second}result = &v1.DeploymentList{}err = c.client.Get().Namespace(c.ns).Resource("deployments").VersionedParams(&opts, scheme.ParameterCodec).Timeout(timeout).Do().Into(result)return}
从上面代码可以看出最终是通过 c.client 去发起的请求,也就是局部的 restClient 初始化的函数中通过 rest.RESTClientFor(&config) 创建的对象,也就是将 rest.Config 对象转换成一个 Restful 的 Client 对象用于网络操作:
// staging/src/k8s.io/client-go/rest/config.go// RESTClientFor 返回一个满足客户端 Config 对象上的属性的 RESTClient 对象。// 注意在初始化客户端的时候,RESTClient 可能需要一些可选的属性。func RESTClientFor(config *Config) (*RESTClient, error) {if config.GroupVersion == nil {return nil, fmt.Errorf("GroupVersion is required when initializing a RESTClient")}if config.NegotiatedSerializer == nil {return nil, fmt.Errorf("NegotiatedSerializer is required when initializing a RESTClient")}qps := config.QPSif config.QPS == 0.0 {qps = DefaultQPS}burst := config.Burstif config.Burst == 0 {burst = DefaultBurst}baseURL, versionedAPIPath, err := defaultServerUrlFor(config)if err != nil {return nil, err}transport, err := TransportFor(config)if err != nil {return nil, err}// 初始化一个 HTTP Client 对象var httpClient *http.Clientif transport != http.DefaultTransport {httpClient = &http.Client{Transport: transport}if config.Timeout > 0 {httpClient.Timeout = config.Timeout}}return NewRESTClient(baseURL, versionedAPIPath, config.ContentConfig, qps, burst, config.RateLimiter, httpClient)}
到这里我们就知道了 Clientset 是基于 RESTClient 的,RESTClient 是底层的用于网络请求的对象,可以直接通过 RESTClient 提供的 RESTful 方法如 Get()、Put()、Post()、Delete() 等和 APIServer 进行交互:
同时支持 JSON 和 protobuf 两种序列化方式
支持所有原生资源
但实际上除了常用的 CRUD 操作之外,我们还可以进行 Watch 操作,可以监听资源对象的增、删、改、查操作,这样我们就可以根据自己的业务逻辑去处理这些数据了,但是实际上也并不建议这样使用,因为往往由于集群中的资源较多,我们需要自己在客户端去维护一套缓存,而这个维护成本也是非常大的,为此 client-go 也提供了自己的实现机制,那就是 Informers。Informers 是这个事件接口和带索引查找功能的内存缓存的组合,这样也是目前最常用的用法。Informers 第一次被调用的时候会首先在客户端调用 List 来获取全量的对象集合,然后通过 Watch 来获取增量的对象更新缓存,这个我们后续在讲解。
K8S进阶训练营,点击下方图片了解详情

