蛮荆

Kubernetes EndPoint 设计与实现

2023-12-08

概述

EndPoint 的概念和作用在 这篇文章 中已经介绍过了,本文不再赘述,这里使用一张图来回顾一下 Service, EndPoint, Pod 三者之间的关系。

Service, EndPoint, Pod 关系图

EndpointSubset

EndpointSubset 是一组具有公共端口集的地址,扩展的端点集是 Addresses (Pod IP 地址) 和 Ports (Service 名称和端口号) 的笛卡尔积。

下面是一个典型的 EndpointSubset 示例:

  Name: "test",
  Subsets: [
    {
      Addresses: [
        {
          "ip": "10.10.1.1"
        },
        {
          "ip": "10.10.2.2"
        }
      ],
      Ports: [
        {
          "name": "a",
          "port": 8675
        },
        {
          "name": "b",
          "port": 309
        }
      ]
    }
]

将上面的 Subset 转换为对应的端点集合:

a: [ 10.10.1.1:8675, 10.10.2.2:8675 ]
b: [ 10.10.1.1:309, 10.10.2.2:309 ]

源码说明

本文着重从源代码的角度分析一下 EndPoint 的实现原理,EndPoint 功能对应的源代码位于 Kubernetes 项目的 pkg/controller/endpoint/ 目录,本文以 Kubernetes v1.28 版本源代码进行分析。

EndPoint 源代码目录

流程图

EndPoint 控制器执行流程图

下面我们跟着流程图一起看下源代码的具体实现。


EndPointController

首先来看看 Endpoints 控制器对象,该对象是实现 Endpoints 功能的核心对象。

// Controller manages selector-based service endpoints.
type Controller struct {
    // Service 列表
	serviceLister corelisters.ServiceLister

    // Pod 列表
	podLister corelisters.PodLister

	// EndPoint 列表
	endpointsLister corelisters.EndpointsLister

	// 队列中存储发生了变化 (需要同步) 的 Service
	queue workqueue.RateLimitingInterface
}

初始化

NewEndpointController 方法用于 EndPoint 控制器对象的初始化工作,并返回一个实例化对象,控制器对象同时订阅了 Service, Pod, EndPoint 三种资源的变更事件。

func NewEndpointController(podInformer ...) *Controller {
	e := &Controller{
        ...
	}

	// 增加 Service informer 监听回调方法
	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        ...
	})

    // 增加 Pod informer 监听回调方法
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        ...
	})

    // 增加 EndPoint informer 监听回调方法
	endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		...
	})
	
	...

	return e
}

启动控制器

根据控制器的初始化方法 NewEndpointController 的调用链路,可以找到控制器开始启动和执行的地方。

// cmd/kube-controller-manager/app/core.go

func startEndpointsController(ctx context.Context, ...) (controller.Interface, bool, error) {
    // 启动一个单独的 goroutine 来完成 {初始化 && 运行}
	go endpointcontroller.NewEndpointController(
		...
	).Run(ctx, int(controllerContext.ComponentConfig.EndpointController.ConcurrentEndpointSyncs))
	
	return nil, true, nil
}

具体逻辑方法

Controller.Run 方法执行具体的初始化逻辑。

func (e *Controller) Run(ctx context.Context, workers int) {
	...
	
    // (根据参数配置) 启动多个 goroutine 处理逻辑 (默认为 5 个)
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, e.worker, e.workerLoopPeriod)
	}
	
	<-ctx.Done()
}

Controller.worker 方法本质上就是一个无限循环轮询器,不断从队列中取出 EndPoint 对象,然后进行对应的操作。

func (e *Controller) worker(ctx context.Context) {
	// 内部调用 processNextWorkItem 方法
	for e.processNextWorkItem(ctx) {
	}
}
func (e *Controller) processNextWorkItem(ctx context.Context) bool {
	// 从队列获取 EndPoint 对象
	eKey, quit := e.queue.Get()

	...
	
	// 调用 syncService 方法同步
	err := e.syncService(ctx, eKey.(string))

	return true
}

EndPoint 同步

Controller 的回调处理方法是 syncService 方法,该方法是 EndPoint 控制器操作的核心方法,通过方法的命名,可以知道 EndPoint 主要关注的对象是 Service。

func (e *Controller) syncService(ctx context.Context, key string) error {
	// 通过 key 解析出 Service 对象对应的 命名空间和名称
	namespace, name, err := cache.SplitMetaNamespaceKey(key)

    // 获取 Service 对象
	service, err := e.serviceLister.Services(namespace).Get(name)

    // Service 类型为 ExternalName
	// 直接返回
	if service.Spec.Type == v1.ServiceTypeExternalName {
		return nil
	}
    // Service 的标签选择器为 nil
    // 这种情况下关联不到 EndPoint 对象
	// 直接返回
	if service.Spec.Selector == nil {
		return nil
	}

	// 获取 Service 的标签选择器关联的 Pod 列表
	pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
	
	// 初始化端点集合对象
	subsets := []v1.EndpointSubset{}
	// 初始化已就绪的 EndPoint 对象计数
	var totalReadyEps int
	// 初始化未就绪的 EndPoint 对象计数
	var totalNotReadyEps int

	// 遍历 Pod 列表
	for _, pod := range pods {
		// 如果 Pod 的 IP 未分配
		//   或者 
		// Pod 的状态已经终止或者正在终止中... 
		if !endpointsliceutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) {
			continue
		}

		// 实例化一个 EndPointAddress 对象
		// 例如
		// {
		//   "ip": "10.10.1.1"
        // }
		ep, err := podToEndpointAddressForService(service, pod)

		epa := *ep
		
		if len(service.Spec.Ports) == 0 {
			if service.Spec.ClusterIP == api.ClusterIPNone {
				// Headless 模式下,ClusterIP 的值设置为 "None" 
				// 构建一个新的对象追加到端点集合里
				subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(logger, subsets, pod, ...)
			}
		} else {
			// 遍历 Service 端口对象
			for i := range service.Spec.Ports {
				servicePort := &service.Spec.Ports[i]
				portNum, err := podutil.FindPort(pod, servicePort)

				// 根据 Service 端口对象 + 端口号构建一个对象
				epp := endpointPortFromServicePort(servicePort, portNum)
				
				// 将构建好的对象追加到端点集合里
				var readyEps, notReadyEps int
				subsets, readyEps, notReadyEps = addEndpointSubset(logger, subsets, pod, ...)
				
				// 累加已就绪的 EndPoint 对象计数
				totalReadyEps = totalReadyEps + readyEps
				// 累加未就绪的 EndPoint 对象计数
				totalNotReadyEps = totalNotReadyEps + notReadyEps
			}
		}
	}
	
	// 计算并确定最后的 EndPoint 对象集合 (新的 EndPoint Set)
	subsets = endpoints.RepackSubsets(subsets)
	
	// 通过 informer 获取 Service 对象对应的 EndPoint Set
	// 也就是当前的 EndPoint Set (旧的 EndPoint Set)
	currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)

	// 如果 Service 的资源版本号未设置,就需要创建新的 EndPoints 
	createEndpoints := len(currentEndpoints.ResourceVersion) == 0

	// 对新的和旧的 EndPoint Set进行排序 + 比较操作
    // 如果新的 Set 和旧的 Set 比较之后,没有任何差异
    //   并且 Service 的版本号也不需要创建
    // 直接返回就可以了
	if !createEndpoints && ...) {
		return nil
	}
	
	// 深度拷贝当前的 EndPoint Set
	newEndpoints := currentEndpoints.DeepCopy()
	
	// 重新设置相关的 (最新) 属性
	newEndpoints.Subsets = subsets
	newEndpoints.Labels = service.Labels
	
	...
	
	if createEndpoints {
        // 创建新的 EndPoints
		_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
	} else {
        // 更新已有 EndPoints
		_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
	}
	
    ...
}

通过 Controller.syncService 方法的源代码,我们可以看到: EndPoint 对象每次同步时,都会执行如下的操作:

  1. 根据参数 key 获取指定的 Service 对象
  2. 获取 Service 对象的标签选择器关联的 Pod 列表
  3. 通过 Service 和 Pod 列表计算出最新的 EndPoint 对象 (新) 集合
  4. 通过 informer 获取 Service 对象对应的 EndPoint 对象 (旧) 集合
  5. 如果新集合与旧集合对比,没有任何差异,说明不需要更新,直接退出方法即可
  6. 根据 Service 资源版本号确定 EndPoints 对象的操作 (创建或更新) 并执行

小结

EndPoint 控制器核心作用是订阅 Service 和 Pod 两类资源的变更事件,计算两者的映射关系集合并记录,最后对计算后的映射关系集合创建/更新响应的 EndPoints 资源。

EndPoint 控制器执行流程图

Reference

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。