蛮荆

Kubernetes Daemonset 设计与实现

2023-12-18

概述

DaemonSet 确保全部节点或者部分节点 (取决于 Pod 的标签和亲和性、污点等特征) 上始终运行一个指定 Pod。

当有新的节点加入集群后,指定的 Pod 就会在新加入的节点上运行,当节点从集群中移除时,指定的 Pod 同样会被回收。

DaemonSet 的使用方法和最佳实践在 这篇文章 中已经介绍过了,这里不再赘述,本文着重从源代码的角度分析一下 DaemonSet 的实现原理。

示例

# 官方示例 controllers/daemonset.yaml

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluentd-elasticsearch
  namespace: kube-system
  labels:
    k8s-app: fluentd-logging
spec:
  selector:
    matchLabels:
      name: fluentd-elasticsearch
  template:
    metadata:
      labels:
        name: fluentd-elasticsearch
    spec:
      containers:
      - name: fluentd-elasticsearch
        image: k8s.gcr.io/fluentd-elasticsearch:1.20
        volumeMounts:
        - name: varlog
          mountPath: /var/log
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
      tolerations:
      # 容忍度,这里省略,表示在所有节点上创建
      volumes:
      - name: varlog
        hostPath:
          path: /var/log

基于 YAML 文件创建 DaemonSet:

$ kubectl apply -f https://k8s.io/examples/controllers/daemonset.yaml

在 Kubernetes 中执行上面的代码后,会在 kube-system 命名空间下面创建 DaemonSet 资源,并在集群中的所有节点上创建包含 fluentd-elasticsearch 容器的 Pod。

假设集群中有 3 个节点,对应的拓扑结构如下:

DaemonSet 拓扑结构


源码说明

本文着重从源代码的角度分析一下 DaemonSet 的实现原理,DaemonSet 功能对应的源代码位于 Kubernetes 项目的 pkg/controller/daemon/ 目录,本文以 Kubernetes v1.28 版本源代码进行分析。

DaemonSet 源代码目录

流程图

DaemonSet 执行流程图

下面我们跟着流程图一起看下源代码的具体实现。

DaemonSetsController

首先来看看 DaemonSetsController 控制器对象,该对象是实现 DaemonSet 功能的核心对象。

// DaemonSetsController 负责将 DaemonSet 对象对应的 Pod 调整到定义的期望状态
type DaemonSetsController struct {
    // kubelet 客户端对象
    // 用于执行各项类似 "kubectl ..." 操作
	kubeClient clientset.Interface
	
    // Pod 操作对象
	// 用于对 Pod 进行各项操作,例如创建/删除 等
	podControl controller.PodControlInterface

	// 在创建/删除指定数量的副本之后,DaemonSet 将会临时挂起
	// 在监听到指定事件后恢复正常
	burstReplicas int

	// 同步回调方法
	// 同时方便在单元测试中注入 Mock
	syncHandler func(ctx context.Context, dsKey string) error

    // 缓存对象
    // 记录每个 DaemonSet 需要创建/删除的 Pod
    // 每轮同步过程中,对于 创建/删除 操作失败的 Pod 数量,都会记录起来
    //   等到下一轮同步时继续执行相关的操作
    //   直到 Pod 数量副本达到期望状态
	expectations controller.ControllerExpectationsInterface
	
	// DaemonSet 列表
	dsLister appslisters.DaemonSetLister
	dsStoreSynced cache.InformerSynced
	
    // Pod 列表
	podLister corelisters.PodLister
	podStoreSynced cache.InformerSynced
	
	// Node 列表
	nodeLister corelisters.NodeLister
	nodeStoreSynced cache.InformerSynced

	// 队列中存储发生了变化 (需要同步) 的 DaemonSet
	queue workqueue.RateLimitingInterface
}

初始化

NewDaemonSetsController 方法用于 DaemonSetsController 控制器对象的初始化工作,并返回一个实例化对象。

func NewDaemonSetsController(...) (*DaemonSetsController, error) {
	...
	
	dsc := &DaemonSetsController{
		...
	}

	// 增加 DaemonSets informer 监听回调方法
	daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		...
	})
	
	...

    // 增加 Pod informer 监听回调方法
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        ...
	})

	...
	
	// 增加 Node informer 监听回调方法
	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		...
	})
	
	...

	return dsc, nil
}

启动控制器

根据控制器的初始化方法 NewDaemonSetsController 的调用链路,可以找到控制器开始启动和执行的地方。

// cmd/kube-controller-manager/app/apps.go

func startDaemonSetController(ctx context.Context, ...) (...) {
	// 初始化 DaemonSetController 对象实例 
	dsc, err := daemon.NewDaemonSetsController(
        ...
	)

    // 启动一个单独的 goroutine 来完成 {初始化 && 运行}
	go dsc.Run(ctx, int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs))
	
	return nil, true, nil
}

具体逻辑方法

DaemonSetsController.Run 方法执行具体的初始化逻辑。

func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
    ...

	// (根据参数配置) 启动多个 goroutine 处理逻辑 (默认为 2 个)
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, dsc.runWorker, time.Second)
	}

	<-ctx.Done()
}

DaemonSetsController.runWorker 方法本质上就是一个无限循环轮询器,不断从队列中取出 DaemonSet 对象,然后进行对应的操作, 内部无限循环调用 processNextWorkItem 方法。

func (dsc *DaemonSetsController) runWorker(ctx context.Context) {
	for dsc.processNextWorkItem(ctx) {
	}
}

DaemonSetsController.processNextWorkItem 方法的核心操作就是三板斧: 队列取对象 -> 对象同步 -> 对象放回队列。

func (dsc *DaemonSetsController) processNextWorkItem(ctx context.Context) bool {
	// 从队列获取 DaemonSet
	// 获取到的对象可以编码为一个字符串 key
	dsKey, quit := dsc.queue.Get()
	
	...
	
	// 调用回调方法,默认也就是 syncDaemonSet 方法
	err := dsc.syncHandler(ctx, dsKey.(string))
	if err == nil {
		// 如果执行同步的回调方法一切正常
		// 将当前 DaemonSet 踢出队列
		dsc.queue.Forget(dsKey)
		return true
	}

	// 如果执行同步的回调方法返回 error
	// 将当前 DaemonSet 重新放入队列
	dsc.queue.AddRateLimited(dsKey)

	return true
}

DaemonSet 同步

DaemonSetsController 的回调处理方法默认就是 syncDaemonSet 方法,该方法是所有 DaemonSet 同步操作的入口方法。

func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string) error {
	...

	// 通过 key 解析出 DaemonSet 对象对应的 命名空间和名称
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	
    // 获取 DaemonSet 对象
	ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
	
	// 获取 Node 列表
	nodeList, err := dsc.nodeLister.List(labels.Everything())
	
	...

	// 获取 DaemonSet 对象的字符串编码表示
	dsKey, err := controller.KeyFunc(ds)

	// 如果 DaemonSet 对象已经被标记为删除
	// 直接返回
	if ds.DeletionTimestamp != nil {
		return nil
	}

	// 检测 DaemonSet 对象是否需要同步
	if !dsc.expectations.SatisfiedExpectations(logger, dsKey) {
		// 如果不需要同步,只更新状态即可
		return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, false)
	}
	
	// 同步 DaemonSet 对象
	err = dsc.updateDaemonSet(ctx, ds, nodeList, hash, dsKey, old)
	
	// 更新 DaemonSet 对象的状态
	statusErr := dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, true)
	
    ...
}

通过 DaemonSetController.syncDaemonSet 方法的源代码,我们可以看到: DaemonSet 对象每次同步时,都会执行如下的操作:

  1. 根据参数 key 获取指定的 DaemonSet 对象
  2. 获取所有 Node 列表
  3. 检测 DaemonSet 对象是否需要同步
  4. 同步 DaemonSet
  5. 更新 DaemonSet 状态

其中,SatisfiedExpectations 方法在 ReplicaSet 实现原理一文中已经做过分析,本文不再赘述。

有了上面的这个大致的逻辑框架,接下来我们逐个分析对应的单个方法实现即可。


执行同步入口方法

DaemonSetsController.updateDaemonSet 方法是 DaemonSet 对象同步的入口方法,如果 DaemonSet 对象需要同步,就调用该方法执行具体操作。

func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ...) error {
	// 调用 manage 方法执行同步
	err := dsc.manage(ctx, ds, nodeList, hash)
	
	// 如果 DaemonSet 对象需要同步
	if dsc.expectations.SatisfiedExpectations(klog.FromContext(ctx), key) {
		// 执行滚动更新
		switch ds.Spec.UpdateStrategy.Type {
		case apps.OnDeleteDaemonSetStrategyType:
		case apps.RollingUpdateDaemonSetStrategyType:
			err = dsc.rollingUpdate(ctx, ds, nodeList, hash)
		}
	}

	...
}

计算创建/删除 Pod 列表

DaemonSetsController.manage 方法用于将 DaemonSet 对象关联的 Pod 调度到指定的 Node 上面运行,在确定需要运行 Pod 列表和 Node 列表之后, 调用 syncNodes 方法执行具体的同步操作,将应该运行但是尚未运行的 Pod 启动运行,将正在运行但是状态异常 (不需要运行) 的 Pod 删除。

func (dsc *DaemonSetsController) manage(ctx context.Context, ...) error {
	// 获取 Node 和 DaemonSet 关联的 Pod 之间的映射关系
	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)

	// 对于每个 Node 来说
	//   1. 遍历 Pod 列表
	//   2. 如果当前 Pod 不应该运行在该节点上,执行删除
	//   3. 如果当前 Pod 应该运行在该节点上,但是还未运行,那么启动该 Pod
	var nodesNeedingDaemonPods, podsToDelete []string
	for _, node := range nodeList {
		// 计算当前节点需要创建的 Pod 列表和需要删除的 Pod 列表
		nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode := dsc.podsShouldBeOnNode(...)

		// 更新需要创建的 Pod 列表
		nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
		// 更新需要删除的 Pod 列表
		podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
	}
	
	// 删除因为 {分配到不存在的 Node 上面} 而 {导致无法被调度的} Pod
	//   如果 Node 已经不存在了,那么 Pod 将永远不会被调度
	//   但是因为 Pod 和 Node 之间还有附属关系存在,所以无法被 GC 控制器自动回收
	podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)

	// 调用 syncNodes 方法执行同步操作
	if err = dsc.syncNodes(ctx, ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
		return err
	}

	return nil
}

执行同步操作

DaemonSetsController.syncNodes 方法用于执行同步操作,根据参数 DaemonSet 对象创建缺少的 Pod, 同时删除多余的 Pod

func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonSet ...) error {
	...

	createDiff := len(nodesNeedingDaemonPods)
	deleteDiff := len(podsToDelete)

	// 修正单次创建操作最大数量
	if createDiff > dsc.burstReplicas {
		createDiff = dsc.burstReplicas
	}
	// 修正单次删除操作最大数量
	if deleteDiff > dsc.burstReplicas {
		deleteDiff = dsc.burstReplicas
	}

	// 更新 DaemonSet 关联的 Expectations 对象 (创建数量、删除数量)
	dsc.expectations.SetExpectations(logger, dsKey, createDiff, deleteDiff)

	// error channel: 接收 error + 信号量 合二为一
	errCh := make(chan error, createDiff+deleteDiff)
	
	// 并发创建控制 Wg 对象
	createWait := sync.WaitGroup{}
	
	// 获取创建 Pod 模板
	template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
	
	// 开始批量创建 Pod
	//   每次创建成功后,下一轮创建的 Pod 数量以指数级进行增长 (1, 2, 4, 8 ...)
	//   参考了 TCP 的 “慢启动” 方式
	batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
	for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
		errorCount := len(errCh)
		
		// 并发 goroutine 执行
		createWait.Add(batchSize)
		
		for i := pos; i < pos+batchSize; i++ {
			go func(ix int) {
				defer createWait.Done()
				
				podTemplate := template.DeepCopy()
				// 默认调度器将 Pod 和 Node 绑定时会更新 Pod 的亲和性规则
				podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(...)

				// 执行创建 Pod 操作
				err := dsc.podControl.CreatePods(ctx, ...)
				
				if err != nil {
					// 创建 Pod 失败后
					// 更新 DaemonSet 关联的 Expectations 对象中的字段
					dsc.expectations.CreationObserved(logger, dsKey)
					// 将 error 发送到 channel
					errCh <- err
				}
			}(i)
		}
		
		// 等待并发执行结束
		createWait.Wait()
		
		// 创建失败的 Pod 数量 = createDiff - 创建成功的 Pod 数量
		// 创建失败的 Pod 会在下一轮同步中从 Expectations 取出继续创建 
		skippedPods := createDiff - (batchSize + pos)
		
		...
	}

    // 并发删除控制 Wg 对象
	deleteWait := sync.WaitGroup{}
	// 并发 goroutine 执行
	deleteWait.Add(deleteDiff)
	
	for i := 0; i < deleteDiff; i++ {
		go func(ix int) {
			defer deleteWait.Done()
			
			// 执行删除 Pod 操作
			if err := dsc.podControl.DeletePod(ctx, ds.Namespace, podsToDelete[ix], ds); err != nil {
				// 删除 Pod 失败后
				// 更新 DaemonSet 关联的 Expectations 对象中的字段
				dsc.expectations.DeletionObserved(logger, dsKey)
				if !apierrors.IsNotFound(err) {
                    // 将 error 发送到 channel
					errCh <- err
				}
			}
		}(i)
	}
	
	// 等待并发执行结束
	deleteWait.Wait()

	// 将 error 打包返回
	errors := []error{}
	close(errCh)
	for err := range errCh {
		errors = append(errors, err)
	}
	return utilerrors.NewAggregate(errors)
}

创建 Pod

从上面的源代码可以看到,创建 PodRealPodControl.CreatePods 方法完成,该方法内部执行有两个调用链,第一步创建 PodNode 之间的附属关系, 第二步通过 KubeClient 创建 Pod

func (r RealPodControl) CreatePods(ctx context.Context, ...) error {
    return r.CreatePodsWithGenerateName(ctx, ...)
}

第一步

func (r RealPodControl) CreatePodsWithGenerateName(ctx context.Context, ...) error {
	// 创建 Pod 和 Node 的附属关系
	pod, err := GetPodFromTemplate(template, controllerObject, controllerRef)
	
	...
	
	// 创建 Pod
	return r.createPods(ctx, namespace, pod, controllerObject)
}

第二步

func (r RealPodControl) createPods(ctx context.Context, namespace string, pod *v1.Pod, object runtime.Object) error {
    // 通过 KubeClient 创建 Pod
	newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})

	..
}

删除 Pod

删除 Pod 的调用链路相对简单,直接调用 DeletePod 方法删除即可。

func (r RealPodControl) DeletePod(ctx context.Context, ...) error {
	...
	
	// 通过 KubeClient 删除 Pod
	if err := r.KubeClient.CoreV1().Pods(namespace).Delete(ctx, podID, metav1.DeleteOptions{}); err != nil {
        ...
	}
	
	...
}

滚动更新

DaemonSetsController.rollingUpdate 方法用于执行 DaemonSet 对象关联的 Pod 的滚动更新操作,内部流程大致如下:

  1. 获取 DaemonSet, Node, Pod 三者之间的映射关系
  2. 根据 DaemonSet 配置规格信息计算出执行 创建/删除 操作时的几个限制条件参数 (例如 Pod 最大不可用数量)
  3. 根据限制条件计算出需要 创建和删除 的 Pod、Pod 和 Node 之间的附属关系、需要删除的 Node 列表
  4. 调用 syncNodes 方法进行同步

从上面的流程描述中可以看到,rollingUpdate 只是获取并计算对应的状态,最后的执行操作还是交给 syncNodes 方法来完成, 为了提高阅读效率,这里直接将计算规则写在了下面的源代码中。

func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ...) error {
	// 获取 Node 和 DaemonSet 关联的 Pod 之间的映射关系
	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ...)

	// 计算 Pod 的升级中最大数量,最大不可用数量,期望调度数量
	maxSurge, maxUnavailable, desiredNumberScheduled, err := dsc.updatedDesiredNodeCounts(ctx, ...)

	// 如果 Pod 最大数量 maxSurge 未设置
	// 在保证 Pod 最大不可用数量的前提下尽可能删除旧的 Pod
	// 最终使 Pod 数量满足期望状态

	// 这里对 DaemonSet 控制器同步逻辑做如下假设:
	//   不允许每个节点上面超过一个 Pod
	//   自动创建新的 Pod
	//   自动处理操作失败的 Pod
	//   删除 Pod 时,不超过最大不可用数量 maxUnavailable
	// 同时 Pod 满足如下条件:
	//   新创建的 Pod 的不可用数量必须低于 maxUnavailable
	//   如果 Node 上面有运行的旧的 Pod, 则新一轮调度会尽可能不将 Pod 调度在该节点上面
	if maxSurge == 0 {
		...
		
		// 遍历 Pod 列表
		for nodeName, pods := range nodeToDaemonPods {
			// 获取新的 Pod 列表和 旧的 Pod
			newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
			
			...
			
			switch {
			case oldPod == nil && newPod == nil, oldPod != nil && newPod != nil:
				// Pod 不可用
				numUnavailable++
			case newPod != nil:
				if !podutil.IsPodAvailable(newPod, ...) {
					// 虽然 Pod 是新创建的,但是依旧不可用
					numUnavailable++
				}
			default:
				// Pod 是旧的
				switch {
				case !podutil.IsPodAvailable(oldPod, ...):
					// Pod 不可用,需要替换为新的
					allowedReplacementPods = append(allowedReplacementPods, oldPod.Name)
				case numUnavailable >= maxUnavailable:
					// Pod 不可用数量超出限制了
					continue
				default:
                    // Pod 需要被删除
					candidatePodsToDelete = append(candidatePodsToDelete, oldPod.Name)
				}
			}
		}
		
		...
		
		// 执行同步操作,创建/删除 对应数量的 Pod
		// 使 Pod 数量满足期望状态
		return dsc.syncNodes(ctx, ...)
	}

	// ---------------------------------------------------------------------------------------------- //
	
	
	// 如果 Pod 最大数量 maxSurge 已经设置
	// 就将旧的不可用 Pod 删除,然后创建对应数量的新的 Pod
	
	// 这里对 DaemonSet 控制器同步逻辑做如下假设:
	//   不允许每个节点上面超过两个 Pod (最多一个新的 Pod, 一个旧的 Pod)
	//   当节点上面不存在 Pod 时可以创建新的
	//   自动处理操作失败的 Pod
	//   删除 Pod 时,不超过最大不可用数量 maxUnavailable
	// 同时 Pod 满足如下条件:
	//   一个 Node 上面如果有不可用的 Pod, 那么就可以创建新的 Pod
	//   一个 Node 上面如果新的 Pod 已经创建完成并且状态为可用,那么就可以把旧的 Pod 删除了
	
	...
	
	// 遍历 Pod 列表
	for nodeName, pods := range nodeToDaemonPods {
		// 获取新的 Pod 列表和 旧的 Pod
		newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
	    
		...

		switch {
		case oldPod == nil:
			// Node 上面没有 Pod, 直接跳过
		case newPod == nil:
			switch {
			case !podutil.IsPodAvailable(oldPod, ...):
				...
				
				// Node 上面没有任何运行的 Pod
				// 那么该 Node 上面可以直接创建新的 Pod
				allowedNewNodes = append(allowedNewNodes, nodeName)
			default:
				...
				
				// Node 上面有一个运行的旧的 Pod
				// 那么可以将该 Node 加入候选节点
				candidateNewNodes = append(candidateNewNodes, nodeName)
			}
		default:
			...
			
			// Node 上面的 Pod 数量已经超过限制了
			// 那么将该 Node 加入删除列表
			//   也就不会有 Pod 被调度到该 Node 上面了
			oldPodsToDelete = append(oldPodsToDelete, oldPod.Name)
		}
	}
	
	...

    // 执行同步操作,创建/删除 对应数量的 Pod
    // 使 Pod 数量满足期望状态
	return dsc.syncNodes(ctx, ds, oldPodsToDelete, newNodesToCreate, hash)
}

更新状态

DaemonSetsController.updateDaemonSetStatus 方法用于更新 DaemonSet 对象的相关状态字段,这其中包括:

  • 期望被调度的 Node 数量
  • 当前已调度的 Node 数量
  • 正在非正常运行 Pod (已经运行 Pod, 但是不应该运行 Pod) 的 Node 数量
  • 正在正常运行 Pod 的 Node 数量
  • 已经完成滚动升级的 Node 数量
  • 应该运行 Pod 并且可用的 Node 数量
func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ...) error {
	// 获取 Node 和 DaemonSet 关联的 Pod 之间的映射关系
	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
	
	...
	
	// 遍历 Node 列表
	for _, node := range nodeList {
		...
	}
	numberUnavailable := desiredNumberScheduled - numberAvailable

	// 更新 DaemonSet 状态
	err = storeDaemonSetStatus(ctx, ...)

	// 当 Pod 的数量不满足期望状态时
	// 将 DaemonSet 加入队列
	if ds.Spec.MinReadySeconds > 0 && numberReady != numberAvailable {
		dsc.enqueueDaemonSetAfter(ds, time.Duration(ds.Spec.MinReadySeconds)*time.Second)
	}
	return nil
}

小结

DaemonSet 控制器执行流程图

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。