蛮荆

Kubernetes CronJob 设计与实现

2024-01-03

概述

CronJob 和 Linux 中的 cron 定时任务机制一样,用于定时任务的编排和运行。

CronJob 的使用方法和最佳实践在 这篇文章 中已经介绍过了,这里不再赘述,本文着重从源代码的角度分析一下 CronJob 的实现原理。

示例

# 官方示例 controllers/job/cronjob.yaml

apiVersion: batch/v1
kind: CronJob
metadata:
  name: hello
spec:
  schedule: "* * * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
            - name: hello
              image: busybox:1.28
              imagePullPolicy: IfNotPresent
              command:
                - /bin/sh
                - -c
                - date
          restartPolicy: OnFailure

基于 YAML 文件创建 CronJob:

$ kubectl apply -f https://k8s.io/examples/application/job/cronjob.yaml

在 Kubernetes 中执行上面的代码后,会创建一个对应的定时任务 CronJob 对象,该对象有一个关联的定时任务 hello, 该任务每分钟执行一次,输出当前时间。

源码说明

本文着重从源代码的角度分析一下 CronJob 的实现原理,CronJob 功能对应的源代码位于 Kubernetes 项目的 pkg/controller/cronjob/ 目录,本文以 Kubernetes v1.28 版本源代码进行分析。

CronJob 源代码目录

流程图

CronJob 控制器执行流程图

下面我们跟着流程图一起看下源代码的具体实现。


CronJobController

Controller 控制器对象是实现 CronJob 功能的核心对象,但是和其他几个常用的控制器命名方式不同,这里使用了一个类似兼容性的命名。

type ControllerV2 struct {
	// 队列中存储发生了变化 (需要同步) 的 CronJob 对象
	queue workqueue.RateLimitingInterface

	// kubelet 客户端对象
	// 用于执行各项类似 "kubectl ..." 操作
	kubeClient  clientset.Interface

	// Job 列表
	jobLister     batchv1listers.JobLister
	// CronJob 列表
	cronJobLister batchv1listers.CronJobLister
}

初始化

NewControllerV2 方法用于 CronJob 控制器对象的初始化工作,并返回一个实例化对象。

func NewControllerV2(ctx context.Context, ...) (*ControllerV2, error) {
	jm := &ControllerV2{
        ...
	}

	// 增加 Job informer 监听回调方法
	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		...
	})
	
	// 增加 CronJob informer 监听回调方法
	cronJobsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        ...
	})

	return jm, nil
}

启动控制器

根据控制器的初始化方法 NewControllerV2 的调用链路,可以找到控制器开始启动和执行的地方。

// cmd/kube-controller-manager/app/batch.go

func startCronJobController(ctx context.Context, ...) (controller.Interface, bool, error) {
	cj2c, err := cronjob.NewControllerV2(ctx, controllerContext.InformerFactory.Batch().V1().Jobs(),
		...
	)
    
    // 启动一个单独的 goroutine 来完成控制器的 {初始化 && 运行}
	go cj2c.Run(ctx, int(controllerContext.ComponentConfig.CronJobController.ConcurrentCronJobSyncs))
	
	return nil, true, nil
}

ControllerV2.Run 方法执行控制器具体的启动逻辑。

func (jm *ControllerV2) Run(ctx context.Context, workers int) {
	...

	// (根据参数配置) 启动多个 goroutine 处理逻辑
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, jm.worker, time.Second)
	}

	<-ctx.Done()
}

Controller.worker 方法本质上就是一个无限循环轮询器,不断从队列中取出 CronJob 对象,然后进行对应的操作,内部无限循环调用 processNextWorkItem 方法。

func (jm *ControllerV2) worker(ctx context.Context) {
	for jm.processNextWorkItem(ctx) {
	}
}
func (jm *ControllerV2) processNextWorkItem(ctx context.Context) bool {
	// 从队列获取 CronJob 对象
	key, quit := jm.queue.Get()
    
	...

	// 和其他几个常用的控制器不同
	// CronJob 控制器不是通过字段注入的方式设置同步的回调方法
	// 而是直接调用 sync 方法进行同步
	requeueAfter, err := jm.sync(ctx, key.(string))
	
	switch {
	case err != nil:
		// 如果同步回调方法执行异常
		// 将当前 CronJob 对象重新放入队列
		jm.queue.AddRateLimited(key)
	case requeueAfter != nil:
		// 如果同步回调方法执行成功
		//  并且 
		// CronJob 对象的下次执行时间不为空
		// 第一步: 先将当前 CronJob 对象踢出队列
		jm.queue.Forget(key)
		// 第二步: 将当前 CronJob 对象延迟放入队列
		jm.queue.AddAfter(key, *requeueAfter)
	}
	
	return true
}

CronJob 同步

ControllerV2 的回调处理方法是 sync 方法,该方法是所有 CronJob 对象同步操作的入口方法。

func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Duration, error) {
	// 通过 key 解析出 CronJob 对象对应的 命名空间和名称
	ns, name, err := cache.SplitMetaNamespaceKey(cronJobKey)

	// 获取 CronJob 对象
	cronJob, err := jm.cronJobLister.CronJobs(ns).Get(name)
	
	...

	// 获取 CronJob 对象关联的 Job 对象列表
	jobsToBeReconciled, err := jm.getJobsToBeReconciled(cronJob)

	// 深度拷贝一个 CronJob 对象 (用户优化操作)
	// 拷贝的对象用于计算源对象的所有更新操作
	/    并且仅在需要的时候执行一次更新
	cronJobCopy := cronJob.DeepCopy()

	// 删除完成的 Job
    updateStatusAfterCleanup := jm.cleanupFinishedJobs(ctx, cronJobCopy, jobsToBeReconciled)
	
	// 获取 CronJob 对象下次执行的时间和同步 (更新) 状态
	requeueAfter, updateStatusAfterSync, syncErr := jm.syncCronJob(ctx, cronJobCopy, jobsToBeReconciled)
	
    // 更新 CronJob 对象状态
	if updateStatusAfterCleanup || updateStatusAfterSync {
		if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil {
			return ...
		}
	}

	// 返回下次 CronJob 的下次执行时间
	if requeueAfter != nil {
		return requeueAfter, nil
	}
	
	return nil, syncErr
}

通过 ControllerV2.sync 方法的源代码,我们可以看到: CronJob 对象每次同步时,都会执行如下的操作:

  1. 根据参数 key 获取指定的 CronJob 对象
  2. 获取 CronJob 对象关联的 Job 对象列表
  3. 删除完成的 Job
  4. 调用 manageJob 方法执行具体的同步操作
  5. 更新 CronJob 对象状态

为什么 CronJob 对象要和 Job 对象进行关联呢?显而易见,CronJob 定时任务的具体执行是直接委托给 Job 来执行的。

执行同步

ControllerV2.syncCronJob 方法根据 CornJob 对象及其关联的 Job 对象执行同步操作,也就是负责具体干活的。

func (jm *ControllerV2) syncCronJob(ctx context.Context, cronJob *batchv1.CronJob, jobs []*batchv1.Job) (*time.Duration, bool, error) {
	// 建立 Job Map: 用于快速查找 Job
	childrenJobs := make(map[types.UID]bool)
	
	// 遍历 Job 对象列表
	for _, j := range jobs {
		childrenJobs[j.ObjectMeta.UID] = true
		found := inActiveList(cronJob, j.ObjectMeta.UID)
		
		// 如果 Job 不存在于 CronJob 的运行 Job 列表中
		// 并且 Job 还未完成
		if !found && !IsJobFinished(j) {
			// 获取 CronJob 对象 (新)
			cjCopy, err := jm.cronJobControl.GetCronJob(ctx, cronJob.Namespace, cronJob.Name)

			// 如果此时的 Job 存在于 CronJob 对象的运行 Job 列表中
			// 那就使用新的 CronJob 对象替换掉旧的 CronJob 对象
			if inActiveList(cjCopy, j.ObjectMeta.UID) {
				cronJob = cjCopy
				continue
			}
		} else if found && IsJobFinished(j) {
			// 如果 Job 存在于 CronJob 的运行 Job 列表中
			// 并且 Job 也已经完成
			
			_, status := getFinishedStatus(j)
			// 从 CronJob 对象的运行 Job 列表中 列表中删除 Job 
			deleteFromActiveList(cronJob, j.ObjectMeta.UID)
		} else if IsJobFinished(j) {
			// 如果 Job 已经完成
			// 更细 CronJob 和 Job 的最后完成时间
			if cronJob.Status.LastSuccessfulTime == nil {
				cronJob.Status.LastSuccessfulTime = j.Status.CompletionTime
				updateStatus = true
			}
			if j.Status.CompletionTime != nil && j.Status.CompletionTime.After(cronJob.Status.LastSuccessfulTime.Time) {
				cronJob.Status.LastSuccessfulTime = j.Status.CompletionTime
				updateStatus = true
			}
		}
	}

	// 遍历 CronJob 的运行 Job 列表
	// 如果对应的 Job 已经不复存在,就直接删除
	// 避免因为已经过期的 Job 附属关系而导致 CronJob 一直处于运行状态
	for _, j := range cronJob.Status.Active {
		_, found := childrenJobs[j.UID]
		if found {
			continue
		}
		
		// 直接尝试从 Api-Sever 中获取 Job (绕过缓存)
		// 避免 informer 的更新不及时而导致查询不到 Job
		_, err := jm.jobControl.GetJob(j.Namespace, j.Name)
		switch {
		case errors.IsNotFound(err):
			// Job 存在于 CronJob 的运行 Job 列表中
			// 但是无法查询到
			// 此时直接从 CronJob 对象的运行 Job 列表中 列表中删除 Job 
			//   如果 Job 还未过期,就调度一个新的 Pod 重新执行
			deleteFromActiveList(cronJob, j.UID)
			updateStatus = true
		case err != nil:
			return nil, updateStatus, err
		}
	}

	// CronJob 对象已经被删除了,直接退出即可
	if cronJob.DeletionTimestamp != nil {
		return nil, updateStatus, nil
	}
	
	...
	
	// 使用开源的 cron 库解析 CronJob 对象的定时表达式
	// https://github.com/robfig/cron/v3
	sched, err := cron.ParseStandard(formatSchedule(cronJob, jm.recorder))
	
    // 计算任务下次执行的时间
	scheduledTime, err := nextScheduleTime(logger, cronJob, ...)

	// 如果 CronJob 设置了延迟的最后时间
	//   spec.StartingDeadlineSeconds 表示任务如果由于某种原因错过了调度时间,
	//   开始该任务的最后截止时间
	// 则启动延迟策略,然后退出
	tooLate := false
	if cronJob.Spec.StartingDeadlineSeconds != nil {
		tooLate = scheduledTime.Add(time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds)).Before(now)
	}
	if tooLate {
		t := nextScheduleTimeDuration(cronJob, now, sched)
		return t, updateStatus, nil
	}
	
	...
	
	// ------------------------------------------------------------------ //
	// CronJob 的并发行规则
	// Allow(默认): 允许并发任务执行
	// Forbid:  不允许并发任务执行;如果新 Job 的执行时间到了,
	//          但是老 Job 没有执行完,CronJob 会忽略新 Job 的执行
    // Replace: 如果新 Job 的执行时间到了,但是老 Job 没有执行完,
	//          CronJob 会用新 Job 替换当前正在运行的老 Job
	
	// 不允许并发执行 Job, 直接退出
	if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cronJob.Status.Active) > 0 {
		...
		
		return t, updateStatus, nil
	}
	
	// 从源代码可以看到,Replace 策略可能会引起资源浪费 (针对幂等型 Job)
	// 例如一个老的 Job 还在运行中,并且执行完成度已经 90%
	//   此时依然会直接删除 (终止) 该 Job, 等于这 90% 的任务白白浪费了
	
	// 当然了,如果是批量型 Job, 例如每次处理 10W 个 MQ 事件
	// 则不存在这个问题
	if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
		// 遍历 CronJob 的运行 Job 列表
		// 然后逐个删除
		for _, j := range cronJob.Status.Active {
			// 获取当前的 Job 对象
			job, err := jm.jobControl.GetJob(j.Namespace, j.Name)
			
			// 删除当前的 Job 对象 
			if !deleteJob(logger, ...) {
				return nil, updateStatus, ...
			}
			updateStatus = true
		}
	}

	// Job 运行状态
	jobAlreadyExists := false
	// 获取 Job 创建模板
	jobReq, err := getJobFromTemplate2(cronJob, *scheduledTime)
	// 创建 Job 
	jobResp, err := jm.jobControl.CreateJob(cronJob.Namespace, jobReq)

	...
	
	// 创建 CronJob 和 Job 的附属关联关系
	jobRef, err := getRef(jobResp)

	// 更新 CronJob 的运行 Job 列表和最后执行时间
	cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
	cronJob.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
	updateStatus = true

	// 计算 CronJob 的下次执行时间
	t := nextScheduleTimeDuration(cronJob, now, sched)
	return t, updateStatus, nil
}

通过 ControllerV2.syncCronJob 方法的源代码,我们可以看到其内部的执行同步流程如下:

  1. 获取参数 Job 列表中的每个元素的最新状态并建立 Job Map
  2. 遍历 CronJob 的运行 Job 列表 (List) 并和 Job Map 中对应的 Job 进行对比,根据 Job 的不同状态执行对应的操作
  3. 检测 CronJob 对象是否已经被删除了,如果被删除了,也就不需要所谓的同步操作了,直接退出即可
  4. 计算 CronJob 的下次执行时间,再根据 CronJob 的延迟执行策略设置执行对应的操作
  5. 根据 CronJob 的并发行规则执行对应的操作
  6. 创建 Job
  7. 更新 CronJob 的运行 Job 列表和最后执行时间
  8. 计算 CronJob 的下次执行时间并返回

小结

CronJob 控制器执行流程图

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。