"kubernetes KubeCtl源码分析"

  "kubectl源码分析"

Posted by Xu on August 24, 2018

kubectl源码分析

这一章我们将从kubectl的命令发送到API Server的流程进行分析,我们知道K8S采用如下的架构进行对POD的控制和管理:

kubectlframe

  • kubectl初始化,添加各类命令行cobra.Command接口(Create,Update)
  • 当执行某一具体的命令时,会使用工厂模式,首先Factory创建一个Builder
  • 再对该Builder进行相关设置(相当于设置该车间要生产的产品的生产模式)
  • Builder最后调用Do(),根据Builder中的配置来设置该Builder生产的对象Result,其中包含最重要的成员:visitor(根据Builder的配置信息得到不同的访问者)
  • 结合观察者设计模式,不同的观察者观察不同的对象,有如下几类观察者
    • Resource/Name
    • URL
    • FilePath
    • Selector
    • Info(封装调用信息)
    • 所有的观察者调用visit(fn(info,err))函数都是为了首先配置生成封装了调用信息的Info结构,然后执行函数fn(info,err)。
      type Info struct{
          Client RESTClient//REST风格客户端接口
          Mapping *meta.RESTMapping
          Namespace string
          Name  string
          Source string//比较重要,pod源的数据(json/yaml)
          Object  runtime.Object
          ResourceVersion string
          Export bool
      }
    
  • 调用函数fn(info,err),其实就是通过info.Client中的接口生成request请求体对象,发送给apiserver

源码总结

  1. kubectl 向APIServer发送Request请求
  2. 根据restful风格的网络服务架构设计,kubenete将所有的API服务都视为资源,将访问API的过程抽象成访问者访问资源的过程
  3. 这里使用了两个设计模式:工厂模式,和观察者模式
  4. 工厂模式根据命令信息生成一个访问者visotor
  5. 观察者模式则是根据不同的visitor来访问不同的资源(API)信息,最后通过RESTClient的接口标准合成APIServer访问的请求体并发送到服务端。整个过程可以抽象成如下形式:

kubectl_cmd

kubectl初始化,添加Command

  • 创建了一个工厂对象,用于创建产品,首先生成车间Builder,然后执行DO,生成产品Result
  • 根据产品调用visitor执行调用client接口生成request对象
  • 添加了所有kubectl的命令对象
func NewKubectlCommand(in io.Reader, out, err io.Writer) *cobra.Command {
    // Parent command to which all subcommands are added.
    cmds := &cobra.Command{
        Use:   "kubectl",
        Short: i18n.T("kubectl controls the Kubernetes cluster manager"),
        Long: templates.LongDesc(`
      kubectl controls the Kubernetes cluster manager.

      Find more information at:
            https://kubernetes.io/docs/reference/kubectl/overview/`),
        Run: runHelp,
        BashCompletionFunction: bashCompletionFunc,
    }

    flags := cmds.PersistentFlags()
    flags.SetNormalizeFunc(utilflag.WarnWordSepNormalizeFunc) // Warn for "_" flags

    // Normalize all flags that are coming from other packages or pre-configurations
    // a.k.a. change all "_" to "-". e.g. glog package
    flags.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)

    kubeConfigFlags := genericclioptions.NewConfigFlags()
    kubeConfigFlags.AddFlags(flags)
    matchVersionKubeConfigFlags := cmdutil.NewMatchVersionFlags(kubeConfigFlags)
    matchVersionKubeConfigFlags.AddFlags(cmds.PersistentFlags())

    cmds.PersistentFlags().AddGoFlagSet(flag.CommandLine)

    f := cmdutil.NewFactory(matchVersionKubeConfigFlags)//创建了一个工厂对象,用于创建产品,首先生成车间Builder,然后执行DO,生成产品Result

    // Sending in 'nil' for the getLanguageFn() results in using
    // the LANG environment variable.
    //
    // TODO: Consider adding a flag or file preference for setting
    // the language, instead of just loading from the LANG env. variable.
    i18n.LoadTranslations("kubectl", nil)

    // From this point and forward we get warnings on flags that contain "_" separators
    cmds.SetGlobalNormalizationFunc(utilflag.WarnWordSepNormalizeFunc)

    ioStreams := genericclioptions.IOStreams{In: in, Out: out, ErrOut: err}

    groups := templates.CommandGroups{
        {
            Message: "Basic Commands (Beginner):",
            Commands: []*cobra.Command{
                create.NewCmdCreate(f, ioStreams),
                NewCmdExposeService(f, ioStreams),
                NewCmdRun(f, ioStreams),
                set.NewCmdSet(f, ioStreams),
                deprecatedAlias("run-container", NewCmdRun(f, ioStreams)),
            },
        },
        {
            Message: "Basic Commands (Intermediate):",
            Commands: []*cobra.Command{
                NewCmdExplain("kubectl", f, ioStreams),
                get.NewCmdGet("kubectl", f, ioStreams),
                NewCmdEdit(f, ioStreams),
                NewCmdDelete(f, ioStreams),
            },
        },
        {
            Message: "Deploy Commands:",
            Commands: []*cobra.Command{
                rollout.NewCmdRollout(f, ioStreams),
                NewCmdRollingUpdate(f, ioStreams),
                NewCmdScale(f, ioStreams),
                NewCmdAutoscale(f, ioStreams),
            },
        },
        {
            Message: "Cluster Management Commands:",
            Commands: []*cobra.Command{
                NewCmdCertificate(f, ioStreams),
                NewCmdClusterInfo(f, ioStreams),
                NewCmdTop(f, ioStreams),
                NewCmdCordon(f, ioStreams),
                NewCmdUncordon(f, ioStreams),
                NewCmdDrain(f, ioStreams),
                NewCmdTaint(f, ioStreams),
            },
        },
        {
            Message: "Troubleshooting and Debugging Commands:",
            Commands: []*cobra.Command{
                NewCmdDescribe("kubectl", f, ioStreams),
                NewCmdLogs(f, ioStreams),
                NewCmdAttach(f, ioStreams),
                NewCmdExec(f, ioStreams),
                NewCmdPortForward(f, ioStreams),
                NewCmdProxy(f, ioStreams),
                NewCmdCp(f, ioStreams),
                auth.NewCmdAuth(f, ioStreams),
            },
        },
        {
            Message: "Advanced Commands:",
            Commands: []*cobra.Command{
                NewCmdApply("kubectl", f, ioStreams),
                NewCmdPatch(f, ioStreams),
                NewCmdReplace(f, ioStreams),
                wait.NewCmdWait(f, ioStreams),
                NewCmdConvert(f, ioStreams),
            },
        },
        {
            Message: "Settings Commands:",
            Commands: []*cobra.Command{
                NewCmdLabel(f, ioStreams),
                NewCmdAnnotate("kubectl", f, ioStreams),
                NewCmdCompletion(ioStreams.Out, ""),
            },
        },
    }
    groups.Add(cmds)

    filters := []string{"options"}

    // Hide the "alpha" subcommand if there are no alpha commands in this build.
    alpha := NewCmdAlpha(f, ioStreams)
    if !alpha.HasSubCommands() {
        filters = append(filters, alpha.Name())
    }

    templates.ActsAsRootCommand(cmds, filters, groups...)

    for name, completion := range bash_completion_flags {
        if cmds.Flag(name) != nil {
            if cmds.Flag(name).Annotations == nil {
                cmds.Flag(name).Annotations = map[string][]string{}
            }
            cmds.Flag(name).Annotations[cobra.BashCompCustom] = append(
                cmds.Flag(name).Annotations[cobra.BashCompCustom],
                completion,
            )
        }
    }

    cmds.AddCommand(alpha)
    cmds.AddCommand(cmdconfig.NewCmdConfig(f, clientcmd.NewDefaultPathOptions(), ioStreams))
    cmds.AddCommand(NewCmdPlugin(f, ioStreams))
    cmds.AddCommand(NewCmdVersion(f, ioStreams))
    cmds.AddCommand(NewCmdApiVersions(f, ioStreams))
    cmds.AddCommand(NewCmdApiResources(f, ioStreams))
    cmds.AddCommand(NewCmdOptions(ioStreams.Out))

    return cmds
}       

create命令为例子,观察者模式分析

  • 调用NewCmdCreate命令,来为create命令添加自命令以及运行函数RunCreate
  • 当调用create命令时,具体会调用到RunCreate命令
    • 工厂模式,生成车间Builder,配置车间然后Do(),生成产品Result
    • Result中根据Builder配置信息设置一个重要的成员visitor(有不同类型观察者)
    • Result.Visit()其实就是Result.visitor.Visitor(fn(Info,err))
      • 不同的观察者visior会生成不同调用对象信息结构体Info
      • 然后将Info作为参数传入到fn函数中
      • fn函数会根据Info中的RESTClient来调用具体的接口生成Request请求体对象,然后向Api Server发送Request请求
func (o *CreateOptions) RunCreate(f cmdutil.Factory, cmd *cobra.Command) error {
    // raw only makes sense for a single file resource multiple objects aren't likely to do what you want.
    // the validator enforces this, so
    if len(o.Raw) > 0 {
        return o.raw(f)
    }

    if o.EditBeforeCreate {
        return RunEditOnCreate(f, o.PrintFlags, o.RecordFlags, o.IOStreams, cmd, &o.FilenameOptions)
    }
    schema, err := f.Validator(cmdutil.GetFlagBool(cmd, "validate"))
    if err != nil {
        return err
    }

    cmdNamespace, enforceNamespace, err := f.ToRawKubeConfigLoader().Namespace()
    if err != nil {
        return err
    }

    r := f.NewBuilder().
        Unstructured().
        Schema(schema).
        ContinueOnError().
        NamespaceParam(cmdNamespace).DefaultNamespace().
        FilenameParam(enforceNamespace, &o.FilenameOptions).
        LabelSelectorParam(o.Selector).
        Flatten().
        Do()//工厂模式,生成车间,中间部分的操作均为配置车间,然后Do(),生成产品Result
    err = r.Err()
    if err != nil {
        return err
    }

    count := 0
    err = r.Visit(func(info *resource.Info, err error) error {//观察者生成Info,调用func回调函数
        if err != nil {
            return err
        }
        if err := kubectl.CreateOrUpdateAnnotation(cmdutil.GetFlagBool(cmd, cmdutil.ApplyAnnotationsFlag), info.Object, cmdutil.InternalVersionJSONEncoder()); err != nil {
            return cmdutil.AddSourceToErr("creating", info.Source, err)
        }

        if err := o.Recorder.Record(info.Object); err != nil {
            glog.V(4).Infof("error recording current command: %v", err)
        }

        if !o.DryRun {//这里的DryRun代表只创建不实际运行
            //这里实现根据info中的RESTClient接口来生成对应的Request请求体来请求API Server
            if err := createAndRefresh(info); err != nil {
                return cmdutil.AddSourceToErr("creating", info.Source, err)
            }
        }

        count++

        return o.PrintObj(info.Object)
    })
    if err != nil {
        return err
    }
    if count == 0 {
        return fmt.Errorf("no objects passed to create")
    }
    return nil
}

create命令生成Request请求体的过程

这里只是简单的介绍create的生成Request流程,但对RESTClient的生成过程,及具体Request请求体到通过httpClient的方式发送的部分没有进行深入的分析。

  • createAndRefresh首先生成一个调用辅助器Helper

      type Helper struct {
      // The name of this resource as the server would recognize it
      Resource string
      // A RESTClient capable of mutating this resource.
      RESTClient RESTClient
      // True if the resource type is scoped to namespaces
      NamespaceScoped bool
      }
    
  • Helper调用Create函数

func createAndRefresh(info *resource.Info) error {
    obj, err := resource.NewHelper(info.Client, info.Mapping).Create(info.Namespace, true, info.Object)
    if err != nil {
        return err
    }
    info.Refresh(obj, true)
    return nil
}
  • Create函数调用createResource创建资源
func (m *Helper) Create(namespace string, modify bool, obj runtime.Object) (runtime.Object, error) {
    if modify {
        // Attempt to version the object based on client logic.
        version, err := metadataAccessor.ResourceVersion(obj)
        if err != nil {
            // We don't know how to clear the version on this object, so send it to the server as is
            return m.createResource(m.RESTClient, m.Resource, namespace, obj)
        }
        if version != "" {
            if err := metadataAccessor.SetResourceVersion(obj, ""); err != nil {
                return nil, err
            }
        }
    }

    return m.createResource(m.RESTClient, m.Resource, namespace, obj)
}
  • createResource通过RESTClient来根据资源配置信息生成Request请求并发送到API Server
    • Post()生成一个Post请求体Request初始化结构
    • Resource()设置想要访问的资源
    • Body()合成请求体的body中的内容
    • Do()执行具体的请求,并发送请求返回一个简单的响应Result表示是否请求成功
    • Get()获取响应的状态等信息
func (m *Helper) createResource(c RESTClient, resource, namespace string, obj runtime.Object) (runtime.Object, error) {
    return c.Post().NamespaceIfScoped(namespace, m.NamespaceScoped).Resource(resource).Body(obj).Do().Get()//Post()生成一个Post请求体Request初始化结构,Resource()设置想要访问的资源Body()合成请求体的body中的内容,Do()执行具体的请求,并发送请求返回一个简单的响应Result表示是否请求成功,Get()获取响应的状态等信息
}

//Request结构

type Request struct {
    // required
    client HTTPClient
    verb   string

    baseURL     *url.URL
    content     ContentConfig
    serializers Serializers

    // generic components accessible via method setters
    pathPrefix string
    subpath    string
    params     url.Values
    headers    http.Header

    // structural elements of the request that are part of the Kubernetes API conventions
    namespace    string
    namespaceSet bool
    resource     string
    resourceName string
    subresource  string
    timeout      time.Duration

    // output
    err  error
    body io.Reader

    // This is only used for per-request timeouts, deadlines, and cancellations.
    ctx context.Context

    backoffMgr BackoffManager
    throttle   flowcontrol.RateLimiter
}