Kubernetes CronJob 设计与实现
2024-01-03 Cloud Native Kubernetes 读代码
概述
CronJob 和 Linux 中的 cron 定时任务机制一样,用于定时任务的编排和运行。
CronJob 的使用方法和最佳实践在 这篇文章 中已经介绍过了,这里不再赘述,本文着重从源代码的角度分析一下 CronJob 的实现原理。
示例
# 官方示例 controllers/job/cronjob.yaml
apiVersion: batch/v1
kind: CronJob
metadata:
name: hello
spec:
schedule: "* * * * *"
jobTemplate:
spec:
template:
spec:
containers:
- name: hello
image: busybox:1.28
imagePullPolicy: IfNotPresent
command:
- /bin/sh
- -c
- date
restartPolicy: OnFailure
基于 YAML 文件创建 CronJob:
$ kubectl apply -f https://k8s.io/examples/application/job/cronjob.yaml
在 Kubernetes 中执行上面的代码后,会创建一个对应的定时任务 CronJob 对象,该对象有一个关联的定时任务 hello
, 该任务每分钟执行一次,输出当前时间。
源码说明
本文着重从源代码的角度分析一下 CronJob 的实现原理,CronJob 功能对应的源代码位于 Kubernetes 项目的 pkg/controller/cronjob/
目录,本文以 Kubernetes v1.28
版本源代码进行分析。
流程图
下面我们跟着流程图一起看下源代码的具体实现。
CronJobController
Controller
控制器对象是实现 CronJob
功能的核心对象,但是和其他几个常用的控制器命名方式不同,这里使用了一个类似兼容性的命名。
type ControllerV2 struct {
// 队列中存储发生了变化 (需要同步) 的 CronJob 对象
queue workqueue.RateLimitingInterface
// kubelet 客户端对象
// 用于执行各项类似 "kubectl ..." 操作
kubeClient clientset.Interface
// Job 列表
jobLister batchv1listers.JobLister
// CronJob 列表
cronJobLister batchv1listers.CronJobLister
}
初始化
NewControllerV2
方法用于 CronJob 控制器对象的初始化工作,并返回一个实例化对象。
func NewControllerV2(ctx context.Context, ...) (*ControllerV2, error) {
jm := &ControllerV2{
...
}
// 增加 Job informer 监听回调方法
jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
...
})
// 增加 CronJob informer 监听回调方法
cronJobsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
...
})
return jm, nil
}
启动控制器
根据控制器的初始化方法 NewControllerV2
的调用链路,可以找到控制器开始启动和执行的地方。
// cmd/kube-controller-manager/app/batch.go
func startCronJobController(ctx context.Context, ...) (controller.Interface, bool, error) {
cj2c, err := cronjob.NewControllerV2(ctx, controllerContext.InformerFactory.Batch().V1().Jobs(),
...
)
// 启动一个单独的 goroutine 来完成控制器的 {初始化 && 运行}
go cj2c.Run(ctx, int(controllerContext.ComponentConfig.CronJobController.ConcurrentCronJobSyncs))
return nil, true, nil
}
ControllerV2.Run
方法执行控制器具体的启动逻辑。
func (jm *ControllerV2) Run(ctx context.Context, workers int) {
...
// (根据参数配置) 启动多个 goroutine 处理逻辑
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, jm.worker, time.Second)
}
<-ctx.Done()
}
Controller.worker
方法本质上就是一个无限循环轮询器,不断从队列中取出 CronJob
对象,然后进行对应的操作,内部无限循环调用 processNextWorkItem
方法。
func (jm *ControllerV2) worker(ctx context.Context) {
for jm.processNextWorkItem(ctx) {
}
}
func (jm *ControllerV2) processNextWorkItem(ctx context.Context) bool {
// 从队列获取 CronJob 对象
key, quit := jm.queue.Get()
...
// 和其他几个常用的控制器不同
// CronJob 控制器不是通过字段注入的方式设置同步的回调方法
// 而是直接调用 sync 方法进行同步
requeueAfter, err := jm.sync(ctx, key.(string))
switch {
case err != nil:
// 如果同步回调方法执行异常
// 将当前 CronJob 对象重新放入队列
jm.queue.AddRateLimited(key)
case requeueAfter != nil:
// 如果同步回调方法执行成功
// 并且
// CronJob 对象的下次执行时间不为空
// 第一步: 先将当前 CronJob 对象踢出队列
jm.queue.Forget(key)
// 第二步: 将当前 CronJob 对象延迟放入队列
jm.queue.AddAfter(key, *requeueAfter)
}
return true
}
CronJob 同步
ControllerV2
的回调处理方法是 sync
方法,该方法是所有 CronJob
对象同步操作的入口方法。
func (jm *ControllerV2) sync(ctx context.Context, cronJobKey string) (*time.Duration, error) {
// 通过 key 解析出 CronJob 对象对应的 命名空间和名称
ns, name, err := cache.SplitMetaNamespaceKey(cronJobKey)
// 获取 CronJob 对象
cronJob, err := jm.cronJobLister.CronJobs(ns).Get(name)
...
// 获取 CronJob 对象关联的 Job 对象列表
jobsToBeReconciled, err := jm.getJobsToBeReconciled(cronJob)
// 深度拷贝一个 CronJob 对象 (用户优化操作)
// 拷贝的对象用于计算源对象的所有更新操作
/ 并且仅在需要的时候执行一次更新
cronJobCopy := cronJob.DeepCopy()
// 删除完成的 Job
updateStatusAfterCleanup := jm.cleanupFinishedJobs(ctx, cronJobCopy, jobsToBeReconciled)
// 获取 CronJob 对象下次执行的时间和同步 (更新) 状态
requeueAfter, updateStatusAfterSync, syncErr := jm.syncCronJob(ctx, cronJobCopy, jobsToBeReconciled)
// 更新 CronJob 对象状态
if updateStatusAfterCleanup || updateStatusAfterSync {
if _, err := jm.cronJobControl.UpdateStatus(ctx, cronJobCopy); err != nil {
return ...
}
}
// 返回下次 CronJob 的下次执行时间
if requeueAfter != nil {
return requeueAfter, nil
}
return nil, syncErr
}
通过 ControllerV2.sync
方法的源代码,我们可以看到: CronJob 对象每次同步时,都会执行如下的操作:
- 根据参数 key 获取指定的 CronJob 对象
- 获取 CronJob 对象关联的 Job 对象列表
- 删除完成的 Job
- 调用 manageJob 方法执行具体的同步操作
- 更新 CronJob 对象状态
为什么 CronJob 对象要和 Job 对象进行关联呢?显而易见,CronJob 定时任务的具体执行是直接委托给 Job 来执行的。
执行同步
ControllerV2.syncCronJob
方法根据 CornJob 对象及其关联的 Job 对象执行同步操作,也就是负责具体干活的。
func (jm *ControllerV2) syncCronJob(ctx context.Context, cronJob *batchv1.CronJob, jobs []*batchv1.Job) (*time.Duration, bool, error) {
// 建立 Job Map: 用于快速查找 Job
childrenJobs := make(map[types.UID]bool)
// 遍历 Job 对象列表
for _, j := range jobs {
childrenJobs[j.ObjectMeta.UID] = true
found := inActiveList(cronJob, j.ObjectMeta.UID)
// 如果 Job 不存在于 CronJob 的运行 Job 列表中
// 并且 Job 还未完成
if !found && !IsJobFinished(j) {
// 获取 CronJob 对象 (新)
cjCopy, err := jm.cronJobControl.GetCronJob(ctx, cronJob.Namespace, cronJob.Name)
// 如果此时的 Job 存在于 CronJob 对象的运行 Job 列表中
// 那就使用新的 CronJob 对象替换掉旧的 CronJob 对象
if inActiveList(cjCopy, j.ObjectMeta.UID) {
cronJob = cjCopy
continue
}
} else if found && IsJobFinished(j) {
// 如果 Job 存在于 CronJob 的运行 Job 列表中
// 并且 Job 也已经完成
_, status := getFinishedStatus(j)
// 从 CronJob 对象的运行 Job 列表中 列表中删除 Job
deleteFromActiveList(cronJob, j.ObjectMeta.UID)
} else if IsJobFinished(j) {
// 如果 Job 已经完成
// 更细 CronJob 和 Job 的最后完成时间
if cronJob.Status.LastSuccessfulTime == nil {
cronJob.Status.LastSuccessfulTime = j.Status.CompletionTime
updateStatus = true
}
if j.Status.CompletionTime != nil && j.Status.CompletionTime.After(cronJob.Status.LastSuccessfulTime.Time) {
cronJob.Status.LastSuccessfulTime = j.Status.CompletionTime
updateStatus = true
}
}
}
// 遍历 CronJob 的运行 Job 列表
// 如果对应的 Job 已经不复存在,就直接删除
// 避免因为已经过期的 Job 附属关系而导致 CronJob 一直处于运行状态
for _, j := range cronJob.Status.Active {
_, found := childrenJobs[j.UID]
if found {
continue
}
// 直接尝试从 Api-Sever 中获取 Job (绕过缓存)
// 避免 informer 的更新不及时而导致查询不到 Job
_, err := jm.jobControl.GetJob(j.Namespace, j.Name)
switch {
case errors.IsNotFound(err):
// Job 存在于 CronJob 的运行 Job 列表中
// 但是无法查询到
// 此时直接从 CronJob 对象的运行 Job 列表中 列表中删除 Job
// 如果 Job 还未过期,就调度一个新的 Pod 重新执行
deleteFromActiveList(cronJob, j.UID)
updateStatus = true
case err != nil:
return nil, updateStatus, err
}
}
// CronJob 对象已经被删除了,直接退出即可
if cronJob.DeletionTimestamp != nil {
return nil, updateStatus, nil
}
...
// 使用开源的 cron 库解析 CronJob 对象的定时表达式
// https://github.com/robfig/cron/v3
sched, err := cron.ParseStandard(formatSchedule(cronJob, jm.recorder))
// 计算任务下次执行的时间
scheduledTime, err := nextScheduleTime(logger, cronJob, ...)
// 如果 CronJob 设置了延迟的最后时间
// spec.StartingDeadlineSeconds 表示任务如果由于某种原因错过了调度时间,
// 开始该任务的最后截止时间
// 则启动延迟策略,然后退出
tooLate := false
if cronJob.Spec.StartingDeadlineSeconds != nil {
tooLate = scheduledTime.Add(time.Second * time.Duration(*cronJob.Spec.StartingDeadlineSeconds)).Before(now)
}
if tooLate {
t := nextScheduleTimeDuration(cronJob, now, sched)
return t, updateStatus, nil
}
...
// ------------------------------------------------------------------ //
// CronJob 的并发行规则
// Allow(默认): 允许并发任务执行
// Forbid: 不允许并发任务执行;如果新 Job 的执行时间到了,
// 但是老 Job 没有执行完,CronJob 会忽略新 Job 的执行
// Replace: 如果新 Job 的执行时间到了,但是老 Job 没有执行完,
// CronJob 会用新 Job 替换当前正在运行的老 Job
// 不允许并发执行 Job, 直接退出
if cronJob.Spec.ConcurrencyPolicy == batchv1.ForbidConcurrent && len(cronJob.Status.Active) > 0 {
...
return t, updateStatus, nil
}
// 从源代码可以看到,Replace 策略可能会引起资源浪费 (针对幂等型 Job)
// 例如一个老的 Job 还在运行中,并且执行完成度已经 90%
// 此时依然会直接删除 (终止) 该 Job, 等于这 90% 的任务白白浪费了
// 当然了,如果是批量型 Job, 例如每次处理 10W 个 MQ 事件
// 则不存在这个问题
if cronJob.Spec.ConcurrencyPolicy == batchv1.ReplaceConcurrent {
// 遍历 CronJob 的运行 Job 列表
// 然后逐个删除
for _, j := range cronJob.Status.Active {
// 获取当前的 Job 对象
job, err := jm.jobControl.GetJob(j.Namespace, j.Name)
// 删除当前的 Job 对象
if !deleteJob(logger, ...) {
return nil, updateStatus, ...
}
updateStatus = true
}
}
// Job 运行状态
jobAlreadyExists := false
// 获取 Job 创建模板
jobReq, err := getJobFromTemplate2(cronJob, *scheduledTime)
// 创建 Job
jobResp, err := jm.jobControl.CreateJob(cronJob.Namespace, jobReq)
...
// 创建 CronJob 和 Job 的附属关联关系
jobRef, err := getRef(jobResp)
// 更新 CronJob 的运行 Job 列表和最后执行时间
cronJob.Status.Active = append(cronJob.Status.Active, *jobRef)
cronJob.Status.LastScheduleTime = &metav1.Time{Time: *scheduledTime}
updateStatus = true
// 计算 CronJob 的下次执行时间
t := nextScheduleTimeDuration(cronJob, now, sched)
return t, updateStatus, nil
}
通过 ControllerV2.syncCronJob
方法的源代码,我们可以看到其内部的执行同步流程如下:
- 获取参数 Job 列表中的每个元素的最新状态并建立 Job Map
- 遍历 CronJob 的运行 Job 列表 (List) 并和 Job Map 中对应的 Job 进行对比,根据 Job 的不同状态执行对应的操作
- 检测 CronJob 对象是否已经被删除了,如果被删除了,也就不需要所谓的同步操作了,直接退出即可
- 计算 CronJob 的下次执行时间,再根据 CronJob 的延迟执行策略设置执行对应的操作
- 根据 CronJob 的并发行规则执行对应的操作
- 创建 Job
- 更新 CronJob 的运行 Job 列表和最后执行时间
- 计算 CronJob 的下次执行时间并返回