蛮荆

time/Timer Code Reading

2023-06-10

概述

Go 语言中的计时器常用于定期执行某个操作或者在指定时间内执行某个操作,主要通过标准库的 time 包中的相关方法来实现,如 time.After(), time.Tick() 等。 相信读者已经可以熟练掌握应用层面,本文主要分析计时器功能的内部代码实现。

K 叉堆

在正式分析源代码之前,我们首先简单了解下 K 叉堆 这种数据结构。

K 叉堆(k-ary heap)是一种基于二叉树 (K=2) 的变体堆数据结构,在 K 叉堆中,每个节点最多有 K 个子节点。

K 叉堆和二叉堆一样,单个节点只和其父节点、子节点之间有约束关系,邻居节点之间没有任何约束关系。

K 叉堆 满足如下条件:

  1. K 叉堆是一棵完全二叉树结构,除了叶子节点外,所有非叶子节点层都完全填充 (如果叶子节点也被完全填充,那就是一颗满二叉树)
  2. 和二叉堆一样,也可以分为最小 K 叉堆和最大 K 叉堆
    • 最大 K 叉堆: 根节点的值大于其所有子节点的值
    • 最小 K 叉堆: 根节点的值小于其所有子节点的值
# 示例: 三叉最大堆

         10
     /    |   \
   7      9     8
 / | \   /
4  6  5 7


# 示例: 三叉最小堆

         10
      /   |  \
    12    11  13
  / | \
14 15 18 

使用 K 叉堆 可以实现类似于二叉堆的功能,快速取出最小(或最大)元素,并维护堆的元素有序性,时间复杂度都是 O(logK N), 但是 K 叉堆的层级更低 (因为底数 K 更大),所以 K 叉堆的插入和删除操作时间复杂度实际上更低一些。 此外,二叉堆对数组的访问范围更大也更加随机,但是 K 叉堆更集中于数组的前部,对局部性缓存更友好,有助于提升性能。

源码路径

计时器的源代码文件路径为 $GOROOT/src/runtime/time.go,笔者的 Go 版本为 go1.19 linux/amd64

数据结构

四叉堆

Go 的计时器实现选择了 四叉堆 数据结构 (因为时间涉及先后,所以是最小四叉堆),每个节点保存 4 个 timer 对象,缓存数据局部性比二叉堆好 (极端情况下性能可以翻倍), 没有选择更多的叉树堆,应该是在性能和实际应用场景方面做的折衷 (毕竟大多数业务中定时器数量不会太多)。

四叉堆数据结构示意图

下面是一个典型的的计时器堆结构示意图 (时间单位部分只保留 分-秒):

计时器四叉堆示意图

计时器对象

timer 对象是计时器的运行时表示,存储在处理器 P 的 四叉堆 结构中,然后由处理器中的 goroutine 负责执行计时器的回调函数。

type timer struct {
	// 计时器关联的处理器
	pp puintptr

	// 计时器下次被唤醒的时间
	when   int64
	// 计时器被唤醒的周期时间
	period int64
	// 计时器被唤醒时的回调函数
	f      func(any, uintptr)
	// 计时器被唤醒时的回调函数参数1
	arg    any
	// 计时器被唤醒时的回调函数参数2
	seq    uintptr

	// 当计时器处于 timerModifiedXX 状态时,设置 when 字段
	// 详情见 modtimer 函数
	nextwhen int64

	// 计时器状态
	status atomic.Uint32
}

P 的计时器字段

每个处理器 P 中有一个用于存储计时器的四叉堆数据字段 (timers),这样可以提高数据局部性性能,并且避免了不同处理器之间的锁争用。

type p struct {
	// 处理器中第一个时间器被唤醒的时间
	// 也就是时间最早的计时器
    timer0When atomic.Int64
	
	// 计时器操作锁
	timersLock mutex

	// 存储计时器的四叉堆
	timers []*timer

	// 计时器数量
	numTimers atomic.Uint32

	// 处于 timerDeleted 状态的计时器数量
	deletedTimers atomic.Uint32
}

G 的计时器字段

每个 goroutine 中引用了一个计时器,毕竟 goroutine 是负责具体干活的。

type g struct {
	timer *timer 
}

关系示意图

每个处理器中的计时器最终都需要分配给 goroutine 来负责具体的执行。

处理器和计时器结构示意图

10 种状态

定时器共有 10 种状态来表示整个生命周期中的相关操作和状态变化,关联的是 timer 对象的 status 字段。

const (
	// 初始状态
	timerNoStatus = iota

    // 等待启动
	timerWaiting

	// 运行
	timerRunning

	// 删除 (不会再次运行)
	timerDeleted

	// 正在删除中
	timerRemoving

	// 已经从堆中删除
	timerRemoved

	// 正在修改中
	timerModifying

	// 被修改到了更早的时间 (具体的值关联到了 nextwhen 字段)
	timerModifiedEarlier

	// 被修改到了更晚的时间 (具体的值关联到了 nextwhen 字段)
	timerModifiedLater

	// 已经被修改并正在移动中 (四叉堆数据变化)
	timerMoving
)

状态机

根据标准库中源代码的注释,我们可以画出如下状态机图示。

计时器状态机

算法

现在我们有了定时器数据结构和对应的示意图,剩下的就是基于数据结构形成的算法部分,开始愉快地阅读源代码。:-)

新增计时器

函数 addtimer 用于新增计时器。

func addtimer(t *timer) {
	// 参数合法性检测
    ...
	
	// 计时器状态修改为等待启动
	t.status.Store(timerWaiting)
	
	when := t.when

	// 禁止抢占
	mp := acquirem()

	pp := getg().m.p.ptr()
	// 获取计时器操作锁
	lock(&pp.timersLock)
	// 清理处理器中的定时器
	cleantimers(pp)
	// 将参数计时器加入处理器的四叉堆中
	doaddtimer(pp, t)
	// 释放计时器操作锁
	unlock(&pp.timersLock)

	// 触发调度: 唤醒网络轮询器中休眠的线程
	wakeNetPoller(when)

	releasem(mp)
}

函数 doaddtimer 用于将计时器添加到指定的处理器中。

func doaddtimer(pp *p, t *timer) {
	// 计时器的唤醒和执行依赖于网络轮询器
	if netpollInited.Load() == 0 {
		// 如果网络轮询器未初始化
		// 那就先进行其初始化
		netpollGenericInit()
	}

    // 计时器关联处理器
	t.pp.set(pp)
	// 先将计时器添加到四叉堆尾部
	i := len(pp.timers)
	pp.timers = append(pp.timers, t)
	// 然后通过向上调整 (排序) 操作将计时器放到对应的位置
	siftupTimer(pp.timers, i)
	// 如果计时器调整后位于四叉堆第一个元素
	// 更新处理器的第一个计时器的唤醒时间
	if t == pp.timers[0] {
		pp.timer0When.Store(t.when)
	}
	// 更新计时器数量
	pp.numTimers.Add(1)
}

修改计时器

函数 modtimer 用于修改计时器,该函数会被网络轮训器、Ticker.ResetTimer.Reset 方法调用。

func modtimer(t *timer, when, period int64, f func(any, uintptr), arg any, seq uintptr) bool {
    // 参数合法性检测
    ...
	
	status := uint32(timerNoStatus)
	wasRemoved := false
	var pending bool
	var mp *m
loop:
	for {
		// 状态机内部变化
		// 这里可以对照着上面的状态机图示查看源代码
		switch status = t.status.Load(); status {
		case timerWaiting, timerModifiedEarlier, timerModifiedLater:
            ...
		case timerNoStatus, timerRemoved:
			...
		case timerDeleted:
            ...
		case timerRunning, timerRemoving, timerMoving:
            ...
		case timerModifying:
            ...
		default:
			badTimer()
		}
	}

	t.period = period
	t.f = f
	t.arg = arg
	t.seq = seq

	if wasRemoved {
		// 如果计时器已经被删除
		// 创建新的计时器
		// 这部分代码和 addtimer 函数内部代码几乎一致,这里直接省略 ...
        ...
	} else {
		// 计时器可能位于另一个处理器的四叉堆中
		//    读者可以思考一下为什么会出现这种情况
		// 如果直接修改 when 字段,另一个处理器的四叉堆就乱序了
		// 因此将新的 when 值放入 nextwhen 字段
		// 并让另一个处理器在四叉堆调整时设置 when 字段
		t.nextwhen = when

		// 默认计时器被修改到了更晚的时间
		newStatus := uint32(timerModifiedLater)
		if when < t.when {
			// 如果修改后时间小于修改前时间
			// 将状态设置为 timerModifiedEarlier
			newStatus = timerModifiedEarlier
		}

		// 如果修改后时间小于修改前时间
		// 触发调度: 唤醒网络轮询器中休眠的线程
		if newStatus == timerModifiedEarlier {
			wakeNetPoller(when)
		}
	}

	return pending
}

删除计时器

函数 deltimer 用于删除计时器。

// 计时器可能位于另一个处理器的四叉堆中
// 所以不能直接将其删除,否则另一个处理器的四叉堆就乱序了
// 只能将其状态标记为 “删除” (类似软删除机制)
// 最后由计时器所在处理器删除
func deltimer(t *timer) bool {
	for {
		// 状态机内部变化
		// 这里可以对照着上面的状态机图示查看源代码
		switch s := t.status.Load(); s {
		case timerWaiting, timerModifiedLater:
			...
		case timerModifiedEarlier:
            ...
		case timerDeleted, timerRemoving, timerRemoved:
            ...
		case timerRunning, timerMoving:
			...
		case timerNoStatus:
			...
		case timerModifying:
            ...
		default:
			badTimer()
		}
	}
}

函数 dodeltimer0 用于删除当前处理器四叉堆的堆顶计时器。

func dodeltimer0(pp *p) {
	// 删除堆顶计时器
	// 通过将堆顶计时器替换为堆中最后一个计时器实现
	last := len(pp.timers) - 1
	if last > 0 {
		pp.timers[0] = pp.timers[last]
	}
	pp.timers[last] = nil
	pp.timers = pp.timers[:last]
	
	// 删除完成后,重新进行堆排序
	if last > 0 {
		siftdownTimer(pp.timers, 0)
	}
	updateTimer0When(pp)
	n := pp.numTimers.Add(-1)
}

清除定时器

函数 cleantimers 用于清除计时器。

func cleantimers(pp *p) {
	gp := getg()
	for {
		// 四叉堆为空时直接返回
		if len(pp.timers) == 0 {
			return
		}

		// 取出四叉堆顶的计时器
		t := pp.timers[0]
		
		// 只处理三种状态的计时器
		switch s := t.status.Load(); s {
		case timerDeleted:
			// 将计时器状态修改为正在删除中
			if !t.status.CompareAndSwap(s, timerRemoving) {
				continue
			}
			// 删除计时器
			dodeltimer0(pp)
			// 将计时器状态修改为已删除
			if !t.status.CompareAndSwap(timerRemoving, timerRemoved) {
				badTimer()
			}
			// 计时器数量减 1 
			pp.deletedTimers.Add(-1)
		case timerModifiedEarlier, timerModifiedLater:
			// 将计时器状态修改为正在移动中
			if !t.status.CompareAndSwap(s, timerMoving) {
				continue
			}
			// 更新计时器下次被唤醒时间
			t.when = t.nextwhen
			// 首先删除计时器
			dodeltimer0(pp)
			// 然后将计时器加入四叉堆中
			doaddtimer(pp, t)
			// 将计时器状态修改为等待启动
			if !t.status.CompareAndSwap(timerMoving, timerWaiting) {
				badTimer()
			}
		default:
			return
		}
	}
}

调整计时器

函数 adjusttimers 用于调整计时器,内部主要处理三种状态的计时器:

  1. timerDeleted
  2. timerModifiedEarlier
  3. timerModifiedLater
func adjusttimers(pp *p, now int64) {
	...

	var moved []*timer
	for i := 0; i < len(pp.timers); i++ {
		// 遍历四叉堆,逐个处理计时器
		t := pp.timers[i]
		
		switch s := t.status.Load(); s {
		case timerDeleted:
			if t.status.CompareAndSwap(s, timerRemoving) {
				// 删除计时器
				changed := dodeltimer(pp, i)
				// 将计时器状态修改为已删除
				if !t.status.CompareAndSwap(timerRemoving, timerRemoved) {
					badTimer()
				}
				// 计时器数量减 1 
				pp.deletedTimers.Add(-1)
			}
		case timerModifiedEarlier, timerModifiedLater:
			if t.status.CompareAndSwap(s, timerMoving) {
				// 更新计时器下次被唤醒时间
				t.when = t.nextwhen
                // 首先删除计时器
				changed := dodeltimer(pp, i)
				// 然后将计数器放入一个单独的队列
				moved = append(moved, t)
			}
			
         ...
		}
	}

	if len(moved) > 0 {
		// 
		addAdjustedTimers(pp, moved)
	}

	if verifyTimers {
		// 检测四叉堆中的计时器是否存在异常
		verifyTimerHeap(pp)
	}
}

函数 dodeltimer 用于删除处理器中指定索引的计时器,并返回四叉堆排序中受到影响的计时器索引最小值,其内部实现和 dodeltimer0 函数差不多,这里不在赘述。

func dodeltimer(pp *p, i int) int {
    ...
}

函数 addAdjustedTimers 将参数计时器队列中的所有计时器加入四叉堆中。

func addAdjustedTimers(pp *p, moved []*timer) {
	for _, t := range moved {
		doaddtimer(pp, t)
		// 将计时器状态修改为等待唤醒
		if !t.status.CompareAndSwap(timerMoving, timerWaiting) {
			badTimer()
		}
	}
}

运行计时器

函数 runtimer 用于计时器运行,内部会检查四叉堆的堆顶计时器,如果堆顶计时器准备就绪可以启动运行,那就直接运行并在运行完成后删除或更新该计时器 (取决于计时器的具体类型)。 如果有计时器运行,返回 0,如果没有计时器,返回 -1, 如果没有可运行的计时器,返回堆顶计时器的下次启动时间。

func runtimer(pp *p, now int64) int64 {
	for {
		// 四叉堆顶元素
		t := pp.timers[0]

        // 状态机内部变化
        // 这里可以对照着上面的状态机图示查看源代码
		switch s := t.status.Load(); s {
		case timerWaiting:
			if t.when > now {
				// 没有可运行的计时器,返回堆顶计时器的下次启动时间
				return t.when
			}
			
			// 运行计时器
			runOneTimer(pp, t, now)
			// 有计时器运行,返回 0
			return 0

		case timerDeleted:
            // 删除堆顶计时器
			dodeltimer0(pp)
			// 计时器数量减 1 
			pp.deletedTimers.Add(-1)
			if len(pp.timers) == 0 {
				// 如果没有计时器,返回 -1
				return -1
			}

		...
	}
}

函数 runOneTimer 用于运行单个计时器。

func runOneTimer(pp *p, t *timer, now int64) {
	f := t.f
	arg := t.arg
	seq := t.seq

	// 如果计时器唤醒周期大于 0
	if t.period > 0 {
		// 修改计时器下一次运行时间
		delta := t.when - now
		t.when += t.period * (1 + -delta/t.period)
		if t.when < 0 {
			t.when = maxWhen
		}
		// 更新计时器在四叉堆中的排序 (位置)
		siftdownTimer(pp.timers, 0)
		// 修改计时器状态为等待启动
		if !t.status.CompareAndSwap(timerRunning, timerWaiting) {
			badTimer()
		}
		updateTimer0When(pp)
	} else {
		// 如果计时器唤醒周期小于 0
		// 将计时器从堆中删除
		dodeltimer0(pp)
		// 将计时器状态改为初始状态
		if !t.status.CompareAndSwap(timerRunning, timerNoStatus) {
			badTimer()
		}
	}

	// 执行计时器回调函数
	f(arg, seq)

	...
}

调度器触发条件

调度器

函数 checkTimers 用于运行指定处理器中的计时器,有两个调用方:

  1. runtime.findRunnable 函数用于获取可运行的 goroutine
  2. runtime.stealWork 函数用于从其他处理器窃取 goroutine 或者计时器
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
	// 选出离当前时间最近的计时器
	next := pp.timer0When.Load()
	nextAdj := pp.timerModifiedEarliest.Load()
	if next == 0 || (nextAdj != 0 && nextAdj < next) {
		next = nextAdj
	}

	if next == 0 {
		// 没有可以启动运行的计时器
		// 直接返回
		return now, 0, false
	}

	if now == 0 {
        // 如果参数 now 为 0,将其设置为当前时间
		now = nanotime()
	}
	
	// 没有可以启动的计时器
	if now < next {
		// 如果参数处理器和当前处理器不一致,直接返回
		// 如果需要删除的计时器数量比例低于 25%,直接返回
		if pp != getg().m.p.ptr() || int(pp.deletedTimers.Load()) <= int(pp.numTimers.Load()/4) {
			return now, next, false
		}
	}


	// 如果处理器中还有计时器
	// 首先进行一次调整,详情见: adjusttimers 函数 
	if len(pp.timers) > 0 {
		adjusttimers(pp, now)
		for len(pp.timers) > 0 {
			// 计时器运行
			if tw := runtimer(pp, now); tw != 0 {
				if tw > 0 {
					// 如果没有可运行的计时器,使用堆顶计时器的下次启动时间
					pollUntil = tw
				}
				break
			}
			ran = true
		}
	}

	// 如果参数处理器和当前处理器一致 (避免锁争用)
	// 并且需要删除的计时器数量比例高于 25%
    // 删除处理器的四叉堆中状态被标记为 timerDeleted 的所有计时器
	if pp == getg().m.p.ptr() && int(pp.deletedTimers.Load()) > len(pp.timers)/4 {
		clearDeletedTimers(pp)
	}
	
	return now, pollUntil, ran
}

系统监控

获取最近的计时器

函数 timeSleepUntil 返回所有处理器中离当前时间最近的计时器的启动时间,如果没有任何计时器,就返回常量 maxWhen

该函数只能被 sysmon 函数 (监控线程方法) 和 checkdead (检测死锁) 函数调用。

func timeSleepUntil() int64 {
	next := int64(maxWhen)

	// 遍历所有处理器
	for _, pp := range allp {
		w := pp.timer0When.Load()
		if w != 0 && w < next {
			next = w
		}

		// 获取处于临时转换状态的计时器时间
		w = pp.timerModifiedEarliest.Load()
		if w != 0 && w < next {
			next = w
		}
	}

	return next
}

sysmon 监控

sysmon 监控在之前的 GMP 调度器 一文中已经提到过,这里不再赘述,着重看一下内部调用 timeSleepUntil 函数的部分。

func sysmon() {
	...
	
	for {
		...
		
		now := nanotime()
		if debug.schedtrace <= 0 && (sched.gcwaiting.Load() || sched.npidle.Load() == gomaxprocs) {
			if sched.gcwaiting.Load() || sched.npidle.Load() == gomaxprocs {
				// 获取最近的计时器的启动时间
				// 根据计时器的启动时间调整休眠时间,便于第一时间启动定时器
				next := timeSleepUntil()
				if next > now {
                    ...
				}
			}
		}
		
		...
	}
}

小结

本文着重介绍了 Go 中计时器的数据结构和状态操作相关的算法,Go 的计时器采用 四叉堆 数据结构并且直接绑定到处理器 P 上, 通过 GMP 全局调度体系可以直接管理所有处理器中的计时器,并且使单个计时器的启动延迟最小化。文中没有摘录计时器四叉堆的算法相关代码, 感兴趣的读者可以自行阅读 runtime.siftupTimer, runtime.siftdownTimer 两个函数。

Reference

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。