Kubernetes Daemonset 设计与实现
2023-12-18 Cloud Native Kubernetes 读代码
概述
DaemonSet 确保全部节点或者部分节点 (取决于 Pod 的标签和亲和性、污点等特征) 上始终运行一个指定 Pod。
当有新的节点加入集群后,指定的 Pod 就会在新加入的节点上运行,当节点从集群中移除时,指定的 Pod 同样会被回收。
DaemonSet 的使用方法和最佳实践在 这篇文章 中已经介绍过了,这里不再赘述,本文着重从源代码的角度分析一下 DaemonSet 的实现原理。
示例
# 官方示例 controllers/daemonset.yaml
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: fluentd-elasticsearch
namespace: kube-system
labels:
k8s-app: fluentd-logging
spec:
selector:
matchLabels:
name: fluentd-elasticsearch
template:
metadata:
labels:
name: fluentd-elasticsearch
spec:
containers:
- name: fluentd-elasticsearch
image: k8s.gcr.io/fluentd-elasticsearch:1.20
volumeMounts:
- name: varlog
mountPath: /var/log
- name: varlibdockercontainers
mountPath: /var/lib/docker/containers
readOnly: true
tolerations:
# 容忍度,这里省略,表示在所有节点上创建
volumes:
- name: varlog
hostPath:
path: /var/log
基于 YAML 文件创建 DaemonSet:
$ kubectl apply -f https://k8s.io/examples/controllers/daemonset.yaml
在 Kubernetes 中执行上面的代码后,会在 kube-system
命名空间下面创建 DaemonSet
资源,并在集群中的所有节点上创建包含 fluentd-elasticsearch
容器的 Pod。
假设集群中有 3 个节点,对应的拓扑结构如下:
源码说明
本文着重从源代码的角度分析一下 DaemonSet 的实现原理,DaemonSet 功能对应的源代码位于 Kubernetes 项目的 pkg/controller/daemon/
目录,本文以 Kubernetes v1.28
版本源代码进行分析。
流程图
下面我们跟着流程图一起看下源代码的具体实现。
DaemonSetsController
首先来看看 DaemonSetsController
控制器对象,该对象是实现 DaemonSet
功能的核心对象。
// DaemonSetsController 负责将 DaemonSet 对象对应的 Pod 调整到定义的期望状态
type DaemonSetsController struct {
// kubelet 客户端对象
// 用于执行各项类似 "kubectl ..." 操作
kubeClient clientset.Interface
// Pod 操作对象
// 用于对 Pod 进行各项操作,例如创建/删除 等
podControl controller.PodControlInterface
// 在创建/删除指定数量的副本之后,DaemonSet 将会临时挂起
// 在监听到指定事件后恢复正常
burstReplicas int
// 同步回调方法
// 同时方便在单元测试中注入 Mock
syncHandler func(ctx context.Context, dsKey string) error
// 缓存对象
// 记录每个 DaemonSet 需要创建/删除的 Pod
// 每轮同步过程中,对于 创建/删除 操作失败的 Pod 数量,都会记录起来
// 等到下一轮同步时继续执行相关的操作
// 直到 Pod 数量副本达到期望状态
expectations controller.ControllerExpectationsInterface
// DaemonSet 列表
dsLister appslisters.DaemonSetLister
dsStoreSynced cache.InformerSynced
// Pod 列表
podLister corelisters.PodLister
podStoreSynced cache.InformerSynced
// Node 列表
nodeLister corelisters.NodeLister
nodeStoreSynced cache.InformerSynced
// 队列中存储发生了变化 (需要同步) 的 DaemonSet
queue workqueue.RateLimitingInterface
}
初始化
NewDaemonSetsController
方法用于 DaemonSetsController
控制器对象的初始化工作,并返回一个实例化对象。
func NewDaemonSetsController(...) (*DaemonSetsController, error) {
...
dsc := &DaemonSetsController{
...
}
// 增加 DaemonSets informer 监听回调方法
daemonSetInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
...
})
...
// 增加 Pod informer 监听回调方法
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
...
})
...
// 增加 Node informer 监听回调方法
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
...
})
...
return dsc, nil
}
启动控制器
根据控制器的初始化方法 NewDaemonSetsController
的调用链路,可以找到控制器开始启动和执行的地方。
// cmd/kube-controller-manager/app/apps.go
func startDaemonSetController(ctx context.Context, ...) (...) {
// 初始化 DaemonSetController 对象实例
dsc, err := daemon.NewDaemonSetsController(
...
)
// 启动一个单独的 goroutine 来完成 {初始化 && 运行}
go dsc.Run(ctx, int(controllerContext.ComponentConfig.DaemonSetController.ConcurrentDaemonSetSyncs))
return nil, true, nil
}
具体逻辑方法
DaemonSetsController.Run
方法执行具体的初始化逻辑。
func (dsc *DaemonSetsController) Run(ctx context.Context, workers int) {
...
// (根据参数配置) 启动多个 goroutine 处理逻辑 (默认为 2 个)
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, dsc.runWorker, time.Second)
}
<-ctx.Done()
}
DaemonSetsController.runWorker
方法本质上就是一个无限循环轮询器,不断从队列中取出 DaemonSet
对象,然后进行对应的操作,
内部无限循环调用 processNextWorkItem
方法。
func (dsc *DaemonSetsController) runWorker(ctx context.Context) {
for dsc.processNextWorkItem(ctx) {
}
}
DaemonSetsController.processNextWorkItem
方法的核心操作就是三板斧: 队列取对象 -> 对象同步 -> 对象放回队列。
func (dsc *DaemonSetsController) processNextWorkItem(ctx context.Context) bool {
// 从队列获取 DaemonSet
// 获取到的对象可以编码为一个字符串 key
dsKey, quit := dsc.queue.Get()
...
// 调用回调方法,默认也就是 syncDaemonSet 方法
err := dsc.syncHandler(ctx, dsKey.(string))
if err == nil {
// 如果执行同步的回调方法一切正常
// 将当前 DaemonSet 踢出队列
dsc.queue.Forget(dsKey)
return true
}
// 如果执行同步的回调方法返回 error
// 将当前 DaemonSet 重新放入队列
dsc.queue.AddRateLimited(dsKey)
return true
}
DaemonSet 同步
DaemonSetsController
的回调处理方法默认就是 syncDaemonSet
方法,该方法是所有 DaemonSet
同步操作的入口方法。
func (dsc *DaemonSetsController) syncDaemonSet(ctx context.Context, key string) error {
...
// 通过 key 解析出 DaemonSet 对象对应的 命名空间和名称
namespace, name, err := cache.SplitMetaNamespaceKey(key)
// 获取 DaemonSet 对象
ds, err := dsc.dsLister.DaemonSets(namespace).Get(name)
// 获取 Node 列表
nodeList, err := dsc.nodeLister.List(labels.Everything())
...
// 获取 DaemonSet 对象的字符串编码表示
dsKey, err := controller.KeyFunc(ds)
// 如果 DaemonSet 对象已经被标记为删除
// 直接返回
if ds.DeletionTimestamp != nil {
return nil
}
// 检测 DaemonSet 对象是否需要同步
if !dsc.expectations.SatisfiedExpectations(logger, dsKey) {
// 如果不需要同步,只更新状态即可
return dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, false)
}
// 同步 DaemonSet 对象
err = dsc.updateDaemonSet(ctx, ds, nodeList, hash, dsKey, old)
// 更新 DaemonSet 对象的状态
statusErr := dsc.updateDaemonSetStatus(ctx, ds, nodeList, hash, true)
...
}
通过 DaemonSetController.syncDaemonSet
方法的源代码,我们可以看到: DaemonSet 对象每次同步时,都会执行如下的操作:
- 根据参数 key 获取指定的 DaemonSet 对象
- 获取所有 Node 列表
- 检测 DaemonSet 对象是否需要同步
- 同步 DaemonSet
- 更新 DaemonSet 状态
其中,SatisfiedExpectations
方法在 ReplicaSet
实现原理一文中已经做过分析,本文不再赘述。
有了上面的这个大致的逻辑框架,接下来我们逐个分析对应的单个方法实现即可。
执行同步入口方法
DaemonSetsController.updateDaemonSet
方法是 DaemonSet
对象同步的入口方法,如果 DaemonSet
对象需要同步,就调用该方法执行具体操作。
func (dsc *DaemonSetsController) updateDaemonSet(ctx context.Context, ...) error {
// 调用 manage 方法执行同步
err := dsc.manage(ctx, ds, nodeList, hash)
// 如果 DaemonSet 对象需要同步
if dsc.expectations.SatisfiedExpectations(klog.FromContext(ctx), key) {
// 执行滚动更新
switch ds.Spec.UpdateStrategy.Type {
case apps.OnDeleteDaemonSetStrategyType:
case apps.RollingUpdateDaemonSetStrategyType:
err = dsc.rollingUpdate(ctx, ds, nodeList, hash)
}
}
...
}
计算创建/删除 Pod 列表
DaemonSetsController.manage
方法用于将 DaemonSet
对象关联的 Pod
调度到指定的 Node
上面运行,在确定需要运行 Pod
列表和 Node
列表之后,
调用 syncNodes
方法执行具体的同步操作,将应该运行但是尚未运行的 Pod 启动运行,将正在运行但是状态异常 (不需要运行) 的 Pod 删除。
func (dsc *DaemonSetsController) manage(ctx context.Context, ...) error {
// 获取 Node 和 DaemonSet 关联的 Pod 之间的映射关系
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
// 对于每个 Node 来说
// 1. 遍历 Pod 列表
// 2. 如果当前 Pod 不应该运行在该节点上,执行删除
// 3. 如果当前 Pod 应该运行在该节点上,但是还未运行,那么启动该 Pod
var nodesNeedingDaemonPods, podsToDelete []string
for _, node := range nodeList {
// 计算当前节点需要创建的 Pod 列表和需要删除的 Pod 列表
nodesNeedingDaemonPodsOnNode, podsToDeleteOnNode := dsc.podsShouldBeOnNode(...)
// 更新需要创建的 Pod 列表
nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodesNeedingDaemonPodsOnNode...)
// 更新需要删除的 Pod 列表
podsToDelete = append(podsToDelete, podsToDeleteOnNode...)
}
// 删除因为 {分配到不存在的 Node 上面} 而 {导致无法被调度的} Pod
// 如果 Node 已经不存在了,那么 Pod 将永远不会被调度
// 但是因为 Pod 和 Node 之间还有附属关系存在,所以无法被 GC 控制器自动回收
podsToDelete = append(podsToDelete, getUnscheduledPodsWithoutNode(nodeList, nodeToDaemonPods)...)
// 调用 syncNodes 方法执行同步操作
if err = dsc.syncNodes(ctx, ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
return err
}
return nil
}
执行同步操作
DaemonSetsController.syncNodes
方法用于执行同步操作,根据参数 DaemonSet
对象创建缺少的 Pod
, 同时删除多余的 Pod
。
func (dsc *DaemonSetsController) syncNodes(ctx context.Context, ds *apps.DaemonSet ...) error {
...
createDiff := len(nodesNeedingDaemonPods)
deleteDiff := len(podsToDelete)
// 修正单次创建操作最大数量
if createDiff > dsc.burstReplicas {
createDiff = dsc.burstReplicas
}
// 修正单次删除操作最大数量
if deleteDiff > dsc.burstReplicas {
deleteDiff = dsc.burstReplicas
}
// 更新 DaemonSet 关联的 Expectations 对象 (创建数量、删除数量)
dsc.expectations.SetExpectations(logger, dsKey, createDiff, deleteDiff)
// error channel: 接收 error + 信号量 合二为一
errCh := make(chan error, createDiff+deleteDiff)
// 并发创建控制 Wg 对象
createWait := sync.WaitGroup{}
// 获取创建 Pod 模板
template := util.CreatePodTemplate(ds.Spec.Template, generation, hash)
// 开始批量创建 Pod
// 每次创建成功后,下一轮创建的 Pod 数量以指数级进行增长 (1, 2, 4, 8 ...)
// 参考了 TCP 的 “慢启动” 方式
batchSize := integer.IntMin(createDiff, controller.SlowStartInitialBatchSize)
for pos := 0; createDiff > pos; batchSize, pos = integer.IntMin(2*batchSize, createDiff-(pos+batchSize)), pos+batchSize {
errorCount := len(errCh)
// 并发 goroutine 执行
createWait.Add(batchSize)
for i := pos; i < pos+batchSize; i++ {
go func(ix int) {
defer createWait.Done()
podTemplate := template.DeepCopy()
// 默认调度器将 Pod 和 Node 绑定时会更新 Pod 的亲和性规则
podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(...)
// 执行创建 Pod 操作
err := dsc.podControl.CreatePods(ctx, ...)
if err != nil {
// 创建 Pod 失败后
// 更新 DaemonSet 关联的 Expectations 对象中的字段
dsc.expectations.CreationObserved(logger, dsKey)
// 将 error 发送到 channel
errCh <- err
}
}(i)
}
// 等待并发执行结束
createWait.Wait()
// 创建失败的 Pod 数量 = createDiff - 创建成功的 Pod 数量
// 创建失败的 Pod 会在下一轮同步中从 Expectations 取出继续创建
skippedPods := createDiff - (batchSize + pos)
...
}
// 并发删除控制 Wg 对象
deleteWait := sync.WaitGroup{}
// 并发 goroutine 执行
deleteWait.Add(deleteDiff)
for i := 0; i < deleteDiff; i++ {
go func(ix int) {
defer deleteWait.Done()
// 执行删除 Pod 操作
if err := dsc.podControl.DeletePod(ctx, ds.Namespace, podsToDelete[ix], ds); err != nil {
// 删除 Pod 失败后
// 更新 DaemonSet 关联的 Expectations 对象中的字段
dsc.expectations.DeletionObserved(logger, dsKey)
if !apierrors.IsNotFound(err) {
// 将 error 发送到 channel
errCh <- err
}
}
}(i)
}
// 等待并发执行结束
deleteWait.Wait()
// 将 error 打包返回
errors := []error{}
close(errCh)
for err := range errCh {
errors = append(errors, err)
}
return utilerrors.NewAggregate(errors)
}
创建 Pod
从上面的源代码可以看到,创建 Pod
由 RealPodControl.CreatePods
方法完成,该方法内部执行有两个调用链,第一步创建 Pod
和 Node
之间的附属关系,
第二步通过 KubeClient
创建 Pod
。
func (r RealPodControl) CreatePods(ctx context.Context, ...) error {
return r.CreatePodsWithGenerateName(ctx, ...)
}
第一步
func (r RealPodControl) CreatePodsWithGenerateName(ctx context.Context, ...) error {
// 创建 Pod 和 Node 的附属关系
pod, err := GetPodFromTemplate(template, controllerObject, controllerRef)
...
// 创建 Pod
return r.createPods(ctx, namespace, pod, controllerObject)
}
第二步
func (r RealPodControl) createPods(ctx context.Context, namespace string, pod *v1.Pod, object runtime.Object) error {
// 通过 KubeClient 创建 Pod
newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
..
}
删除 Pod
删除 Pod 的调用链路相对简单,直接调用 DeletePod
方法删除即可。
func (r RealPodControl) DeletePod(ctx context.Context, ...) error {
...
// 通过 KubeClient 删除 Pod
if err := r.KubeClient.CoreV1().Pods(namespace).Delete(ctx, podID, metav1.DeleteOptions{}); err != nil {
...
}
...
}
滚动更新
DaemonSetsController.rollingUpdate
方法用于执行 DaemonSet
对象关联的 Pod
的滚动更新操作,内部流程大致如下:
- 获取 DaemonSet, Node, Pod 三者之间的映射关系
- 根据 DaemonSet 配置规格信息计算出执行 创建/删除 操作时的几个限制条件参数 (例如 Pod 最大不可用数量)
- 根据限制条件计算出需要 创建和删除 的 Pod、Pod 和 Node 之间的附属关系、需要删除的 Node 列表
- 调用 syncNodes 方法进行同步
从上面的流程描述中可以看到,rollingUpdate
只是获取并计算对应的状态,最后的执行操作还是交给 syncNodes
方法来完成,
为了提高阅读效率,这里直接将计算规则写在了下面的源代码中。
func (dsc *DaemonSetsController) rollingUpdate(ctx context.Context, ...) error {
// 获取 Node 和 DaemonSet 关联的 Pod 之间的映射关系
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ...)
// 计算 Pod 的升级中最大数量,最大不可用数量,期望调度数量
maxSurge, maxUnavailable, desiredNumberScheduled, err := dsc.updatedDesiredNodeCounts(ctx, ...)
// 如果 Pod 最大数量 maxSurge 未设置
// 在保证 Pod 最大不可用数量的前提下尽可能删除旧的 Pod
// 最终使 Pod 数量满足期望状态
// 这里对 DaemonSet 控制器同步逻辑做如下假设:
// 不允许每个节点上面超过一个 Pod
// 自动创建新的 Pod
// 自动处理操作失败的 Pod
// 删除 Pod 时,不超过最大不可用数量 maxUnavailable
// 同时 Pod 满足如下条件:
// 新创建的 Pod 的不可用数量必须低于 maxUnavailable
// 如果 Node 上面有运行的旧的 Pod, 则新一轮调度会尽可能不将 Pod 调度在该节点上面
if maxSurge == 0 {
...
// 遍历 Pod 列表
for nodeName, pods := range nodeToDaemonPods {
// 获取新的 Pod 列表和 旧的 Pod
newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
...
switch {
case oldPod == nil && newPod == nil, oldPod != nil && newPod != nil:
// Pod 不可用
numUnavailable++
case newPod != nil:
if !podutil.IsPodAvailable(newPod, ...) {
// 虽然 Pod 是新创建的,但是依旧不可用
numUnavailable++
}
default:
// Pod 是旧的
switch {
case !podutil.IsPodAvailable(oldPod, ...):
// Pod 不可用,需要替换为新的
allowedReplacementPods = append(allowedReplacementPods, oldPod.Name)
case numUnavailable >= maxUnavailable:
// Pod 不可用数量超出限制了
continue
default:
// Pod 需要被删除
candidatePodsToDelete = append(candidatePodsToDelete, oldPod.Name)
}
}
}
...
// 执行同步操作,创建/删除 对应数量的 Pod
// 使 Pod 数量满足期望状态
return dsc.syncNodes(ctx, ...)
}
// ---------------------------------------------------------------------------------------------- //
// 如果 Pod 最大数量 maxSurge 已经设置
// 就将旧的不可用 Pod 删除,然后创建对应数量的新的 Pod
// 这里对 DaemonSet 控制器同步逻辑做如下假设:
// 不允许每个节点上面超过两个 Pod (最多一个新的 Pod, 一个旧的 Pod)
// 当节点上面不存在 Pod 时可以创建新的
// 自动处理操作失败的 Pod
// 删除 Pod 时,不超过最大不可用数量 maxUnavailable
// 同时 Pod 满足如下条件:
// 一个 Node 上面如果有不可用的 Pod, 那么就可以创建新的 Pod
// 一个 Node 上面如果新的 Pod 已经创建完成并且状态为可用,那么就可以把旧的 Pod 删除了
...
// 遍历 Pod 列表
for nodeName, pods := range nodeToDaemonPods {
// 获取新的 Pod 列表和 旧的 Pod
newPod, oldPod, ok := findUpdatedPodsOnNode(ds, pods, hash)
...
switch {
case oldPod == nil:
// Node 上面没有 Pod, 直接跳过
case newPod == nil:
switch {
case !podutil.IsPodAvailable(oldPod, ...):
...
// Node 上面没有任何运行的 Pod
// 那么该 Node 上面可以直接创建新的 Pod
allowedNewNodes = append(allowedNewNodes, nodeName)
default:
...
// Node 上面有一个运行的旧的 Pod
// 那么可以将该 Node 加入候选节点
candidateNewNodes = append(candidateNewNodes, nodeName)
}
default:
...
// Node 上面的 Pod 数量已经超过限制了
// 那么将该 Node 加入删除列表
// 也就不会有 Pod 被调度到该 Node 上面了
oldPodsToDelete = append(oldPodsToDelete, oldPod.Name)
}
}
...
// 执行同步操作,创建/删除 对应数量的 Pod
// 使 Pod 数量满足期望状态
return dsc.syncNodes(ctx, ds, oldPodsToDelete, newNodesToCreate, hash)
}
更新状态
DaemonSetsController.updateDaemonSetStatus
方法用于更新 DaemonSet
对象的相关状态字段,这其中包括:
- 期望被调度的 Node 数量
- 当前已调度的 Node 数量
- 正在非正常运行 Pod (已经运行 Pod, 但是不应该运行 Pod) 的 Node 数量
- 正在正常运行 Pod 的 Node 数量
- 已经完成滚动升级的 Node 数量
- 应该运行 Pod 并且可用的 Node 数量
func (dsc *DaemonSetsController) updateDaemonSetStatus(ctx context.Context, ...) error {
// 获取 Node 和 DaemonSet 关联的 Pod 之间的映射关系
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ctx, ds, false)
...
// 遍历 Node 列表
for _, node := range nodeList {
...
}
numberUnavailable := desiredNumberScheduled - numberAvailable
// 更新 DaemonSet 状态
err = storeDaemonSetStatus(ctx, ...)
// 当 Pod 的数量不满足期望状态时
// 将 DaemonSet 加入队列
if ds.Spec.MinReadySeconds > 0 && numberReady != numberAvailable {
dsc.enqueueDaemonSetAfter(ds, time.Duration(ds.Spec.MinReadySeconds)*time.Second)
}
return nil
}