蛮荆

Kubernetes Informer 设计与实现

2023-11-30

概述

Kubernetes 中的 Informer 是客户端组件库中的一种机制,用于监听 Kubernetes 集群中资源对象的变化,并将事件通知给控制器。 Informer 提供了一种简洁的方式来处理 API Server 发送的事件,并通过开放 API 使开发者能够实时获取资源对象的状态更新并进行对应的操作。

主要功能

  • 初始化时从 API Server 拉取全量资源并缓存
  • 同步缓存的资源对象的状态
  • 监听资源对象变化事件并执行对应的操作,如创建、更新、删除等
  • 开发者可以基于 Informer API 提供的资源对象变化进行交互,实现自定义的业务逻辑

应用场景

  • 控制器: Kubernetes 中各种控制器 (例如 DeploymentController, CronJobController) 通过 Informer 来监听资源对象变化
  • 调度器: Kubernetes 调度器通过 Informer 观察 Node 和 Pod 的状态变化
  • Operator: Operator 作为一种扩展机制,通过 Informer 来监听资源对象变化
  • 监控: 通过 Informer 来收集 Metric

源码说明

本文着重从源代码的角度分析一下 Informer 的实现原理,不同于其他 Kubernetes 组件,Informer 组件位于一个 单独的代码仓库, 本文以 v0.28.0 版本源代码进行分析。

流程图

Informer 流程图

下面我们跟着流程图一起看下源代码的具体实现。


Reflector

Reflector 对象负责从 API Server 中监听指定的资源,并将资源变更事件记录到 DeltaFIFO 队列 (下文会提到) 中。

type Reflector struct {
	...
	
	// 存储接口
	// Reflector 对象初始化时被赋值为 DeltaFIFO 队列
	store Store
	// 资源监听 & 拉取接口
	listerWatcher ListerWatcher
	
	// 每次请求的数据块大小
	// HTTP Response Header 中的 Transfer-Encoding: chunked
	WatchListPageSize int64

	// 和 API Server 之间使用 stream 流式传输数据 (提升传输效率,节省资源)
    UseWatchList bool
}

启动 Reflector

Reflector.Run 方法启动 Reflector, 开始监听并拉取资源,内部是个无限循环,直到收到参数 stopCh channel 的关闭信号。

func (r *Reflector) Run(stopCh <-chan struct{}) {
	wait.BackoffUntil(func() {
		if err := r.ListAndWatch(stopCh); err != nil {
			r.watchErrorHandler(r, err)
		}
	}, r.backoffManager, true, stopCh)
}

资源拉取&同步入口

Reflector.ListAndWatch 方法会先执行一次资源的全量拉取并记录资源的当前版本,然后使用资源版本去定时监听资源变化并进行必要的同步。

func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
	var err error
	var w watch.Interface
	fallbackToList := !r.UseWatchList
	
	if r.UseWatchList {
		// 创建资源变更事件监听对象
		w, err = r.watchList(stopCh)
	}
	
	if fallbackToList {
		// 拉取资源
		err = r.list(stopCh)
	}

	resyncerrc := make(chan error, 1)
	cancelCh := make(chan struct{})
	defer close(cancelCh)
	
	// 启动一个 goroutine 定时同步
	go r.startResync(stopCh, cancelCh, resyncerrc)
	
	// 监听资源变化
	return r.watch(w, stopCh, resyncerrc)
}

资源拉取

Reflector.list 方法第一次获取全量资源后,记录资源的版本号并调用 syncWith 方法将事件写入到队列中。

当资源版本号被记录之后,后续只需要根据版本号获取增量的资源变更事件即可。

func (r *Reflector) list(stopCh <-chan struct{}) error {
	// 请求资源事件时,携带版本号
	options := metav1.ListOptions{ResourceVersion: r.relistResourceVersion()}
	
	listCh := make(chan struct{}, 1)
	panicCh := make(chan interface{}, 1)
	
	go func() {
		// 根据分页对象获取资源事件 “数据块”
		// 因为第一次获取数据时版本号是空的
		//   所以会获取所有的资源事件
		pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
			return r.listerWatcher.List(opts)
		}))
		
		
		switch {
		case r.WatchListPageSize != 0:
			// 设置单次请求的数据量大小
			pager.PageSize = r.WatchListPageSize
		case options.ResourceVersion != "" && options.ResourceVersion != "0":
			// 如果已经有版本号了,说明进入了增量同步阶段
			// 自然也就不需要分页了
			pager.PageSize = 0
		}
		
		// 请求获取数据
		list, paginatedResult, err = pager.ListWithAlloc(context.Background(), options)
		// 如果记录的版本号已经过期 或者 版本号错误
		// 重新发起请求 
		if isExpiredError(err) || isTooLargeResourceVersionError(err) {
			// 更新资源最新版本号可用状态为 {不可用}
			r.setIsLastSyncResourceVersionUnavailable(true)
			list, paginatedResult, err = pager.ListWithAlloc(context.Background(), ...)
		}
		close(listCh)
	}()
	
	...

    // 更新资源最新版本号可用状态为 {可用}
	r.setIsLastSyncResourceVersionUnavailable(false)
	
	listMetaInterface, err := meta.ListAccessor(list)
	resourceVersion = listMetaInterface.GetResourceVersion()
	
	// 解析资源事件
	items, err := meta.ExtractListWithAlloc(list)
	
	// 调用 syncWith 方法
	// 将资源事件写入到队列
	if err := r.syncWith(items, resourceVersion); err != nil {
		return fmt.Errorf("unable to sync list result: %v", err)
	}
	
	// 更新资源最新版本号
	r.setLastSyncResourceVersion(resourceVersion)
	
	return nil
}

Reflector.syncWith 方法会将给定的数据更新到 DeltaFIFO 队列 中,方法内部只是将参数数据组装为 interface{} 类型的列表,然后调用 DeltaFIFO 队列 (下文会提到) 的方法更新即可。

func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
	found := make([]interface{}, 0, len(items))
	for _, item := range items {
		found = append(found, item)
	}
	return r.store.Replace(found, resourceVersion)
}

创建监听对象

Reflector.watchList 方法会创建一个监听对象并返回,该对象会和 ApiService 之间建立一个数据流来同步资源变更事件,并调用 watchHandler 方法对变更事件进行对应的操作。

func (r *Reflector) watchList(stopCh <-chan struct{}) (watch.Interface, error) {
	// 创建资源监听对象失败时,检测是否可以跳过报错的回调函数
	isErrorRetriableWithSideEffectsFn := func(err error) bool {
		...
		if isExpiredError(err) || isTooLargeResourceVersionError(err) {
			// 更新资源最新版本号可用状态为 {不可用}
			r.setIsLastSyncResourceVersionUnavailable(true)
			return true
		}
		return false
	}
	
	for {
		select {
		case <-stopCh:
			return nil, nil
		default:
		}

		resourceVersion = ""
		// 请求资源事件时,携带版本号
		lastKnownRV := r.rewatchResourceVersion()
		temporaryStore = NewStore(DeletionHandlingMetaNamespaceKeyFunc)
		
		// 请求超时时间
		timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
		// 生成请求配置对象
		options := metav1.ListOptions{
			ResourceVersion:      lastKnownRV,
			AllowWatchBookmarks:  true,
			SendInitialEvents:    pointer.Bool(true),
			ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
			TimeoutSeconds:       &timeoutSeconds,
		}
		start := r.clock.Now()
		
		// 创建一个监听对象,获取发生变更的资源
		w, err = r.listerWatcher.Watch(options)
		if err != nil {
			// 如果创建资源监听对象发生了错误
			//   如果创建资源监听对象失败没有什么影响
			//     就直接进入到下一个循环
			//   否则就返回错误
			if isErrorRetriableWithSideEffectsFn(err) {
				continue
			}
			return nil, err
		}
		
		// 调用 watchHandler 方法监听增量事件,然后放入队列
		err = watchHandler(start, w, ...)
	}
	
	// 更新资源最新版本号可用状态为 {可用}
	r.setIsLastSyncResourceVersionUnavailable(false)

    // 更新资源最新版本号
	r.setLastSyncResourceVersion(resourceVersion)

	return w, nil
}

定时同步

Reflector.startResync 方法在一个独立的 goroutine 中定时调用 DeltaFIFO 队列 (下文会提到) 的 Resync 方法。

func (r *Reflector) startResync(stopCh <-chan struct{}, cancelCh <-chan struct{}, resyncerrc chan error) {
	resyncCh, cleanup := r.resyncChan()
	defer func() {
		cleanup() 
	}()
	for {
		select {
		case <-resyncCh:
		case <-stopCh:
			return
		case <-cancelCh:
			return
		}
		
		// 队列同步
		if r.ShouldResync == nil || r.ShouldResync() {
			if err := r.store.Resync(); err != nil {
				resyncerrc <- err
				return
			}
		}
		cleanup()
		resyncCh, cleanup = r.resyncChan()
	}
}

Reflector.watch 方法完成的工作很简单,就是不断轮询请求,然后调用 watchHandler 方法将资源变更事件放入队列,限于篇幅,方法内部的源代码就不做具体分析了。

func (r *Reflector) watch(w watch.Interface, stopCh <-chan struct{}, resyncerrc chan error) error {
	...
	
	for {
		...

		err = watchHandler(start, w, r.store, ...)

		...
	}
}

watchHandler 方法也就是监听的请求回调处理方法,负责从监听对象的请求结果中获取资源变更事件,然后针对 DeltaFIFO 队列 (下文会提到) 进行不同的操作, 同时会记录的资源的最新版本号。

func watchHandler(start time.Time, w watch.Interface, store Store, ...) error {
    ...

loop:
	for {
		select {
        ...
		
		// 从请求结果 channel 中获取事件数据
		case event, ok := <-w.ResultChan():
			...
			
			// 获取资源最新版本号
			resourceVersion := meta.GetResourceVersion()
			
			// 根据不同的事件执行不同的操作
			switch event.Type {
			case watch.Added:
				// 添加事件
				err := store.Add(event.Object)
			case watch.Modified:
				// 修改事件
				err := store.Update(event.Object)
			case watch.Deleted:
				// 删除事件
				err := store.Delete(event.Object)
				
            ...
				
			}
			
			// 更新资源最新版本号
			setLastSyncResourceVersion(resourceVersion)
    
			...
		}
	}
	
	...
	
	return nil
}

小结

Reflector 流程图

Reflector 启动运行之后,会先执行一次资源对象的全量拉取,通过调用 syncWith 方法将数据更新到 DeltaFIFO 队列 中,然后持续监听资源对象的增量事件 (创建、更新、删除等), 并且将事件去重之后更新到 DeltaFIFO 队列 中,最后 DeltaFIFO 队列 中的数据会被消费者取出,执行具体的业务逻辑操作 (细节见下文)。


资源相关数据类型

资源事件类型

const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Replaced DeltaType = "Replaced"
	Sync DeltaType = "Sync"
)

资源事件对象

Delta 对象表示 {某个资源+变化事件} 的组合。

type Delta struct {
	Type   DeltaType
	Object interface{}
}

type Deltas []Delta

资源编码

MetaNamespaceKeyFunc 方法将资源对象编码为一个字符串表示,编码后的字符串表示格式为:

<namespace>/<name>

# 例如命名空间为 prod, Deployment 名称为 user_api, 编码后的字符串就表示为

prod/user_api

# 如果命名空为空,就以具体的资源名称为准
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
	if key, ok := obj.(ExplicitKey); ok {
		return string(key), nil
	}
	objName, err := ObjectToName(obj)
	if err != nil {
		return "", err
	}
	return objName.String(), nil
}

DeltaFIFO 队列

DeltaFIFO 是一个 “生产者/消费者” 模式的队列,这里的生产者是指 Reflector 对象及其相关操作,消费者是指 DeltaFIFO.Pop 方法的调用方。

type DeltaFIFO struct {
	// 读写锁
	lock sync.RWMutex
	cond sync.Cond

	// 存储单个资源的变更事件列表 (Deltas)
	// 这里的 key 是通过 Delta 对象编码的
	items map[string]Deltas

	// 队列消费时按照 FIFO (先进先出) 的顺序执行
	// 队列中的元素都是去重的
	// 字段存储的字符串 key 就是资源对象的编码 key
	queue []string

	// 表示是否执行过资源的全量拉取|增量拉取 (也就是是否有生产者写入过数据)
	// 第一次向队列写入数据时,调用的是 Replace 方法 (由 Reflector.list 方法调用)
	// 或者调用的是 Delete/Add/Update/AddIfNotPresent 方法 (由前文中的 Reflector.watchHandler 方法调用)
	populated bool
	// 生产者写入数据时递增
	// 消费者取出数据时递减
	// 等于 0 时就表示队列为空 (事件已经全部同步完成)
	initialPopulationCount int
	
	// 用于生成 key 的方法
	// 默认为上面提到的 MetaNamespaceKeyFunc 方法
	keyFunc KeyFunc

	// 已知的 key 列表 (其实指 Indexer)
	knownObjects KeyListerGetter

	// 队列是否已关闭
	closed bool
}

队列元素去重

队列元素 (Delta 对象) 的去重操作独立成了一个方法,该方法是一个公共方法,这里先来看下其内部实现。

func dedupDeltas(deltas Deltas) Deltas {
	n := len(deltas)
	if n < 2 {
		return deltas
	}
	a := &deltas[n-1]
	b := &deltas[n-2]
	if out := isDup(a, b); out != nil {
		// 如果重复元素不为空
		// 使用较新的替换掉较旧的
		deltas[n-2] = *out
		return deltas[:n-1]
	}
	return deltas
}

isDup 方法用于判断两个 Delta 对象是否一致,如果两个 Delta 对象一致,就返回其中较新的 Delta 对象。

func isDup(a, b *Delta) *Delta {
	if out := isDeletionDup(a, b); out != nil {
		return out
	}
	return nil
}

isDeletionDupe 方法检测两个参数 Delta 对象是否都是 Deleted (删除事件) 类型,如果是的话,就返回两个 Delta 对象中较新的一个。

func isDeletionDup(a, b *Delta) *Delta {
	if b.Type != Deleted || a.Type != Deleted {
		return nil
	}
	if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
		return a
	}
	return b
}

插入元素到队列

DeltaFIFO.Add 方法负责将资源对象插入到队列,内部会做去重复操作,仅当元素不存在队列中时才会执行插入操作。

func (f *DeltaFIFO) Add(obj interface{}) error {
	f.lock.Lock()
	defer f.lock.Unlock()
	
	// 更新 populated 字段状态
	f.populated = true
	return f.queueActionLocked(Added, obj)
}

DeltaFIFO.queueActionLocked 方法负责具体的入队操作。

func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
	// 获取资源的编码 key
	id, err := f.KeyOf(obj)
	
	...

	// 根据资源 key 获取对应的资源列表
	oldDeltas := f.items[id]
	// 直接将参数资源追加到已有列表中
	newDeltas := append(oldDeltas, Delta{actionType, obj})
	// 然后去重
	newDeltas = dedupDeltas(newDeltas)

	if len(newDeltas) > 0 {
		// 检测对应的资源列表是否存在
		if _, exists := f.items[id]; !exists {
			// 没有对应的列表,说明队列本身也是空的
			// 这块逻辑写的有点绕,其实在上面直接 预检测+初始化,代码可读性更高
			f.queue = append(f.queue, id)
		}
		// 更新资源列表
		f.items[id] = newDeltas
		f.cond.Broadcast()
	} else {
		// 异常状态检测 + 日志
        ...   
	}
	
	return nil
}

插入 (不存在的) 元素到队列

DeltaFIFO.AddIfNotPresent 方法向队列插入元素时,会先检测对应的元素是否存在,只有元素不存在时才会插入到队列中,和刚才的 Add 方法不同, 该方法用于直接添加 Delta 资源列表数据类型。

func (f *DeltaFIFO) AddIfNotPresent(obj interface{}) error {
	deltas, ok := obj.(Deltas)
	
	id, err := f.KeyOf(deltas)

	f.lock.Lock()
	defer f.lock.Unlock()
	f.addIfNotPresent(id, deltas)
	return nil
}
func (f *DeltaFIFO) addIfNotPresent(id string, deltas Deltas) {
	// 更新 populated 字段状态
	f.populated = true
	if _, exists := f.items[id]; exists {
		return
	}

	// 写入 key
	f.queue = append(f.queue, id)
	// 写入资源列表
	f.items[id] = deltas
	f.cond.Broadcast()
}

从队列取出元素

DeltaFIFO.Pop 方法负责从队列中消费元素 (Deltas 资源列表),需要注意的是,该方法为阻塞执行。

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
	f.lock.Lock()
	defer f.lock.Unlock()
	
	for {
		for len(f.queue) == 0 {
			if f.closed {
				return nil, ErrFIFOClosed
			}

			// 阻塞等待其他 goroutine 向队列添加元素
			f.cond.Wait()
		}
		
		isInInitialList := !f.hasSynced_locked()
		
		// 从队列头部取出一个元素 (遵从 FIFO 原则)
		// 这里取出的元素是一个 key  
		id := f.queue[0]
		f.queue = f.queue[1:]
		depth := len(f.queue)
		
		if f.initialPopulationCount > 0 {
			// 消费者取出数据时递减
			f.initialPopulationCount--
		}
		
		// 取出 key 对应的资源对象 (Delta) 列表
		item, ok := f.items[id]
        // 删除对应的资源对象
		delete(f.items, id)
		
		// 使用参数 process 方法处理 Deltas
		// 这里的 process 指的就是下文中的 Controller.HandleDeltas 方法
		err := process(item, isInInitialList)
		if e, ok := err.(ErrRequeue); ok {
			// Deltas 重新入队
			f.addIfNotPresent(id, item)
			err = e.Err
		}

		return item, err
	}
}

小结

DeltaFIFO 队列流程图

DeltaFIFO 队列简单来说,就是一个 生产者 -> 消费者 队列,生产者是 Reflector,消费者是 process 处理逻辑方法。 队列命名中的 FIFO 表示其是一个先进先出结构队列,而 Delta 是一个资源对象的增量事件存储 (Delta 对应的数学符号为 Δ {大写} δ {小写}), 用来表示资源对象的具体操作类型,例如 Add 创建、Update 更新、Delete 删除等。


StoreIndex 索引

Index 数据类型中的 key 表示通过索引函数计算出来的值,value 表示对应的结果字符串集合。

Indexers 数据类型中的 key 表示索引函数名称,value 表示索引函数。

Indices 数据类型中的 key 表示索引函数名称,value 表示 Index 数据类型。

type Index map[string]sets.String

type Indexers map[string]IndexFunc

type Indices map[string]Index

storeIndex 对象表示索引结构。

type storeIndex struct {
	
	indexers Indexers
	
	indices Indices
}

添加索引

storeIndex.addKeyToIndex 方法用于为给定的 key 构建索引。

func (i *storeIndex) addKeyToIndex(key, indexValue string, index Index) {
	set := index[indexValue]
	if set == nil {
		set = sets.String{}
		index[indexValue] = set
	}
	set.Insert(key)
}

获取索引

storeIndex.getKeysByIndex 方法用于获取索引对应的值。

func (i *storeIndex) getKeysByIndex(indexName, indexedValue string) (sets.String, error) {
	// 先获取对应的索引函数
	indexFunc := i.indexers[indexName]
	if indexFunc == nil {
		return nil, fmt.Errorf("Index with name %s does not exist", indexName)
	}

	// 获取对应的 Index 对象
	index := i.indices[indexName]
	// 获取对应的字符串集合
	return index[indexedValue], nil
}

更新索引

storeIndex.updateIndices 方法用于更新指定对象的索引,内部的处理逻辑如下:

  • 如果只传递了新对象,未传递旧对象,会执行创建索引操作
  • 如果同时传递了新对象和对象,会执行更新索引操作
  • 如果未传递新对象,只传递了旧对象,会执行创建删除操作
func (i *storeIndex) updateIndices(oldObj interface{}, newObj interface{}, key string) {
	var oldIndexValues, indexValues []string
	var err error
	
	// 遍历索引函数列表
	for name, indexFunc := range i.indexers {
		// 计算旧对象的索引
		if oldObj != nil {
			oldIndexValues, err = indexFunc(oldObj)
		} else {
			oldIndexValues = oldIndexValues[:0]
		}

		// 计算新对象的索引
		if newObj != nil {
			indexValues, err = indexFunc(newObj)
		} else {
			indexValues = indexValues[:0]
		}

		index := i.indices[name]
		if index == nil {
			index = Index{}
			i.indices[name] = index
		}

		// 如果新对象和旧对象的索引值一致
		// 无需执行任何操作
		if len(indexValues) == 1 && len(oldIndexValues) == 1 && indexValues[0] == oldIndexValues[0] {
			continue
		}

		// 删除旧值的索引
		for _, value := range oldIndexValues {
			i.deleteKeyFromIndex(key, value, index)
		}
		// 添加新值的索引
		for _, value := range indexValues {
			i.addKeyToIndex(key, value, index)
		}
	}
}

删除索引

storeIndex.deleteKeyFromIndex 方法负责删除索引,内部实现就是添加方法 addKeyToIndex 的逆过程实现。

func (i *storeIndex) deleteKeyFromIndex(key, indexValue string, index Index) {
	set := index[indexValue]
	if set == nil {
		return
	}
	set.Delete(key)

	// 如果索引值对应的字符串集合已经没有任何元素了
	// 及时释放对应的索引空间
	// 避免 map 内存持续增长带来的 “内存泄漏” 问题
	if len(set) == 0 {
		delete(index, indexValue)
	}
}

小结

StoreIndex 流程图

StoreIndex 对象实现了资源对象的存储索引功能,每次存储索引对象时,通过内部的索引函数计算出索引对象对应的索引值,然后写入到内部的 Indices 对象中, 简单来说,StoreIndex 对象就是用于存储资源对象并自动完成索引功能的本地存储组件,完成存储之后,对于接下来的查询操作,可以通过索引机制高效获取到对应资源对象。

Reflector 通过监听机制将资源对象变更事件数据传入 DeltaFIFO 队列 之后,经过消费者函数操作之后,将资源对象索引存入到 StoreIndex 对象中, 当然 StoreIndex 对象只是完成了资源对象的索引功能,资源对象的具体存储由下文中的 threadSafeMap 对象实现。

通过 StoreIndex 索引对象和 threadSafeMap 对象的配合,可以保证本地拉取到资源对象和 Kubernetes etcd 集群中的资源对象数据一致, 这样就需要获取指定的资源对象时,就不需要每次都从 API Server 中实时获取,大大降低控制平面节点的负载压力。


ThreadSafeStore 接口

ThreadSafeStore 接口表示索引和资源对象缓存的相关操作,具体的实现由 threadSafeMap 对象来完成。

type ThreadSafeStore interface {
	Add(key string, obj interface{})
	Update(key string, obj interface{})
	Delete(key string)
	Get(key string) (item interface{}, exists bool)
	List() []interface{}
	ListKeys() []string
	
	...
}

threadSafeMap

threadSafeMap 对象用于维护索引和资源对象,其中索引通过 storeIndex 对象来实现,缓存资源对象使用一个 map 数据结构实现,在进行具体的操作时加锁。

type threadSafeMap struct {
	lock  sync.RWMutex
	items map[string]interface{}

	// 索引通过 storeIndex 对象来实现
	index *storeIndex
}

初始化

func NewThreadSafeStore(indexers Indexers, indices Indices) ThreadSafeStore {
	return &threadSafeMap{
		items: map[string]interface{}{},
		index: &storeIndex{
			indexers: indexers,
			indices:  indices,
		},
	}
}

CURD 操作

就是普通的 map 操作,为了防止并发读写可能引起的错误,方法内部使用了读写锁。

func (c *threadSafeMap) Add(key string, obj interface{}) {
	c.Update(key, obj)
}
func (c *threadSafeMap) Update(key string, obj interface{}) {
	c.lock.Lock()
	defer c.lock.Unlock()
	oldObject := c.items[key]
	c.items[key] = obj
	c.index.updateIndices(oldObject, obj, key)
}
func (c *threadSafeMap) Delete(key string) {
	c.lock.Lock()
	defer c.lock.Unlock()
	if obj, exists := c.items[key]; exists {
		c.index.updateIndices(obj, nil, key)
		delete(c.items, key)
	}
}
func (c *threadSafeMap) Get(key string) (item interface{}, exists bool) {
	c.lock.RLock()
	defer c.lock.RUnlock()
	item, exists = c.items[key]
	return item, exists
}

小结

threadSafeMap 流程图

控制器

作为控制中心,控制器集成了上文中提到的 Reflector、DeltaFIFO、Indexer、Store 组件,使各组件可以协同工作。

Controller 接口

Controller 表示控制器接口,具体的实现由 controller 来完成。

K8s 代码命名约定俗成:大写定义 interface 接口,小写定义具体实现。

type Controller interface {
	// Run 方法主要做两件事情
	//   1. 初始化并启动 Reflector, 通过 ListerWatcher 监听变化的资源事件并放入队列中
	//   2. 不断从队列中获取变化的资源事件,并执行对应的操作
	Run(stopCh <-chan struct{})

	// HasSynced 的实现委托给了具体的队列
	// 检测同步是否完成
	HasSynced() bool
}

Config 对象

Config 对象是一个大杂烩配置容器,里面包含了控制器运行逻辑中需要的对象。

type Config struct {
	// DeltaFIFO 作为具体实现
	Queue

	// 资源监听
	ListerWatcher

	// 队列中资源事件处理方法
	Process ProcessFunc

	// 全量同步周期
	FullResyncPeriod time.Duration

	// 是否需要重新同步的检测方法
	ShouldResync ShouldResyncFunc

	// 监听错误处理回调方法
	WatchErrorHandler WatchErrorHandler

	// 单次请求数据块大小
	WatchListPageSize int64
}

controller 对象

controller 对象实现了 Controller 接口。

type controller struct {
	config         Config
	reflector      *Reflector
	reflectorMutex sync.RWMutex
	clock          clock.Clock
}

初始化控制器

newInformer 方法用于初始化一个 控制器 实例。

func newInformer(...) Controller {
	// 初始化一个 DeltaFIFO 队列,注入到 Config 配置对象中 
	fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
		KnownObjects:          clientState,
		EmitDeltaTypeReplaced: true,
		Transformer:           transformer,
	})

	cfg := &Config{
		Queue:            fifo,
        ...
	}
	return New(cfg)
}

启动控制器

controller.Run 方法用于处理上下游的业务流程操作,通过启动 Reflector 调用 ListAndWatch 获取全量资源并监听资源变化事件,然后存储到 DeltaFIFO 队列中, 另一方面,通过 controller.processLoop 方法不断从 DeltaFIFO 队列中获取元素并执行对应的操作。

func (c *controller) Run(stopCh <-chan struct{}) {
	// 启动单独的 goroutine 监听关闭 channel 信号
	go func() {
		<-stopCh
		c.config.Queue.Close()
	}()
	
	// 初始化一个 Reflector 
	r := NewReflectorWithOptions(
        ...
	)
	
	var wg wait.Group

	// 调用 Reflector.Run 方法
	// 具体细节请参考前文中的 [启动 Reflector] 小节
	wg.StartWithChannel(stopCh, r.Run)

	// 调用 controller.processLoop 消费队列元素
	wait.Until(c.processLoop, time.Second, stopCh)
	wg.Wait()
}

队列消费

controller.processLoop 方法内部是一个无限循环,不断从队列中取出元素,并执行对应的操作。

func (c *controller) processLoop() {
	for {
		// 这里配置对象中的回调方法也就是 processDeltas
		obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
		
		...
	}
}

最后来看下队列中元素出队后的回调方法 processDeltas, 方法内部会遍历资源的事件列表,然后根据不同的事件类型执行不同的操作。

func processDeltas(...) error {
	// 遍历资源事件列表
	for _, d := range deltas {
		obj := d.Object

		switch d.Type {
		case Sync, Replaced, Added, Updated:
			// 资源非删除事件
			if old, exists, err := clientState.Get(obj); err == nil && exists {
				// 资源更新事件
				// 先从索引中更新资源对象
				if err := clientState.Update(obj); err != nil {
					return err
				}
				// 调用资源更新回调方法
				handler.OnUpdate(old, obj)
			} else {
				// 资源创建事件
				// 先从索引中创建资源对象
				if err := clientState.Add(obj); err != nil {
					return err
				}
				// 调用资源新增回调方法
				handler.OnAdd(obj, isInInitialList)
			}
		case Deleted:
			// 资源删除事件
			// 先从索引中删除资源对象
			if err := clientState.Delete(obj); err != nil {
				return err
			}
			// 调用资源删除回调方法
			handler.OnDelete(obj)
		}
	}
	return nil
}

Controller 流程图

小结

Kubernetes 中的 Informer 是 client-go 客户端库中实现的一种机制,它可以监听 Kubernetes 集群中各类资源对象的变更,并将这些变更事件通知发送到控制器, 并进行对应的数据索引和存储。 通俗地说,Informer 的关键作用就是充当 Kubernetes API Server 和 资源控制器 之间的中间层,类似的工作机制有消息队列、设计模式中的观察者模式等。

Reference

扩展阅读

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。