Kubernetes EndPoint 设计与实现
2023-12-08 Cloud Native Kubernetes 读代码
概述
EndPoint 的概念和作用在 这篇文章 中已经介绍过了,本文不再赘述,这里使用一张图来回顾一下 Service, EndPoint, Pod 三者之间的关系。
EndpointSubset
EndpointSubset 是一组具有公共端口集的地址,扩展的端点集是 Addresses (Pod IP 地址) 和 Ports (Service 名称和端口号) 的笛卡尔积。
下面是一个典型的 EndpointSubset 示例:
Name: "test",
Subsets: [
{
Addresses: [
{
"ip": "10.10.1.1"
},
{
"ip": "10.10.2.2"
}
],
Ports: [
{
"name": "a",
"port": 8675
},
{
"name": "b",
"port": 309
}
]
}
]
将上面的 Subset 转换为对应的端点集合:
a: [ 10.10.1.1:8675, 10.10.2.2:8675 ]
b: [ 10.10.1.1:309, 10.10.2.2:309 ]
源码说明
本文着重从源代码的角度分析一下 EndPoint 的实现原理,EndPoint 功能对应的源代码位于 Kubernetes 项目的 pkg/controller/endpoint/
目录,本文以 Kubernetes v1.28
版本源代码进行分析。
流程图
下面我们跟着流程图一起看下源代码的具体实现。
EndPointController
首先来看看 Endpoints
控制器对象,该对象是实现 Endpoints 功能的核心对象。
// Controller manages selector-based service endpoints.
type Controller struct {
// Service 列表
serviceLister corelisters.ServiceLister
// Pod 列表
podLister corelisters.PodLister
// EndPoint 列表
endpointsLister corelisters.EndpointsLister
// 队列中存储发生了变化 (需要同步) 的 Service
queue workqueue.RateLimitingInterface
}
初始化
NewEndpointController
方法用于 EndPoint
控制器对象的初始化工作,并返回一个实例化对象,控制器对象同时订阅了 Service, Pod, EndPoint 三种资源的变更事件。
func NewEndpointController(podInformer ...) *Controller {
e := &Controller{
...
}
// 增加 Service informer 监听回调方法
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
...
})
// 增加 Pod informer 监听回调方法
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
...
})
// 增加 EndPoint informer 监听回调方法
endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
...
})
...
return e
}
启动控制器
根据控制器的初始化方法 NewEndpointController
的调用链路,可以找到控制器开始启动和执行的地方。
// cmd/kube-controller-manager/app/core.go
func startEndpointsController(ctx context.Context, ...) (controller.Interface, bool, error) {
// 启动一个单独的 goroutine 来完成 {初始化 && 运行}
go endpointcontroller.NewEndpointController(
...
).Run(ctx, int(controllerContext.ComponentConfig.EndpointController.ConcurrentEndpointSyncs))
return nil, true, nil
}
具体逻辑方法
Controller.Run
方法执行具体的初始化逻辑。
func (e *Controller) Run(ctx context.Context, workers int) {
...
// (根据参数配置) 启动多个 goroutine 处理逻辑 (默认为 5 个)
for i := 0; i < workers; i++ {
go wait.UntilWithContext(ctx, e.worker, e.workerLoopPeriod)
}
<-ctx.Done()
}
Controller.worker
方法本质上就是一个无限循环轮询器,不断从队列中取出 EndPoint
对象,然后进行对应的操作。
func (e *Controller) worker(ctx context.Context) {
// 内部调用 processNextWorkItem 方法
for e.processNextWorkItem(ctx) {
}
}
func (e *Controller) processNextWorkItem(ctx context.Context) bool {
// 从队列获取 EndPoint 对象
eKey, quit := e.queue.Get()
...
// 调用 syncService 方法同步
err := e.syncService(ctx, eKey.(string))
return true
}
EndPoint 同步
Controller
的回调处理方法是 syncService
方法,该方法是 EndPoint
控制器操作的核心方法,通过方法的命名,可以知道 EndPoint 主要关注的对象是 Service。
func (e *Controller) syncService(ctx context.Context, key string) error {
// 通过 key 解析出 Service 对象对应的 命名空间和名称
namespace, name, err := cache.SplitMetaNamespaceKey(key)
// 获取 Service 对象
service, err := e.serviceLister.Services(namespace).Get(name)
// Service 类型为 ExternalName
// 直接返回
if service.Spec.Type == v1.ServiceTypeExternalName {
return nil
}
// Service 的标签选择器为 nil
// 这种情况下关联不到 EndPoint 对象
// 直接返回
if service.Spec.Selector == nil {
return nil
}
// 获取 Service 的标签选择器关联的 Pod 列表
pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
// 初始化端点集合对象
subsets := []v1.EndpointSubset{}
// 初始化已就绪的 EndPoint 对象计数
var totalReadyEps int
// 初始化未就绪的 EndPoint 对象计数
var totalNotReadyEps int
// 遍历 Pod 列表
for _, pod := range pods {
// 如果 Pod 的 IP 未分配
// 或者
// Pod 的状态已经终止或者正在终止中...
if !endpointsliceutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) {
continue
}
// 实例化一个 EndPointAddress 对象
// 例如
// {
// "ip": "10.10.1.1"
// }
ep, err := podToEndpointAddressForService(service, pod)
epa := *ep
if len(service.Spec.Ports) == 0 {
if service.Spec.ClusterIP == api.ClusterIPNone {
// Headless 模式下,ClusterIP 的值设置为 "None"
// 构建一个新的对象追加到端点集合里
subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(logger, subsets, pod, ...)
}
} else {
// 遍历 Service 端口对象
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
portNum, err := podutil.FindPort(pod, servicePort)
// 根据 Service 端口对象 + 端口号构建一个对象
epp := endpointPortFromServicePort(servicePort, portNum)
// 将构建好的对象追加到端点集合里
var readyEps, notReadyEps int
subsets, readyEps, notReadyEps = addEndpointSubset(logger, subsets, pod, ...)
// 累加已就绪的 EndPoint 对象计数
totalReadyEps = totalReadyEps + readyEps
// 累加未就绪的 EndPoint 对象计数
totalNotReadyEps = totalNotReadyEps + notReadyEps
}
}
}
// 计算并确定最后的 EndPoint 对象集合 (新的 EndPoint Set)
subsets = endpoints.RepackSubsets(subsets)
// 通过 informer 获取 Service 对象对应的 EndPoint Set
// 也就是当前的 EndPoint Set (旧的 EndPoint Set)
currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
// 如果 Service 的资源版本号未设置,就需要创建新的 EndPoints
createEndpoints := len(currentEndpoints.ResourceVersion) == 0
// 对新的和旧的 EndPoint Set进行排序 + 比较操作
// 如果新的 Set 和旧的 Set 比较之后,没有任何差异
// 并且 Service 的版本号也不需要创建
// 直接返回就可以了
if !createEndpoints && ...) {
return nil
}
// 深度拷贝当前的 EndPoint Set
newEndpoints := currentEndpoints.DeepCopy()
// 重新设置相关的 (最新) 属性
newEndpoints.Subsets = subsets
newEndpoints.Labels = service.Labels
...
if createEndpoints {
// 创建新的 EndPoints
_, err = e.client.CoreV1().Endpoints(service.Namespace).Create(ctx, newEndpoints, metav1.CreateOptions{})
} else {
// 更新已有 EndPoints
_, err = e.client.CoreV1().Endpoints(service.Namespace).Update(ctx, newEndpoints, metav1.UpdateOptions{})
}
...
}
通过 Controller.syncService
方法的源代码,我们可以看到: EndPoint
对象每次同步时,都会执行如下的操作:
- 根据参数 key 获取指定的 Service 对象
- 获取 Service 对象的标签选择器关联的 Pod 列表
- 通过 Service 和 Pod 列表计算出最新的 EndPoint 对象 (新) 集合
- 通过 informer 获取 Service 对象对应的 EndPoint 对象 (旧) 集合
- 如果新集合与旧集合对比,没有任何差异,说明不需要更新,直接退出方法即可
- 根据 Service 资源版本号确定 EndPoints 对象的操作 (创建或更新) 并执行
小结
EndPoint 控制器核心作用是订阅 Service 和 Pod 两类资源的变更事件,计算两者的映射关系集合并记录,最后对计算后的映射关系集合创建/更新响应的 EndPoints 资源。