蛮荆

Kubernetes Pod 设计与实现 - Pause 容器

2023-10-26

Pause 容器

Pod 是 Kubernetes 中应用运行的最小单位 (同时也是顶级资源),也称为容器组,单个 Pod 可以看作是一个逻辑独立的主机,拥有自己的 IP、主机名称、进程等。

每个节点上面可以运行多个 Pod, 每个 Pod 中可以运行多个容器,那么单个 Pod 中的容器是如何共享同一个 IPC、Network 等资源的呢? 这就需要使用到本文的主角: Pause 容器。

Pause 容器是一个基础容器,顾名思义 (该容器进程会一直处于 “暂停” 状态),主要作用是将相关容器集中到一起,并通过让内部容器加入其命名空间,最终形成逻辑上的容器组。 其次,在启用共享 PID 命名空间的情况下,它作为每个 Pod 中第一个启动的进程 (init 进程) 接收并处理僵尸进程 (僵尸进程最坏情况下会耗尽内存)。

当 Pod 中的容器重启之后,需要位于和重启之前相同的命名空间中,因为容器的生命周期和 Pod 进行了绑定,所以 Pause 容器可以使这个过程变得非常快捷方便。 也就是说,同一个 Pod 中的所有容器看到的 Namespace 视图是一样的。

Pause 容器和 Pod 的生命周期是一样的,Pause 容器从 Pod 创建调度后开始运行 (Pod 中第一个启动的容器),一直到 Pod 被删除后退出,如果在这期间 Pod 被驱逐或关闭,Kubernetes 会重新创建 Pod 并自动运行 Pause 容器

示例

Pause 示例

上面的图中展示了一个典型的 Pod 结构,其中包含了 3 个容器:

  1. Pause 容器,负责构建 Pod 的 Namespace 基础
  2. nginx 容器,加入到 Pause 容器的 Namespace 中,共享资源
  3. ghost 容器,加入到 Pause 容器的 Namespace 中,共享资源

源码实现

Pause 官方容器使用 C 语言编写,Github 地址为: https://github.com/kubernetes/kubernetes/tree/master/build/pause, 源代码只有短短十几行,本文以 Linux 实现为例学习 Pause 容器的实现。

Pause 仓库目录

linux 目录下面只有两个文件, pause.c, orphan.c, 其中 orphan.c 文件中的代码只是模拟了一个僵尸进程用于测试,这里不做分析。

pause.c

Pause 容器的核心实现代码:

// 引入各种头文件

// 信号注册处理函数 (SIGINT, SIGTERM)
// 这种属于正常退出情况,所以退出码为 0
static void sigdown(int signo) {
  psignal(signo, "Shutting down, got signal");
  exit(0);
}

// 信号注册处理函数 (SIGCHLD)
// 由操作系统发送给父进程,通知子进程的状态变化
//   子进程终止:  
//   子进程暂停或恢复运行: 
static void sigreap(int signo) {
  // 等待指定的子进程退出
  
  // 函数原型:
  //   pid_t waitpid(pid_t pid, int *status, int options);
  
  // 参数说明:
  //   -1: 任意子进程
  //   NULL: 子进程的退出状态无需存储
  //   WNOHANG: 非阻塞,没有找到子进程时直接返回 0
  while (waitpid(-1, NULL, WNOHANG) > 0)
    ;
  // 如果子进程的数量大于 0, 无限循环
}

int main(int argc, char **argv) {
  // 打印版本号 ...

  if (getpid() != 1)
    // 如果 Pause 进程 ID 不等于 1, 输出警告信息
    
    // 因为进程 ID 等于 1 是 init 进程,负责接收处理僵尸进程
    // 所以如果 Pause 进程 ID 不等于 1, 可能产生僵尸进程的堆积
    fprintf(stderr, "Warning: pause should be the first process\n");

  // 注册 SIGINT 信号 (Ctrl + C)
  if (sigaction(SIGINT, &(struct sigaction){.sa_handler = sigdown}, NULL) < 0)
    return 1;
  // 注册 SIGTERM 信号 (例如执行 kill, systemctl stop 等命令)
  if (sigaction(SIGTERM, &(struct sigaction){.sa_handler = sigdown}, NULL) < 0)
    return 2;
  // 注册 SIGCHLD 信号
  if (sigaction(SIGCHLD, &(struct sigaction){.sa_handler = sigreap,
                                             .sa_flags = SA_NOCLDSTOP},
                NULL) < 0)
    return 3;

  // 无限循环
  for (;;)
    // pause 函数可以使当前进程陷入睡眠状态,避免浪费 CPU 资源
    // 直到捕获到一个信号后被唤醒
    pause();
  
  // 为什么退出码是 42 ? 因为作者比较任性 ?
  // 感兴趣的读者可以看看回答
  // https://stackoverflow.com/questions/16236182/what-is-the-origin-of-magic-number-42-indispensable-in-coding
  return 42;
}

通过上面的源代码可以看到,Pause 容器的本质就是一个独立的进程,该进程作为容器内第一个进程启动之后变身为父进程,后续启动的进程都会成为该进程的子进程, 父进程通过信号量的变化来执行对应的操作。Pause 容器唯一的作用是 保证即使 Pod 中没有任何容器运行也不会被删除,因为这时候还有 Pause 容器在运行。


Kubernetes Pod 启动流程

对 Pause 容器 (进程) 有了基础认识之后,我们可以看看 Kubernetes 中的 Pod 是如何通过 Pause 进程来管理子容器 (进程) 的, Pod 中的容器可以大致分类为:

  1. 临时容器 (一般用于 debug)
  2. init 容器
  3. 业务容器

Pod 创建的主干代码位于 kubeGenericRuntimeManager 对象的 SyncPod 方法中,下面结合具体的代码做一个简单的注解。

// https://github.com/kubernetes/kubernetes/blob/f8a4e343a106a73145464e8de8a919d13b59d25a/pkg/kubelet/kuberuntime/kuberuntime_manager.go#L1053

// SyncPod 方法通过执行下列步骤使 Pod 运行并达到预期的状态
//
//  1. 计算 Pause 容器和其他容器的状态变化
//  2. 清理 Pause 容器
//  3. 清理还在运行中,但是状态不合理的容器 (也就是早应该退出的容器)
//  4. 创建 Pause 容器
//  5. 创建临时容器
//  6. 创建 init 容器
//  7. 调整运行容器的状态
//  8. 创建具体的业务容器
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, ...) (result kubecontainer.PodSyncResult) {
	// Step 1: 计算 Pause 容器和其他容器的状态变化
	podContainerChanges := m.computePodActions(ctx, pod, podStatus)
	
	...

	// Step 2: 如果 Pause 容器发生了变化,结束 Pod
	if podContainerChanges.KillPod {
		...
		
	} else {
		// Step 3: 结束 Pod 内所有不需要运行的容器
		for containerID, containerInfo := range podContainerChanges.ContainersToKill {
            ...
		}
	}
	
	// 默认使用参数 Pod 状态对象的 IP 地址作为 Pod 的 IP
	var podIPs []string
	if podStatus != nil {
		podIPs = podStatus.IPs
	}

	// Step 4: 创建 Pause 容器
	podSandboxID := podContainerChanges.SandboxID
	if podContainerChanges.CreateSandbox {
		podSandboxID, msg, err = m.createPodSandbox(ctx, pod, podContainerChanges.Attempt)
		...
	}
	
	...

	// 针对各种类型容器的通用启动函数
	// 参数说明:
	//   typeName: 容器类型 (例如临时容器,init 容器等)
	//   metricLabel: 容器的标签,主要用于 metric 采集
	start := func(ctx context.Context, typeName, metricLabel string, spec *startSpec) error {
        ...
		
		// 启动容器
		// 真正启动容器的方法
		if msg, err := m.startContainer(ctx, ...); err != nil {
			...
		}

		return nil
	}

	// Step 5: 创建临时容器
	for _, idx := range podContainerChanges.EphemeralContainersToStart {
		start(ctx, "ephemeral container", ...)
	}

	if !utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) {
		// Step 6: 创建单个 init 容器
		if container := podContainerChanges.NextInitContainerToStart; container != nil {
			if err := start(ctx, "init container", ...); err != nil {
				...
			}
		}
	} else {
		// Step 6: 创建多个 init 容器
		for _, idx := range podContainerChanges.InitContainersToStart {
			container := &pod.Spec.InitContainers[idx]
			if err := start(ctx, "init container", ...); err != nil {
                ...
			}
		}
	}

	// Step 7: 调整运行容器的状态
	if isInPlacePodVerticalScalingAllowed(pod) {
        ...
	}

	// Step 8: 创建具体的业务容器
	for _, idx := range podContainerChanges.ContainersToStart {
		start(ctx, "container", ...)
	}

	return
}

上面的 SyncPod 方法主要完成了 Pod 的创建和 Pod 内各种容器的创建,从方法内部的调用可以看到,真正启动容器的操作实现在 startContainer 方法中:

// startContainer 方法启动容器并返回执行结果
// 主要分为如下几个步骤:
//   1. 拉取镜像
//   2. 创建容器
//   3. 启动容器
//   4. 执行生命周期事件钩子函数 (容器启动后函数)
func (m *kubeGenericRuntimeManager) startContainer(ctx context.Context, ...) (string, error) {
	container := spec.container

	// Step 1: 拉取镜像
	imageRef, msg, err := m.imagePuller.EnsureImageExists(ctx, ...)

	...

	// Step 2: 创建容器
	// 新容器的重启次数为 0 
	// 旧容器的重启次数递增
	restartCount := 0
	containerStatus := podStatus.FindContainerStatusByName(container.Name)
	if containerStatus != nil {
		restartCount = containerStatus.RestartCount + 1
	} else {
        ...
	}

	...
	
	// 为容器设置资源限制
	
	...

	// 生成容器配置
	containerConfig, cleanupAction, err := m.generateContainerConfig(ctx, ...)

	...
	
	// Step 3: 启动容器
	err = m.runtimeService.StartContainer(ctx, containerID)

	...
	
	// 将容器日志文件通过软链接 连接到遗留的历史容器日志位置
	if _, err := m.osInterface.Stat(containerLog); !os.IsNotExist(err) {
        ...
	}

	// Step 4: 执行生命周期事件钩子函数 (容器启动后函数)
	if container.Lifecycle != nil && container.Lifecycle.PostStart != nil {
		msg, handlerErr := m.runner.Run(ctx, ...)
		if handlerErr != nil {
			// 如果钩子函数执行失败,杀死容器
			if err := m.killContainer(ctx, ...); err != nil {
			    ...	
			}
			return msg, ErrPostStartHook
		}
	}

	return "", nil
}

到这里,容器的创建和启动代码就分析完了。

Reference

扩展阅读

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。