首页 » 技术分享 » kubectl源码分析之drain

kubectl源码分析之drain

 

发布一个k8s部署视频:https://edu.csdn.net/course/detail/26967

课程内容:各种k8s部署方式。包括minikube部署,kubeadm部署,kubeasz部署,rancher部署,k3s部署。包括开发测试环境部署k8s,和生产环境部署k8s。

腾讯课堂链接地址:https://ke.qq.com/course/478827?taid=4373109931462251&tuin=ba64518

第二个视频发布  https://edu.csdn.net/course/detail/27109

腾讯课堂链接地址:https://ke.qq.com/course/484107?tuin=ba64518

介绍主要的k8s资源的使用配置和命令。包括configmap,pod,service,replicaset,namespace,deployment,daemonset,ingress,pv,pvc,sc,role,rolebinding,clusterrole,clusterrolebinding,secret,serviceaccount,statefulset,job,cronjob,podDisruptionBudget,podSecurityPolicy,networkPolicy,resourceQuota,limitrange,endpoint,event,componentstatus,node,apiservice,controllerRevision等。

第三个视频发布:https://edu.csdn.net/course/detail/27574

详细介绍helm命令,学习helm chart语法,编写helm chart。深入分析各项目源码,学习编写helm插件

第四个课程发布:https://edu.csdn.net/course/detail/28488

本课程将详细介绍k8s所有命令,以及命令的go源码分析,学习知其然,知其所以然

 

加qq群,请联系:


————————————————

加微信群请联系:

// DrainCmdOptions holds the state of one `kubectl drain` invocation: how
// results are printed, the namespace from the kubeconfig, the drain helper
// bound to the command-line flags, and the nodes selected for draining.
type DrainCmdOptions struct {
	// PrintFlags configures how each drained node is printed.
	PrintFlags *genericclioptions.PrintFlags
	// ToPrinter builds a printer for an operation name (e.g. "drained");
	// it is installed by Complete.
	ToPrinter  func(string) (printers.ResourcePrinterFunc, error)

	// Namespace is resolved from the kubeconfig loader in Complete.
	Namespace string

	// drainer carries the flag-bound drain settings and performs pod
	// eviction/deletion.
	drainer   *drain.Helper
	// nodeInfos are the nodes resolved (by name or --selector) in Complete.
	nodeInfos []*resource.Info

	genericclioptions.IOStreams
}
// NewDrainCmdOptions returns DrainCmdOptions that write to ioStreams.
// The embedded drain helper starts with GracePeriodSeconds = -1, which the
// --grace-period flag help describes as "use the pod's own value". The
// helper's per-pod callback is routed back to the options'
// onPodDeletedOrEvicted method. f is not used here.
func NewDrainCmdOptions(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *DrainCmdOptions {
	helper := &drain.Helper{
		GracePeriodSeconds: -1,
		Out:                ioStreams.Out,
		ErrOut:             ioStreams.ErrOut,
	}

	opts := &DrainCmdOptions{
		PrintFlags: genericclioptions.NewPrintFlags("drained").WithTypeSetter(scheme.Scheme),
		IOStreams:  ioStreams,
		drainer:    helper,
	}
	helper.OnPodDeletedOrEvicted = opts.onPodDeletedOrEvicted

	return opts
}
// NewCmdDrain builds the `kubectl drain NODE` cobra command.
// Running it calls Complete and then RunDrain on a fresh DrainCmdOptions;
// cmdutil.CheckErr handles the first error. Every drain setting exposed on the
// command line is bound directly to the options' drain helper, using the
// helper's initial values as flag defaults.
func NewCmdDrain(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
	o := NewDrainCmdOptions(f, ioStreams)
	d := o.drainer

	cmd := &cobra.Command{
		Use:                   "drain NODE",
		DisableFlagsInUseLine: true,
		Short:                 i18n.T("Drain node in preparation for maintenance"),
		Long:                  drainLong,
		Example:               drainExample,
		Run: func(c *cobra.Command, args []string) {
			cmdutil.CheckErr(o.Complete(f, c, args))
			cmdutil.CheckErr(o.RunDrain())
		},
	}

	flags := cmd.Flags()

	// Which pods are allowed to be removed.
	flags.BoolVar(&d.Force, "force", d.Force, "Continue even if there are pods not managed by a ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet.")
	flags.BoolVar(&d.IgnoreAllDaemonSets, "ignore-daemonsets", d.IgnoreAllDaemonSets, "Ignore DaemonSet-managed pods.")
	flags.BoolVar(&d.DeleteLocalData, "delete-local-data", d.DeleteLocalData, "Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).")

	// Timing.
	flags.IntVar(&d.GracePeriodSeconds, "grace-period", d.GracePeriodSeconds, "Period of time in seconds given to each pod to terminate gracefully. If negative, the default value specified in the pod will be used.")
	flags.DurationVar(&d.Timeout, "timeout", d.Timeout, "The length of time to wait before giving up, zero means infinite")

	// Which nodes / pods are targeted.
	flags.StringVarP(&d.Selector, "selector", "l", d.Selector, "Selector (label query) to filter on")
	flags.StringVar(&d.PodSelector, "pod-selector", d.PodSelector, "Label selector to filter pods on the node")

	// --dry-run; read back in Complete via cmdutil.GetDryRunFlag.
	cmdutil.AddDryRunFlag(cmd)
	return cmd
}
// Complete validates the command line and finishes configuring the drain
// helper (dry-run mode and Kubernetes client). It also installs o.ToPrinter and
// resolves the target nodes into o.nodeInfos.
//
// Nodes are named either by positional args or with --selector. Giving
// neither, or both, is a usage error. Every resolved object must be a core
// "nodes" resource, otherwise an error is returned.
func (o *DrainCmdOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []string) error {
	var err error

	// Neither node names nor --selector were given.
	if len(args) == 0 && !cmd.Flags().Changed("selector") {
		// Pass cmd.Use as an argument instead of pre-formatting it into the
		// format string, so a '%' in it can never be misinterpreted.
		return cmdutil.UsageErrorf(cmd, "USAGE: %s [flags]", cmd.Use)
	}
	// Node names and --selector are mutually exclusive.
	if len(args) > 0 && len(o.drainer.Selector) > 0 {
		return cmdutil.UsageErrorf(cmd, "error: cannot specify both a node name and a --selector option")
	}

	o.drainer.DryRun = cmdutil.GetDryRunFlag(cmd)

	if o.drainer.Client, err = f.KubernetesClientSet(); err != nil {
		return err
	}

	// Reject a malformed --pod-selector before RunDrain touches any node.
	if len(o.drainer.PodSelector) > 0 {
		if _, err := labels.Parse(o.drainer.PodSelector); err != nil {
			return errors.New("--pod-selector=<pod_selector> must be a valid label selector")
		}
	}

	o.nodeInfos = []*resource.Info{}

	o.Namespace, _, err = f.ToRawKubeConfigLoader().Namespace()
	if err != nil {
		return err
	}

	// ToPrinter returns a printer tagged with operation; in dry-run mode the
	// operation is formatted through the "%s (dry run)" template.
	o.ToPrinter = func(operation string) (printers.ResourcePrinterFunc, error) {
		o.PrintFlags.NamePrintFlags.Operation = operation
		if o.drainer.DryRun {
			// PrintFlags.Complete can fail; propagate rather than drop it.
			if err := o.PrintFlags.Complete("%s (dry run)"); err != nil {
				return nil, err
			}
		}

		printer, err := o.PrintFlags.ToPrinter()
		if err != nil {
			return nil, err
		}

		return printer.PrintObj, nil
	}

	builder := f.NewBuilder().
		WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
		NamespaceParam(o.Namespace).DefaultNamespace().
		ResourceNames("nodes", args...).
		SingleResourceType().
		Flatten()

	if len(o.drainer.Selector) > 0 {
		builder = builder.LabelSelectorParam(o.drainer.Selector).
			ResourceTypes("nodes")
	}

	r := builder.Do()

	if err = r.Err(); err != nil {
		return err
	}

	// Collect every resolved object, insisting that each one is a node.
	return r.Visit(func(info *resource.Info, err error) error {
		if err != nil {
			return err
		}
		if info.Mapping.Resource.GroupResource() != (schema.GroupResource{Group: "", Resource: "nodes"}) {
			return fmt.Errorf("error: expected resource of type node, got %q", info.Mapping.Resource)
		}

		o.nodeInfos = append(o.nodeInfos, info)
		return nil
	})
}
// RunDrain cordons the selected nodes and then drains them one by one.
// In dry-run mode no pods are touched and every node is reported as drained.
// On the first node that fails to drain, it prints the nodes that have not
// been drained (including the failing one) to ErrOut. It then stops and
// returns that node's error.
func (o *DrainCmdOptions) RunDrain() error {
	if err := o.RunCordonOrUncordon(true); err != nil {
		return err
	}

	printObj, err := o.ToPrinter("drained")
	if err != nil {
		return err
	}

	drained := sets.NewString()

	for _, info := range o.nodeInfos {
		var drainErr error
		if !o.drainer.DryRun {
			drainErr = o.deleteOrEvictPodsSimple(info)
		}

		// drainErr is always nil in dry-run mode, so this covers it too.
		if drainErr == nil {
			drained.Insert(info.Name)
			printObj(info.Object, o.Out)
			continue
		}

		fmt.Fprintf(o.ErrOut, "error: unable to drain node %q, aborting command...\n\n", info.Name)

		var pending []string
		for _, other := range o.nodeInfos {
			if !drained.Has(other.Name) {
				pending = append(pending, other.Name)
			}
		}

		if len(pending) > 0 {
			fmt.Fprintf(o.ErrOut, "There are pending nodes to be drained:\n")
			for _, name := range pending {
				fmt.Fprintf(o.ErrOut, " %s\n", name)
			}
		}

		return drainErr
	}

	return nil
}
// deleteOrEvictPodsSimple removes the deletable pods from one node.
// It lists them with the drain helper, prints any warnings to ErrOut, and then
// deletes or evicts them. If removal fails, it makes a best-effort report of the
// pods still pending on the node (plus any error from that re-list) before
// returning the removal error.
func (o *DrainCmdOptions) deleteOrEvictPodsSimple(nodeInfo *resource.Info) error {
	list, errs := o.drainer.GetPodsForDeletion(nodeInfo.Name)
	if errs != nil {
		return utilerrors.NewAggregate(errs)
	}
	if warnings := list.Warnings(); warnings != "" {
		fmt.Fprintf(o.ErrOut, "WARNING: %s\n", warnings)
	}

	err := o.drainer.DeleteOrEvictPods(list.Pods())
	if err == nil {
		return nil
	}

	// Re-list to show what is left. GetPodsForDeletion reports failures through
	// its error slice, and the list is not assumed valid in that case (the first
	// call above also bails out before touching it). So guard against a nil list
	// instead of dereferencing it unconditionally.
	// NOTE(review): confirm nil-on-error against drain.Helper.GetPodsForDeletion.
	pendingList, newErrs := o.drainer.GetPodsForDeletion(nodeInfo.Name)

	fmt.Fprintf(o.ErrOut, "There are pending pods in node %q when an error occurred: %v\n", nodeInfo.Name, err)
	if pendingList != nil {
		for _, pendingPod := range pendingList.Pods() {
			fmt.Fprintf(o.ErrOut, "%s/%s\n", "pod", pendingPod.Name)
		}
	}
	if newErrs != nil {
		// Terminate the line so the caller's next ErrOut message starts cleanly.
		fmt.Fprintf(o.ErrOut, "following errors also occurred:\n%s\n", utilerrors.NewAggregate(newErrs))
	}
	return err
}
// DeleteOrEvictPods removes pods from the cluster. If CheckEvictionSupport
// reports an eviction policy group version, the pods are evicted through that
// API. Otherwise they are deleted directly. An empty pod list is a no-op.
func (d *Helper) DeleteOrEvictPods(pods []corev1.Pod) error {
	if len(pods) == 0 {
		return nil
	}

	evictionGV, err := CheckEvictionSupport(d.Client)
	if err != nil {
		return err
	}

	// TODO(justinsb): unnecessary?
	fetchPod := func(namespace, name string) (*corev1.Pod, error) {
		return d.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
	}

	if evictionGV == "" {
		return d.deletePods(pods, fetchPod)
	}
	return d.evictPods(pods, evictionGV, fetchPod)
}
// evictPods evicts every pod concurrently through the Eviction API at
// policyGroupVersion, then waits for each evicted pod to disappear (polled via
// getPodFn).
//
// Per-pod outcomes:
//   - a pod that is already gone (NotFound) counts as done;
//   - a TooManyRequests response is retried every 5s;
//   - any other eviction error is recorded.
//
// It returns the aggregate of per-pod errors. If not every pod finishes within
// d.Timeout (0 means no limit), it returns a timeout error instead.
//
// Worker goroutines cannot outlive the call indefinitely:
//   - each worker has its own slot in the result channel, so it never blocks
//     on send after this function has stopped receiving;
//   - a worker sleeping between retries exits once this function returns;
//   - each per-pod wait is capped by the same global timeout.
func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
	// 0 timeout means infinite, we use MaxInt64 to represent it.
	globalTimeout := time.Duration(math.MaxInt64)
	if d.Timeout != 0 {
		globalTimeout = d.Timeout
	}
	// Armed before any worker starts, so it never expires later than a
	// worker's own per-pod wait (which uses the same duration).
	globalTimeoutCh := time.After(globalTimeout)

	// Closed on return to release workers sleeping between eviction retries.
	done := make(chan struct{})
	defer close(done)

	// One slot per pod: each worker sends exactly one result and must never
	// block, even after the collector below has returned on timeout.
	returnCh := make(chan error, len(pods))

	for _, pod := range pods {
		go func(pod corev1.Pod) {
			for {
				fmt.Fprintf(d.Out, "evicting pod %q\n", pod.Name)
				err := d.EvictPod(pod, policyGroupVersion)
				if err == nil {
					break
				}
				if apierrors.IsNotFound(err) {
					// Already gone; nothing to wait for.
					returnCh <- nil
					return
				}
				if !apierrors.IsTooManyRequests(err) {
					returnCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err)
					return
				}
				fmt.Fprintf(d.ErrOut, "error when evicting pod %q (will retry after 5s): %v\n", pod.Name, err)
				select {
				case <-time.After(5 * time.Second):
				case <-done:
					// evictPods has returned; no one is collecting results.
					return
				}
			}

			_, err := waitForDelete([]corev1.Pod{pod}, 1*time.Second, globalTimeout, true, getPodFn, d.OnPodDeletedOrEvicted)
			if err != nil {
				returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err)
				return
			}
			returnCh <- nil
		}(pod)
	}

	doneCount := 0
	var errs []error
	for doneCount < len(pods) {
		select {
		case err := <-returnCh:
			doneCount++
			if err != nil {
				errs = append(errs, err)
			}
		case <-globalTimeoutCh:
			return fmt.Errorf("drain did not complete within %v", globalTimeout)
		}
	}
	return utilerrors.NewAggregate(errs)
}
// deletePods deletes pods one at a time, tolerating NotFound. It stops at the
// first other deletion error and returns it. Otherwise it waits, polling via
// getPodFn, for all of the pods to disappear. The wait is bounded by d.Timeout,
// where 0 means no limit.
func (d *Helper) deletePods(pods []corev1.Pod, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
	// 0 timeout means infinite, we use MaxInt64 to represent it.
	limit := time.Duration(math.MaxInt64)
	if d.Timeout != 0 {
		limit = d.Timeout
	}

	for i := range pods {
		if delErr := d.DeletePod(pods[i]); delErr != nil && !apierrors.IsNotFound(delErr) {
			return delErr
		}
	}

	_, waitErr := waitForDelete(pods, 1*time.Second, limit, false, getPodFn, d.OnPodDeletedOrEvicted)
	return waitErr
}

 

 

 

 

 

 

 

 

 

 

转载自原文链接, 如需删除请联系管理员。

原文链接:kubectl源码分析之drain,转载请注明来源!

0