Kubectl源码阅读1

邓胖

共 18499字,需浏览 37分钟

 ·

2023-08-26 21:40

如果不知道怎么用client-go,那就看看kubectl的代码吧,kubectl除了是一个比较优秀的命令行工具之外,还提供了比较好的代码实现,通过看kubectl的代码我们可以抄到很多有用的代码片段。

kubernetes代码版本: v1.20.2

本文主要讲解kubectl增删改查的代码。

快速入门

一般来说kubectl最常用的操作就是对资源的增删改查。

      
      # 创建资源
kubectl create -f test.yaml

# 删除资源
kubectl delete -f test.yaml

# 更新资源, 我们也可以用apply来创建资源
kubectl apply -f test.yaml

# 获取pod列表
kubectl get pods

代码结构

如果使用过cobra,那么对kubectl的代码结构不会陌生,因为kubectl就是使用cobra来组织各个命令的。

当然了,kubernetes/cmd下的所有项目,几乎都是使用的cobra,所以代码入口基本都是一致的。

      
      func main() {
    // 1.
 command := cmd.NewDefaultKubectlCommand()
    // 2.
 if err := command.Execute(); err != nil {
  os.Exit(1)
 }
}

func NewDefaultKubectlCommand() *cobra.Command {
 return NewDefaultKubectlCommandWithArgs(NewDefaultPluginHandler(plugin.ValidPluginFilenamePrefixes), os.Args, os.Stdin, os.Stdout, os.Stderr)
}


func NewDefaultKubectlCommandWithArgs(pluginHandler PluginHandler, args []string, in io.Reader, out, errout io.Writer) *cobra.Command {
 cmd := NewKubectlCommand(in, out, errout)
    // 检查是否是调用插件..., 
}


func NewKubectlCommand(in io.Reader, out, err io.Writer) *cobra.Command {
    // 3.
 cmds := &cobra.Command{
  Use:   "kubectl",
  Short: i18n.T("kubectl controls the Kubernetes cluster manager"),,
  Run: runHelp,
  BashCompletionFunction: bashCompletionFunc,
 }
 // 4.
 flags := cmds.PersistentFlags()
 kubeConfigFlags := genericclioptions.NewConfigFlags(true).WithDeprecatedPasswordFlag()
 kubeConfigFlags.AddFlags(flags)
    // k8s老传统了一层套一层,即使很小的改变也不会直接修改原对象
 matchVersionKubeConfigFlags := cmdutil.NewMatchVersionFlags(kubeConfigFlags)
 matchVersionKubeConfigFlags.AddFlags(cmds.PersistentFlags())
    // 5.
 f := cmdutil.NewFactory(matchVersionKubeConfigFlags)
 ioStreams := genericclioptions.IOStreams{In: in, Out: out, ErrOut: err}
    
    // 6.
 groups := templates.CommandGroups{
  {
   Message: "Basic Commands (Beginner):",
   Commands: []*cobra.Command{
    create.NewCmdCreate(f, ioStreams),
   },
  },
  {
   Message: "Basic Commands (Intermediate):",
   Commands: []*cobra.Command{
    get.NewCmdGet("kubectl", f, ioStreams),
    delete.NewCmdDelete(f, ioStreams),
   },
  },
  {
   Message: "Advanced Commands:",
   Commands: []*cobra.Command{
    apply.NewCmdApply("kubectl", f, ioStreams),
   },
  }
 }
 groups.Add(cmds)
 return cmds
}

代码分解如下:

  1. 初始化命令行
  2. 执行命令
  3. kubectl根命令的配置, 作为所有子命令的父命令
  4. 获取cobra.CommandFlagSet对象, 用于后续在它上面追加各种参数,比如这里的--kubeconfig参数
  5. 将获取kubeconfigConfigFlags再次封装,以及封装标准输出和输入
  6. 将子命令分组并添加到kubectl命令中

如果使用过kubectl或者二次开发过k8s,应该对kubeconfig不默认,一般来说我们总会准备一个kubeconfig来连接集群, kubectl也不例外,它会提供一个--kubeconfig的可选参数用于配置kubeconfig, 默认情况下会读取~/.kube/config

总的来说kubectl的根命令做了两件事情,一是将所有命令组织起来,二是配置全局参数(比如--kubeconfig)。

而这个--kubeconfig参数对应的kubeConfigFlags会负责加载kubeconfig并提供一系列的帮助函数,比如

      
      
        // 查询k8s集群资源列表
        
func (*ConfigFlags).ToDiscoveryClient() (discovery.CachedDiscoveryInterface, error)
// 可以生成client的rest.Config
func (*ConfigFlags).ToRESTConfig() (*rest.Config, error)
// 用于映射gvr到gvk的对象
func (*ConfigFlags).ToRESTMapper() (meta.RESTMapper, error)

有了这三个方法,就可以很方便的跟集群交互了。

如果对上述三个对象还是不太熟悉的同学,可以看看我之前client-go的系列文章: https://youerning.top/tags/client-go

创建

基于上面的代码结构,我们可以很容易的找到create命令对应的代码.

      
      func NewCmdCreate(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
    // 1.
 o := NewCreateOptions(ioStreams)
 cmd := &cobra.Command{
  Use:                   "create -f FILENAME",
  DisableFlagsInUseLine: true,
  Short:                 i18n.T("Create a resource from a file or from stdin."),
  Long:                  createLong,
  Example:               createExample,
        // 2.
  Run: func(cmd *cobra.Command, args []string) {
            // cmdutil.CheckErr的逻辑主要就是判断是否出错并退出。
            // 3.
   cmdutil.CheckErr(o.Complete(f, cmd))
            // 4.
   cmdutil.CheckErr(o.ValidateArgs(cmd, args))
            // 5.
   cmdutil.CheckErr(o.RunCreate(f, cmd))
  },
    }
    
    // 增加子命令和参数等
}

代码分解如下:

  1. 几乎所有kubectl子命令都会构建一个Options对象用于绑定命令行参数
  2. cobra代码运行入口
  3. 补全参数以及构建必要的对象
  4. 验证命令行参数是否合法
  5. 创建资源的代码入口
      
      func (o *CreateOptions) RunCreate(f cmdutil.Factory, cmd *cobra.Command) error {
 // 1.
 schema, err := f.Validator(cmdutil.GetFlagBool(cmd, "validate"))
 cmdNamespace, enforceNamespace, err := f.ToRawKubeConfigLoader().Namespace()

    // 2.
 r := f.NewBuilder().
  Unstructured().
  Schema(schema).
  ContinueOnError().
  NamespaceParam(cmdNamespace).DefaultNamespace().
  FilenameParam(enforceNamespace, &o.FilenameOptions).
  LabelSelectorParam(o.Selector).
  Flatten().
  Do()

 count := 0
    // 3.
 err = r.Visit(func(info *resource.Info, err error) error {
        // 4.
  if err := util.CreateOrUpdateAnnotation(cmdutil.GetFlagBool(cmd, cmdutil.ApplyAnnotationsFlag), info.Object, scheme.DefaultJSONEncoder()); err != nil {
   return cmdutil.AddSourceToErr("creating", info.Source, err)
  }

        // DryRun试运行, 可以在不修改目标对象的情况下验证功能
  if o.DryRunStrategy != cmdutil.DryRunClient {
            // 5.
   obj, err := resource.
    NewHelper(info.Client, info.Mapping).
    DryRun(o.DryRunStrategy == cmdutil.DryRunServer).
    WithFieldManager(o.fieldManager).
    Create(info.Namespace, true, info.Object)
            // 6.
   info.Refresh(obj, true)
  }

  count++
        // 7.
  return o.PrintObj(info.Object)
 })
 return nil
}

代码分解如下:

  1. 获取资源验证器(scheme), 通过集群的/openapi/v2地址获取json schema validator, 可以用来验证输入是否合法
  2. 构建者(builder)的设计模式,将参数通过各个方法传入,最后Do方法用来执行,返回一个访问者vistor设计模式的对象, 它提供Visit方法来访问构造的对象
  3. 通过Visit方法遍历info对象
  4. 更新注释: kubectl.kubernetes.io/last-applied-configuration
  5. 创建对象
  6. 刷新对象,用于后续打印结果
  7. 打印结果

这段代码有两个对象比较重要,Builder, Visitor, 两者分别对应构建者设计模式和访问者设计模式,如果对这两个设计模式不太熟悉的同学,可以先搜索并学习一下,这里就不讲两者的代码思路了。

Builder

      
      r := f.NewBuilder().
  Unstructured().
  Schema(schema).
  ContinueOnError().
  NamespaceParam(cmdNamespace).DefaultNamespace().
  // 除了FilenameParam, 其他方法都是简单的设置一个熟悉
  FilenameParam(enforceNamespace, &o.FilenameOptions).
  LabelSelectorParam(o.Selector).
  Flatten().
  Do()


func (b *Builder) FilenameParam(enforceNamespace bool, filenameOptions *FilenameOptions) *Builder {
 recursive := filenameOptions.Recursive
 paths := filenameOptions.Filenames
 for _, s := range paths {
  switch {
  case s == "-":
   b.Stdin()
  case strings.Index(s, "http://") == 0 || strings.Index(s, "https://") == 0:
   url, err := url.Parse(s)
   b.URL(defaultHttpGetAttempts, url)
  default:
   if !recursive {
    b.singleItemImplied = true
   }
            // 1.
   b.Path(recursive, s)
  }
 }
 return b
}

func (b *Builder) Path(recursive bool, paths ...string) *Builder {
 for _, p := range paths {
        //检查文件是否存在
        
        // 2.
  visitors, err := ExpandPathsToFileVisitors(b.mapper, p, recursive, FileExtensions, b.schema)
  b.paths = append(b.paths, visitors...)
 }
 return b
}

func ExpandPathsToFileVisitors(mapper *mapper, paths string, recursive bool, extensions []string, schema ContentValidator) ([]Visitor, error) {
 var visitors []Visitor
 err := filepath.Walk(paths, func(path string, fi os.FileInfo, err error) error {
        
        // 3.
  visitor := &FileVisitor{
   Path:          path,
   StreamVisitor: NewStreamVisitor(nil, mapper, path, schema),
  }

  visitors = append(visitors, visitor)
  return nil
 })

 return visitors, nil
}

代码分解如下:

  1. 解析传入的文件路径
  2. 将路径包装一下
  3. 最终包装成FileVisitor,该对象在调用Visit方法时会解析并验证传入的yaml文件。

Visitor

由于k8s支持比较多的场景,所以visitor包了一层又一层。

      
      func (b *Builder) Do() *Result {
    // 返回一个Result对象, 它也备包裹了许多层
    r := b.visitorResult()
    if b.flatten {
  r.visitor = NewFlattenListVisitor(r.visitor, b.objectTyper, b.mapper)
 }
 helpers := []VisitorFunc{}
    // 设置namespace,如果没有的话
 if b.defaultNamespace {
  helpers = append(helpers, SetNamespace(b.namespace))
 }
    // 检查namespace是否一致
 if b.requireNamespace {
  helpers = append(helpers, RequireNamespace(b.namespace))
 }
    // 忽略没有namespace作用于的对象,即kubectl api-resources的NAMESPACED字段
 helpers = append(helpers, FilterNamespace)
    // RetrieveLazy当对象是空的时候回去请求对象
 if b.requireObject {
  helpers = append(helpers, RetrieveLazy)
 }
    // 遇到错误是否继续的包装器
 if b.continueOnError {
  r.visitor = NewDecoratedVisitor(ContinueOnErrorVisitor{r.visitor}, helpers...)
 } else {
  r.visitor = NewDecoratedVisitor(r.visitor, helpers...)
 }
}

可以看到visitor被包装了非常多的层,所以如果调用Visit会是这样的一个调用链

      
      sequenceDiagram
DecoratedVisitor ->>+ ContinueOnErrorVisitor:
ContinueOnErrorVisitor ->>+ RetrieveLazy:
RetrieveLazy ->>+ FilterNamespace:
FilterNamespace ->>+ RequireNamespace:
RequireNamespace ->>+ SetNamespace:
SetNamespace ->>+ FlattenListVisitor:
FlattenListVisitor ->>+ EagerVisitorList:
EagerVisitorList ->>+ ExpandPathsToFileVisitors:
ExpandPathsToFileVisitors ->>+ FileVisitor:
FileVisitor ->>+ StreamVisitor:
StreamVisitor -->>- DecoratedVisitor:

aedf63ac1efe5bb7f061a67c106b723a.webp

可以看到,真正干活的StreamVisitor调用路径非常深。

为了简单起见,我们可以直接跳过前面的控制流程的visitor, 那么代码如下:

      
      err = r.Visit(func(info *resource.Info, err error) error {
 // 获取StreamVisitor解析的info对象并创建对应的资源,后文详细说明
}
              
func (v *StreamVisitor) Visit(fn VisitorFunc) error {
 d := yaml.NewYAMLOrJSONDecoder(v.Reader, 4096)
 for {
        // 1.
  ext := runtime.RawExtension{}
  if err := d.Decode(&ext); err != nil {
   if err == io.EOF {
    return nil
   }
   return fmt.Errorf("error parsing %s: %v", v.Source, err)
  }
  ext.Raw = bytes.TrimSpace(ext.Raw)
        // 2.
  if err := ValidateSchema(ext.Raw, v.Schema); err != nil {
   return fmt.Errorf("error validating %q: %v", v.Source, err)
  }
        // 3.
  info, err := v.infoForData(ext.Raw, v.Source)
  if err := fn(info, nil); err != nil {
   return err
  }
 }
}

func (m *mapper) infoForData(data []byte, source string) (*Info, error) {
    // 4.
 obj, gvk, err := m.decoder.Decode(data, nilnil)
 name, _ := metadataAccessor.Name(obj)
 namespace, _ := metadataAccessor.Namespace(obj)
 resourceVersion, _ := metadataAccessor.ResourceVersion(obj)

 ret := &Info{
  Source:          source,
  Namespace:       namespace,
  Name:            name,
  ResourceVersion: resourceVersion,
  Object: obj,
 }

 if m.localFn == nil || !m.localFn() {
        // 5.
  restMapper, err := m.restMapperFn()
  mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
  ret.Mapping = mapping
  client, err := m.clientFn(gvk.GroupVersion())
  ret.Client = client
 }

 return ret, nil
}

代码分解如下:

  1. 尝试解析yaml文件,并读取第一个资源定义对象

    因为yaml文件可以同时定义多个资源, 所以是一个循环来解析用户输入的yaml文件

  2. 验证yaml文件定义的各个字段是否合法

  3. 将yaml文件转成一个info对象

  4. 首先解码成一个runtime.Object, 这里应该是Unstructured

  5. 得到restmapper对象并获取对应的mapping, 即gvr和gvk的映射关系, 还有就是绑定一个client

最后就是创建的核心逻辑

      
      obj, err := resource.
    NewHelper(info.Client, info.Mapping).
    DryRun(o.DryRunStrategy == cmdutil.DryRunServer).
    WithFieldManager(o.fieldManager).
    // 除了Create方法,其他都是构造方法,简单的设置对应的字段值而已
    Create(info.Namespace, true, info.Object)

func (m *Helper) Create(namespace string, modify bool, obj runtime.Object) (runtime.Object, error) {
 return m.CreateWithOptions(namespace, modify, obj, nil)
}

func (m *Helper) CreateWithOptions(namespace string, modify bool, obj runtime.Object, options *metav1.CreateOptions) (runtime.Object, error) {
 if options == nil {
  options = &metav1.CreateOptions{}
 }
 return m.createResource(m.RESTClient, m.Resource, namespace, obj, options)
}

func (m *Helper) createResource(c RESTClient, resource, namespace string, obj runtime.Object, options *metav1.CreateOptions) (runtime.Object, error) {
 return c.Post().
  NamespaceIfScoped(namespace, m.NamespaceScoped).
  Resource(resource).
  VersionedParams(options, metav1.ParameterCodec).
  Body(obj).
  Do(context.TODO()).
  Get()
}

上面的代码还是比较清晰的,最后还是回到RESTClient的构造,如果熟悉client-go的话应该不陌生。至此,整个调用过程就结束了。

由于文章太长了,所以在此截断。


浏览 19
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报