蛮荆

Kubernetes Deployment 滚动更新实现原理

2023-12-28

概述

一个 Deployment 为 Pod 和 ReplicaSet 提供声明式的更新能力。

生产环境中,通常将无状态应用定义为 Deployment 以便于完成对应的滚动更新、水平扩容、自动回滚等典型业务场景。

Deployment 本质上是对 ReplicaSet 的升级,那么为什么不直接使用 ReplicaSet 呢?因为 ReplicaSet 的功能设计比较底层 (或者也可以说比较基础), ReplicaSet 只是为了维护保证一组 Pod 处于运行状态 (只能指定 Pod 的期望副本数量),无法完成水平扩容等高级特性。

Deployment 拓扑结构

示例

# 官方示例 controllers/nginx-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: nginx-deployment
  labels:
    app: nginx
spec:
  replicas: 3 # 副本数量,可以根据实际情况修改
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers:
      - name: nginx
        image: nginx:1.14.2
        ports:
        - containerPort: 80

在 Kubernetes 中执行上面的代码后,不仅会创建 1 个 Deployment 对象,还会创建 1 个对应的 ReplicaSet 对象和 3 个 Pod 对象。


源码说明

本文着重从源代码的角度分析一下 Deployment 的实现原理,Deployment 功能对应的源代码位于 Kubernetes 项目的 pkg/controller/deployment/ 目录,本文以 Kubernetes v1.28 版本源代码进行分析。

Deployment 源代码目录

流程图

Deployment 控制器执行流程图

下面我们跟着流程图一起看下源代码的具体实现。


DeploymentController

首先来看看 DeploymentController 控制器对象,该对象是实现 Deployment 功能的核心对象。

// DeploymentController 负责将 Deployment 对象对应的 Pod 调整到定义的期望状态
type DeploymentController struct {
	...

	// 回调方法
	// 同时方便在单元测试中注入 Mock
	syncHandler func(ctx context.Context, dKey string) error
	
	// Deployment 列表
	dLister appslisters.DeploymentLister
    // ReplicaSet 列表
	rsLister appslisters.ReplicaSetLister
	// Pod 列表
	podLister corelisters.PodLister

	// 队列中存储发生了变化 (需要同步) 的 Deployment
	queue workqueue.RateLimitingInterface
}

初始化

NewDeploymentController 方法用于 DeploymentController 控制器对象的初始化工作,并返回一个实例化对象。

func `NewDeploymentController`(ctx context.Context, ...) (*DeploymentController, ...) {
    ...
	
	dc := &DeploymentController{
		...
		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "deployment"),
	}
	
	...

	// 增加 Deployment informer 监听回调方法
	dInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        ...
	})
	
	// 增加 ReplicaSet informer 监听回调方法
	rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		...
	})
	
	// 增加 Pod informer 监听回调方法
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		...
	})

	// 注册回调方法
	// 默认为 DeploymentController 对象的 syncDeployment 方法
	// 在单元测试中,也可以通过参数的注入,完成 Mock
	dc.syncHandler = dc.syncDeployment
	
	...
	
	return dc, nil
}

启动控制器

根据控制器的初始化方法 NewDeploymentController 的调用链路,可以找到控制器开始启动和执行的地方。

// cmd/kube-controller-manager/app/apps.go

func startDeploymentController(ctx context.Context, ...) (...) {
	dc, err := deployment.NewDeploymentController(
        ...
	)

	// 启动一个单独的 goroutine 来完成 {初始化 && 运行}
	go dc.Run(ctx, int(controllerContext.ComponentConfig.DeploymentController.ConcurrentDeploymentSyncs))
	
	
	return nil, true, nil
}

根据源文件代码可以看到,StatefulSetControllerReplicaSetController 等控制器的启动方法也在该文件中。

控制器启动-源代码文件

具体逻辑方法

DeploymentController.Run 方法执行具体的初始化逻辑。

func (dc *DeploymentController) Run(ctx context.Context, workers int) {
	...

	// (根据参数配置) 启动多个 goroutine 处理逻辑 (默认为 5 个)
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, dc.worker, time.Second)
	}

	<-ctx.Done()
}

DeploymentController.worker 方法本质上就是一个无限循环轮询器,不断从队列中取出 Deployment 对象,然后进行对应的操作。

func (dc *DeploymentController) worker(ctx context.Context) {
	// 内部调用 processNextWorkItem 方法
	for dc.processNextWorkItem(ctx) {
	}
}

func (dc *DeploymentController) processNextWorkItem(ctx context.Context) bool {
	// 从队列获取 Deployment
	// 获取到的对象可以编码为一个字符串 key
	key, quit := dc.queue.Get()

	// 调用回调方法,默认也就是 syncDeployment 方法
	err := dc.syncHandler(ctx, key.(string))

	...
}

Deployment 同步

DeploymentController 的回调处理方法默认就是 DeploymentController.syncDeployment 方法,也就是说,该方法是所有 Deployment 操作的入口方法。

func (dc *DeploymentController) syncDeployment(ctx context.Context, key string) error {
	// 通过 key 解析出 Deployment 对象对应的 命名空间和名称
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	
	...

	// 获取 Deployment 对象
	deployment, err := dc.dLister.Deployments(namespace).Get(name)
	
    // 深度拷贝一个对象
	// 在方法内部作为局部变量使用
	d := deployment.DeepCopy()
	
	...

	// 获取 Deployment 对应的 ReplicaSet 列表
	rsList, err := dc.getReplicaSetsForDeployment(ctx, d)
	
	// 获取 Deployment 对应的 Pod 列表
	// 返回结果会根据 ReplicaSet 进行分组
	podMap, err := dc.getPodMapForDeployment(d, rsList)

	// Deployment 需要删除
	if d.DeletionTimestamp != nil {
		// 执行删除操作,然后直接返回
		return dc.syncStatusOnly(ctx, d, rsList)
	}

	// 检测 Deployment 是否处于暂停 (Pause) 状态
	if err = dc.checkPausedConditions(ctx, d); err != nil {
		return err
	}
	// 如果 Deployment 处于暂停状态
	// 同步状态后直接返回
	if d.Spec.Paused {
		return dc.sync(ctx, d, rsList)
	}

	// 当前是回滚操作
	//   如果 Deployment 对应的底层 ReplicaSet 使用新的版本更新,那么回滚操作不能重复执行
	//   所以应该确保 Deployment 先清理了历史版本,然后再执行回滚 ues.
	if getRollbackTo(d) != nil {
		return dc.rollback(ctx, d, rsList)
	}

	// 检测 Deployment 是否需要扩容
	scalingEvent, err := dc.isScalingEvent(ctx, d, rsList)
	// 同步状态后直接返回
	if scalingEvent {
		return dc.sync(ctx, d, rsList)
	}

	// Deployment 有两种更新策略
	switch d.Spec.Strategy.Type {
	case apps.RecreateDeploymentStrategyType:
		// 1. 全量更新 (先 kill 所有已存在的 Pod, 然后启动新的 Pod)
		// 这个一般应用场景很少,所以本文直接跳过,不做源代码分析了
		return dc.rolloutRecreate(ctx, d, rsList, podMap)
	case apps.RollingUpdateDeploymentStrategyType:
		// 2. 滚动更新
		return dc.rolloutRolling(ctx, d, rsList)
	}
	
	...
}

通过 DeploymentController.syncDeployment 方法的源代码,我们可以看到: Deployment 每次同步时都会执行如下的操作:

  1. 根据参数 key 获取指定的 Deployment
  2. 获取 Deployment 对应的 ReplicaSet 列表和 Pod 列表
  3. 检测并在必要时删除 Deployment
  4. 检测并在必要时暂停 Deployment
  5. 检测并在必要时回滚 Deployment
  6. 检测并在必要时扩容|缩容 Deployment
  7. 检测并在必要时更新 Deployment (滚动更新|全量更新)

需要注意的是,每次同步时只会有一种操作进行,例如如果执行了扩容操作,那么就不会执行更新操作了,其他类型的操作以此类推。

有了上面的这个大致的逻辑框架,接下来我们逐个分析对应的单个方法实现即可,限于篇幅,本文主要分析 Deployment 扩容|缩容、滚动更新、删除这三个操作的具体实现。


Deployment 扩容|缩容

Deployment 的扩容|缩容操作总共分为 3 步,下面逐个步骤来看看。

1. 检测是否为 扩容|缩容 操作

检测 扩容|缩容 操作主要由 DeploymentController.DeploymentController 方法完成。

func (dc *DeploymentController) isScalingEvent(ctx context.Context, ...) (bool, error) {
	// 获取所有新的 ReplicaSet 列表和旧的 ReplicaSet 列表
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	// 所有 ReplicaSet 集合
	allRSs := append(oldRSs, newRS)

	// 遍历所有拥有 Pod 的 ReplicaSet 列表 (rs.Spec.Replicas > 0)
	for _, rs := range controller.FilterActiveReplicaSets(allRSs) {
		// 计算期望的 Pod 副本数量
		desired, ok := deploymentutil.GetDesiredReplicasAnnotation(logger, rs)
		// 如果期望 Pod 副本数量和定义的副本数量不同,就需要扩容操作
		if desired != *(d.Spec.Replicas) {
			return true, nil
		}
	}
	return false, nil
}

2. 同步 扩容|缩容 操作

func (dc *DeploymentController) sync(ctx context.Context, ...) error {
    // 获取所有新的 ReplicaSet 列表和旧的 ReplicaSet 列表
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)

	// 执行 扩容|缩容 操作
	if err := dc.scale(ctx, d, newRS, oldRSs); err != nil {
		return err
	}
	
	...

    // 所有 ReplicaSet 集合
	allRSs := append(oldRSs, newRS)
	// 更新 ReplicaSet 状态
	return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
}

3. 具体执行 扩容|缩容 操作

为了降低扩容|缩容过程中可能引发的节点性能故障风险,Kubernetes 不会直接 扩容|缩容 到目标的副本数量,因为这样可能引发 “不可用副本数量急剧增加” 现象, 所以会逐步按照比例进行,类似滚动升级的设计理念。

func (dc *DeploymentController) scale(ctx context.Context, deployment *apps.Deployment, newRS *apps.ReplicaSet, oldRSs []*apps.ReplicaSet) error {
    ...

	// 如果 Deployment 配置的升级模式为滚动升级
	// 按照比例伸缩所有 ReplicaSet (包括新的和旧的)
	if deploymentutil.IsRollingUpdate(deployment) {
		allRSs := controller.FilterActiveReplicaSets(append(oldRSs, newRS))
		allRSsReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)

		// 初始化 Pod 最大副本数量
		allowedSize := int32(0)
		if *(deployment.Spec.Replicas) > 0 {
			// Pod 最大副本数量 = Deployment 预期副本数量 + 滚动升级模式 Pod 最大数量
			//     (也就是说可以创建的超出期望的 Pod 数量)
			allowedSize = *(deployment.Spec.Replicas) + deploymentutil.MaxSurge(*deployment)
		}

		// 需要增加伸缩的 Pod 数量
		deploymentReplicasToAdd := allowedSize - allRSsReplicas
		
		var scalingOperation string
		switch {
		case deploymentReplicasToAdd > 0:
			// 如果 Pod 数量大于 0,需要扩容
			// 对 ReplicaSet 进行排序,让新的 ReplicaSet 靠前,扩容时以新的 ReplicaSet 为基准扩容 Pod
			sort.Sort(controller.ReplicaSetsBySizeNewer(allRSs))
			scalingOperation = "up"

		case deploymentReplicasToAdd < 0:
			// 如果 Pod 数量小于 0,需要缩容
			// 对 ReplicaSet 进行排序,让旧的 ReplicaSet 靠前,缩容时先删除旧的 Pod
			sort.Sort(controller.ReplicaSetsBySizeOlder(allRSs))
			scalingOperation = "down"
		}
		
		// deploymentReplicasAdded 表示已经累计 Pod 数量
		// 该值不能超过 deploymentReplicasToAdd
		deploymentReplicasAdded := int32(0)
		
		nameToSize := make(map[string]int32)
		
		// 遍历所有 ReplicaSet
		for i := range allRSs {
			rs := allRSs[i]
			
			if deploymentReplicasToAdd != 0 {
				// 如果 ReplicaSet 需要扩容
				// 粗略估算出 ReplicaSet 需要的目标副本数量
				proportion := deploymentutil.GetProportion(logger, rs, *deployment, deploymentReplicasToAdd, deploymentReplicasAdded)
				// 更新 ReplicaSet 的目标副本数量
				nameToSize[rs.Name] = *(rs.Spec.Replicas) + proportion
				// 更新已经累计的 Pod 数量
				deploymentReplicasAdded += proportion
			} else {
				// 如果 ReplicaSet 不需要扩容
				// 那么 ReplicaSet 的目标副本数量就是其定义的期望副本数量
				nameToSize[rs.Name] = *(rs.Spec.Replicas)
			}
		}

		// 遍历所有 ReplicaSet
		for i := range allRSs {
			rs := allRSs[i]
			
			// 如果需要增加伸缩的 Pod 数量依然不等于 0 
			// 也就意味着仍然有 ReplicaSet 需要伸缩
			// 此时直接将剩余的伸缩数量给到列表中的第一个 (最新的,因为上面排序过了) ReplicaSet 即可
			if i == 0 && deploymentReplicasToAdd != 0 {
				leftover := deploymentReplicasToAdd - deploymentReplicasAdded
				nameToSize[rs.Name] = nameToSize[rs.Name] + leftover
				
				// 缩容最小至 0 
				if nameToSize[rs.Name] < 0 {
					nameToSize[rs.Name] = 0
				}
			}

			// 逐个 ReplicaSet 进行伸缩
			if _, _, err := dc.scaleReplicaSet(ctx, rs, nameToSize[rs.Name], deployment, scalingOperation); err != nil {
				return err
			}
		}
	}
	return nil
}

Deployment 滚动更新

Deployment 的滚动更新操作由 DeploymentController.rolloutRolling 方法完成,内部通过不断对新的 ReplicaSet 进行扩容, 同时对旧的 ReplicaSet 进行缩容,此消彼长间,直到达到预期的状态。

Deployment 滚动更新

func (dc *DeploymentController) rolloutRolling(ctx context.Context, ...) error {
    // 获取所有新的 ReplicaSet 列表和旧的 ReplicaSet 列表
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, true)

    // 所有 ReplicaSet 集合
	allRSs := append(oldRSs, newRS)
	
	// 扩容
	scaledUp, err := dc.reconcileNewReplicaSet(ctx, allRSs, newRS, d)

	if scaledUp {
		// 扩容完成
		// 同步状态后直接返回
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}

	// 如果无法扩容了,说明此时 Pod 的副本数量 >= 目标数量
	// 那么就应该执行缩容操作了
	scaledDown, err := dc.reconcileOldReplicaSets(ctx, allRSs, controller.FilterActiveReplicaSets(oldRSs), newRS, d)

	if scaledDown {
		// 缩容完成
		// 同步状态后直接返回
		return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
	}
	
	// 清理历史 ReplicaSet 对象
	if deploymentutil.DeploymentComplete(d, &d.Status) {
		if err := dc.cleanupDeployment(ctx, oldRSs, d); err != nil {
			return err
		}
	}

	// 滚动升级完毕
	// 同步最终状态
	return dc.syncRolloutStatus(ctx, allRSs, newRS, d)
}

DeploymentController.reconcileNewReplicaSet 方法用于执行新的 ReplicaSet 的扩容操作。

func (dc *DeploymentController) reconcileNewReplicaSet(ctx context.Context, ...) (bool, error) {
	// 如果 ReplicaSet 的副本数量已经达到期望数量
	// 就不需要扩容了,直接返回即可
	if *(newRS.Spec.Replicas) == *(deployment.Spec.Replicas) {
		return false, nil
	}
	// 如果 ReplicaSet 的副本数量大于期望数量
	// 就需要进行缩容操作了
	if *(newRS.Spec.Replicas) > *(deployment.Spec.Replicas) {
		scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, *(deployment.Spec.Replicas), deployment)
		return scaled, err
	}
	
	// 到这里执行的才是扩容操作
	// 计算需要扩容的副本数量
	newReplicasCount, err := deploymentutil.NewRSNewReplicas(deployment, allRSs, newRS)

	// 同步 ReplicaSet 相关字段
	// 同步之后,剩下的操作就交给 ReplicaSet 控制器了
	scaled, _, err := dc.scaleReplicaSetAndRecordEvent(ctx, newRS, newReplicasCount, deployment)
	return scaled, err
}

DeploymentController.reconcileOldReplicaSets 方法用于执行旧的 ReplicaSet 的缩容操作。

func (dc *DeploymentController) reconcileOldReplicaSets(ctx context.Context, ...) (bool, error) {
	// 获取旧的 Pod 数量
	oldPodsCount := deploymentutil.GetReplicaCountForReplicaSets(oldRSs)
	if oldPodsCount == 0 {
		// 如果旧的 Pod 数量为 0,说明就不再需要缩容了
		// 缩容的最小数量为 0
		return false, nil
	}

	// 如果满足以下两个条件中的任意一个,就可以执行缩容操作
	//   1. 旧的 ReplicaSet 中存在不健康的副本,可以直接缩容 (删除) 掉,同时不会增加不可用性
	//   2. 新的 ReplicaSet 比例已经扩大,此时就可以进一步缩容旧的 ReplicaSet 了

	// 最大缩容 Pod 副本数量 = 所有 Pod 数量 - 最大不可用数量 - 新启动的 Pod 不可用数量
	
	// 不仅仅要考虑 最大不可用数量 和 已经创建的 Pod 数量,还要考虑新启动的 Pod 不可用数量
	// 这样新启动的 Pod 不可用数量,就会被计算在内,从而不会导致在下一轮缩容操作中
	//    继续缩减旧的 ReplicaSet 副本,引发不可用性的增高

    // 举个例子来说明缩容场景
	//   * 当前有 10 个副本
	//   * 滚动升级配置中最大不可用数量为 2 个
	//   * 新启动的副本为 3 个
	
	// 对于条件 1 来说:
	//   Deployment 更新之后,旧的 ReplicaSet 缩容到 8 个 Pod,同时新的 ReplicaSet 扩容到 5 个 Pod
	//   假设新的 ReplicaSet 发生故障并且一直不可用
	//   此时 Pod 总数量为 13, 最小可用数量为 8, ReplicaSet 的不可用 Pod 数量为 5
	//   节点故障导致其中一个 ReplicaSet 不可用,但是,13 - 8 - 5 = 0, 所以旧的 ReplicaSet 不能缩容
	//   操作人员 (开发或者运维) 发现故障情况,并且执行回滚操作
	//   新的 ReplicaSet 最大不可用数量为 1, 因此最大可缩容数量 = 13 - 8 - 1 = 4,
	//   最后 Pod 的总数量为 9, 随着时间变化,新的 ReplicaSet 最终会将 Pod 数量扩容到 10
	
	// 对于条件 2 来说:
	//   还是以刚才的场景接着说
	//   ...
	//   操作人员 (开发或者运维) 发现故障情况,但是不执行回滚操作,而是直接更新 Pod 模板
    //   这是新的 ReplicaSet 必须从 0 开始创建新的 Pod 了,因为此时已经有 13 个 Pod 了
	//   但是,新的 ReplicaSet 副本不可用数量同样也是 0, 因此 2 个旧的 ReplicaSet 的副本数量将会缩容 5 个,从 13 个缩减到 8 个
	//   这样新的 新的 ReplicaSet 副本就可以直接扩容到 5 个

	// 获取所有 Pod 数量
	allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
    // 获取 Pod 的最多不可用数量
    maxUnavailable := deploymentutil.MaxUnavailable(*deployment)
    // 计算副本最小可用量
	minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
	// 计算新的 ReplicaSet 副本不可用数量
	newRSUnavailablePodCount := *(newRS.Spec.Replicas) - newRS.Status.AvailableReplicas
	// 计算副本最大缩容数量
	maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
	// 如果最大缩容数量小于等于 0,说明此时不需要缩容
	// 直接退出即可
	if maxScaledDown <= 0 {
		return false, nil
	}

	// 清理不可用的 ReplicaSet
	oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(ctx, ...)

	// 缩容旧的 ReplicaSet, 需要检测最大不可用数量以确保可以正常缩容
	allRSs = append(oldRSs, newRS)
	scaledDownCount, err := dc.scaleDownOldReplicaSetsForRollingUpdate(ctx, ...)
	
	// 本次已缩容的数量 = 清理的数量 + 缩容的数量
	totalScaledDown := cleanupCount + scaledDownCount
	return totalScaledDown > 0, nil
}

Deployment 删除

DeploymentController.syncStatusOnly 方法用于更新 Deployment 的状态,最终真正的删除操作是由 GarbageCollector GC 控制器来完成的。

func (dc *DeploymentController) syncStatusOnly(ctx context.Context, ...) error {
	// 获取所有新的 ReplicaSet 列表和旧的 ReplicaSet 列表
	newRS, oldRSs, err := dc.getAllReplicaSetsAndSyncRevision(ctx, d, rsList, false)
	
	allRSs := append(oldRSs, newRS)
	// 内部调用 syncDeploymentStatus 方法
	return dc.syncDeploymentStatus(ctx, allRSs, newRS, d)
}

DeploymentController.syncDeploymentStatus 方法用于检测 Deployment 的状态是否为最新的,并在必要时进行同步。

func (dc *DeploymentController) syncDeploymentStatus(ctx context.Context, ...) error {
	newStatus := calculateStatus(allRSs, newRS, d)

	newDeployment := d
	newDeployment.Status = newStatus
	_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})
	return err
}

小结

Deployment 控制器执行流程图

扩展阅读

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。