蛮荆

Kubernetes 调度器队列 - 设计与实现

2024-01-23

概述

作为 Kubernetes 调度器的一部分,调度器队列 起着非常重要的作用,我们可以很容易联想到调度过程中的如下问题:

  • 当有一批 Pod 同时需要调度时,它们被调度的先后顺序是怎样的?
  • 当某些 Pod 调度失败时,会被存储到哪里?
  • 对于调度失败的 Pod, 它们的重试机制是怎样的?

为了解答上面的这些问题,就需要深入了解 调度器队列 的实现机制。

三个队列

调度器队列的内部实现需要依赖以下三个队列:

1. Active 队列

存放等待被调度的 Pod

内部实现是一个 Heap 堆结构,按照优先级从高到低排序,堆顶的 Pod 优先级最高,新创建的 Pod (.spec.nodeName 属性为空) 都会加入到这个队列中。

在每个调度周期中,调度器会从该队列中取出 Pod 并进行调度,如果调度失败了 (例如因为资源不足),Pod 就会被加入到 UnSchedulable 队列中。 如果 Pod 被成功调度,其会从队列中被删除

2. UnSchedulable 队列

存放无法被调度的 Pod

3. Backoff 队列

存放等待重试的 Pod,之所以单独设置一个队列,是为了避免处于等待状态的 Pod 不断重试,给调度器带来不必要的负载。

内部实现是一个 Heap 堆结构,按照重试等待时间从低到高排序,堆顶的 Pod 等待时间最少。单个 Pod 的的重试次数越多,该 Pod 重新进入 Active 队列所需要的时间就越长 (几乎所有重试机制都是这么设计的)。

重试算法采用的是 指数退避机制,默认情况下最小为 1 秒,最大为 10 秒,例如,重试 3 次的 Pod 下一次的重试等待时间为 2^3 = 8 秒。 为了避免 Pod 经常调度失败而频繁进入等待队列,应该配置合理的退避时间基数,降低系统负载。

此外,调度队列机制有两个在后台运行的 goroutine, 负责定期刷新 将 Pod 加入到 Active 队列,当某些事件 (例如节点添加或更新、现有 Pod 被删除等) 触发时, 调度程序会将 UnSchedulable 队列或 Backoff 队列中的 Pod 移动到 Active 队列,做好重新调度前的准备工作。

三个队列的交互

图片来源: https://github.com/kubernetes/community/blob/f03b6d5692bd979f07dd472e7b6836b2dad0fd9b/contributors/devel/sig-scheduling/scheduler_queues.md


源代码

调度器队列主要为单个调度周期中的调度器提供合适的需要被调度的 Pod, 此外,队列还需要实现延迟调度操作。

本文着重从源代码的角度分析一下 调度器 的实现原理,对应的源代码文件位于 Kubernetes 项目的 pkg/scheduler/internal/queue/scheduling_queue.go,本文以 Kubernetes v1.28 版本源代码进行分析。

Scheduler 队列源代码目录

调度队列接口

SchedulingQueue 表示调度队列接口,具体的实现由 PriorityQueue 对象完成。

type SchedulingQueue interface {
	// 添加 Pod 到队列  
	Add(pod *v1.Pod) error
	// 从队列中弹出一个 Pod
	Pop() (*framework.QueuedPodInfo, error)
	// 更新队列中的 Pod
	Update(oldPod, newPod *v1.Pod) error
	// 删除队列中的 Pod
	Delete(pod *v1.Pod) error

	...
	
	// 关闭队列
	// 主要作用就是通知异步 goroutine, 实现优雅退出
	Close()
	// 启动管理调度队列的 goroutine
	Run()
}

根据 PriorityQueue 对象的定义,我们可以看到上文中提到的三个队列,

type PriorityQueue struct {
	...
	
	// Pod 重试初始时间
	podInitialBackoffDuration time.Duration
	// Pod 重试最大时间
	podMaxBackoffDuration time.Duration
	
	// 三个队列
	activeQ *heap.Heap
	podBackoffQ *heap.Heap
	unschedulableQ *UnschedulablePodsMap

	// 表示队列是否已关闭
	closed bool
}

activeQ 队列和 backoffQ 队列使用的数据结构都是 heap.Heap, 顾名思义这是一个堆数据结构,该结构也是 Kubernetes 内部自定义实现的一个结构, 因为具体的实现和本文关系不大,这里不再赘述。

UnschedulablePodsMap 对象表示不可调度队列,对象内部也很简单,就是一个 Pod 哈希函数加一个 Map 数据结构。

type UnschedulablePodsMap struct {
	podInfoMap map[string]*framework.QueuedPodInfo
	keyFunc    func(*v1.Pod) string
}

队列初始化

NewPriorityQueue 方法初始化一个调度队列对象并返回。

func NewPriorityQueue(...) *PriorityQueue {
	...

	pq := &PriorityQueue{
		activeQ:        heap.NewWithRecorder(...),
		unschedulableQ: newUnschedulablePodsMap(...)),
	}
	
	pq.podBackoffQ = heap.NewWithRecorder(...)

	return pq
}

CURD 操作

在了解了调度队列的基础数据结构之后,下面来看下队列的几个主要 CURD 操作以及不同状态变化时的处理逻辑。

Add

PriorityQueue.Add 方法负责将 Pod 添加到 activeQ 队列中。

func (p *PriorityQueue) Add(pod *v1.Pod) error {
	pInfo := p.newQueuedPodInfo(pod)
	// 添加到 activeQ 队列
	if err := p.activeQ.Add(pInfo); err != nil {
		return err
	}
	// 从 unschedulableQ 队列中删除该 Pod
	if p.unschedulableQ.get(pod) != nil {
		p.unschedulableQ.delete(pod)
	}
	// 从 backoffQ 队列中删除该 Pod
	if err := p.podBackoffQ.Delete(pInfo); err == nil {
	}
	
	...

	return nil
}

Pop

PriorityQueue.Pop 方法负责将 activeQ 队列的堆顶元素取出并返回。

func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
	// 如果 activeQ 队列为空
	// 那么这里就变为阻塞操作
	for p.activeQ.Len() == 0 {
		if p.closed {
			return nil, fmt.Errorf(queueClosed)
		}
		// 等待通知
		p.cond.Wait()
	}
	
	// 弹出堆顶元素并返回
	obj, err := p.activeQ.Pop()
	pInfo := obj.(*framework.QueuedPodInfo)
	
	...
	
	return pInfo, err
}

Delete

PriorityQueue.Pop 方法负责将指定的 Pod 从队列中删除,内部的实现会分别尝试从三个队列中逐个删除。

func (p *PriorityQueue) Delete(pod *v1.Pod) error {
	// 首先尝试从 activeQ 队列中删除
	err := p.activeQ.Delete(newQueuedPodInfoNoTimestamp(pod))
	if err != nil {
		// 如果发生报错,再依次尝试从 backoffQ 队列和 unschedulableQ 队列中删除
		p.podBackoffQ.Delete(newQueuedPodInfoNoTimestamp(pod))
		p.unschedulableQ.delete(pod)
	}
	return nil
}

Update

PriorityQueue.Update 方法负责更新队列中的 Pod, 内部实现会分别尝试从 activeQ 和 backoffQ 队列中取出对应的 Pod 然后更新, 如果这两个队列中没有对应的 Pod, 继续尝试从 unschedulable 队列中删除 Pod, 并将更新后的 Pod 加入到 activeQ 队列中。

func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {
	if oldPod != nil {
		oldPodInfo := newQueuedPodInfoNoTimestamp(oldPod)
		// 如果 Pod 存在于 activeQ 队列中,直接更新即可
		if oldPodInfo, exists, _ := p.activeQ.Get(oldPodInfo); exists {
			err := p.activeQ.Update(updatePod(oldPodInfo, newPod))
			return err
		}

		// 如果 Pod 存在于 backoffQ 队列中
		// 先从队列中删除旧的 Pod,然后再将新的 Pod 加入到队列中
		if oldPodInfo, exists, _ := p.podBackoffQ.Get(oldPodInfo); exists {
			p.podBackoffQ.Delete(oldPodInfo)
			err := p.activeQ.Add(updatePod(oldPodInfo, newPod))
			if err == nil {
				p.cond.Broadcast()
			}
			return err
		}
	}

	// 如果 Pod 存在于 unschedulableQ 队列中
	if usPodInfo := p.unschedulableQ.get(newPod); usPodInfo != nil {
		p.PodNominator.UpdateNominatedPod(oldPod, newPod)
		if isPodUpdated(oldPod, newPod) {
			// 先从 unschedulableQ 队列中删除
			p.unschedulableQ.delete(usPodInfo.Pod)
			// 如果 Pod 更新之后可以被调度,就将其放入 activeQ 队列中
			err := p.activeQ.Add(updatePod(usPodInfo, newPod))
			if err == nil {
				p.cond.Broadcast()
			}
			return err
		}
		return nil
	}
	
	// 如果 Pod 不存在于任何一个队列中
	// 直接加入到 activeQ 队列中
	err := p.activeQ.Add(p.newQueuedPodInfo(newPod))

	return err
}

状态相关操作

异步操作

上文中提到,当某些事件触发时,调度队列机制有两个后台 goroutine 负责将 Pod 加入到 Active 队列,该动作由 PriorityQueue.Run 方法完成。

func (p *PriorityQueue) Run() {
	// 将 backoffQ 队列中的已经完成重试的 Pod 移动到 activeQ 队列中
	go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
	// 将 unSchedulableQ 队列中停留时间过长的 Pod 移动到 activeQ 队列中
	go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}

移动 Pod

将 unschedulableQ 队列中的所有 Pod 根据状态移动到 activeQ 或 backoffQ 队列中,内部通过调用 movePodsToActiveOrBackoffQueue 方法来实现具体的操作。

func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string) {
	unschedulablePods := make([]*framework.QueuedPodInfo, 0, len(p.unschedulableQ.podInfoMap))
	for _, pInfo := range p.unschedulableQ.podInfoMap {
		unschedulablePods = append(unschedulablePods, pInfo)
	}
	p.movePodsToActiveOrBackoffQueue(unschedulablePods, event)
}
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event string) {
	// 遍历 Pod 列表,逐个进行转移
	for _, pInfo := range podInfoList {
		pod := pInfo.Pod
		
		if p.isPodBackingoff(pInfo) {
			if err := p.podBackoffQ.Add(pInfo); err != nil {
				
			} else {
                // 如果 Pod 添加到 podBackoffQ 队列成功
                // 就从 unschedulableQ 队列中删除该 Pod
				p.unschedulableQ.delete(pod)
			}
		} else {
			if err := p.activeQ.Add(pInfo); err != nil {
				
			} else {
				// 如果 Pod 添加到 activeQ 队列成功
				// 就从 unschedulableQ 队列中删除该 Pod 
				p.unschedulableQ.delete(pod)
			}
		}
	}
	
	p.cond.Broadcast()
}

重试

PriorityQueue.getBackoffTime 方法用于计算指定的 Pod 下次进行重试的时间。

func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {
	duration := p.calculateBackoffDuration(podInfo)
	backoffTime := podInfo.Timestamp.Add(duration)
	return backoffTime
}

PriorityQueue.calculateBackoffDuration 方法用于计算指定的 Pod 下次重试时的退避时间,内部实现就是简单的指数退避算法。

func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {
	duration := p.podInitialBackoffDuration
	// 从 1 到 Pod 已经重试的次数
	// 作用相当于 math.Pow() 
	for i := 1; i < podInfo.Attempts; i++ {
		duration = duration * 2
		if duration > p.podMaxBackoffDuration {
			return p.podMaxBackoffDuration
		}
	}
	return duration
}

比较两个 Pod

PriorityQueue.podsCompareBackoffCompleted 方法根据下次重试时间来比较两个 Pod, 这样就可以确定 Pod 放入队列 (堆结构) 时的位置。

func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {
	pInfo1 := podInfo1.(*framework.QueuedPodInfo)
	pInfo2 := podInfo2.(*framework.QueuedPodInfo)
	bo1 := p.getBackoffTime(pInfo1)
	bo2 := p.getBackoffTime(pInfo2)
	return bo1.Before(bo2)
}

小结

三个队列状态流程图

通过三个队列的相互配合,调度器队列实现了调度过程中 Pod 获取、 Pod 状态变化、Pod 存储等重要功能,在分析源代码的过程中,我们又遇到了 队列堆数据结构指数退避算法 等基础知识, 即使如 Kubernetes 这般庞然大物,其内部也就是由基础知识一点一滴搭建起来的,学习优秀的开源项目本身也是个复习巩固的过程,同时也希望读者能常读常新。

Reference

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。