蛮荆

sync/atomic Code Reading

2023-04-21

概述

sync/atomic 包提供了原子同步操作原语,整个操作过程无需加锁,也不会产生 goroutine 上下文切换。

API

atomic API

Swap 操作由 SwapT 系列函数 (例如 SwapInt32, SwapInt64) 实现,等价于如下的原子操作:

old = *addr
*addr = new
return old

CAS 操作由 CompareAndSwapT 系列函数 (例如 CompareAndSwapInt32, CompareAndSwapInt64) 实现,等价于如下的原子操作:

if *addr == old {
    *addr = new
    return true
}
return false

Add 操作由 AddT 系列函数 (例如 AddInt32, AddInt64) 实现,等价于如下的原子操作:

*addr += delta
return *addr

Load And Store 操作由 LoadT and StoreT 系列函数 (例如 LoadInt32, LoadInt64) 实现,等价于如下的原子操作:

*addr = val

内部实现

我们来探究一下 sync/atomic 包的内部实现,文件目录路径为 $GOROOT/src/sync/atomic,笔者的 Go 版本为 go1.19 linux/amd64。 需要注意的是,该文件内只给出了函数的定义,函数的实现为在对应的 asm.s 汇编文件中。

函数声明

Swap, CAS 等操作的函数原型全部定义在 doc.go 文件中。

// 在 386 上, 64 位函数使用的指令在 Pentium MMX 之前不可用
// 在非 Linux ARM 上,64 位函数使用的指令在 ARMv6k core 之前不可使用
// 在 ARM 上,386 和 32 位 MIPS,调用方负责以原子的方式访问 64 位对齐的 64 位字
// 全局变量、局部变量、已经分配的结构体、数组、切片中的第一个字可以依赖于 64 位对齐

func SwapInt32(addr *int32, new int32) (old int32)

func SwapInt64(addr *int64, new int64) (old int64)

...


func StoreUintptr(addr *uintptr, val uintptr)

func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

函数实现

Swap, CAS 等操作的函数实现全部在 asm.s 汇编文件中,但是该文件中的函数并不是直接实现,而是 “套了一层壳”, 最终的实现在平台体系结构对应的汇编文件中,目录为$GOROOT/src/runtime/internal/atomic。 例如: 笔者的 Go 版本为 go1.19 linux/amd64, 那么就会跳转到 runtime/internal/atomic/atomic_amd64.s 文件对应的函数。

不同平台体系结构的实现

这里以 ·SwapInt32 汇编函数为例,来看下函数的跳转和实现。

# asm.s 文件

TEXT ·SwapInt32(SB),NOSPLIT,$0
	JMP	runtime∕internal∕atomic·Xchg(SB)

函数的实现中使用了 JMP 跳转指令,最终跳转到了文件 $GOROOT/src/runtime/internal/atomic/atomic_amd64.s 中的 ·Xchg 函数。

// atomic_amd64.s 文件

TEXT ·Xchg(SB), NOSPLIT, $0-20
	MOVQ	ptr+0(FP), BX   // 参数 1,8 字节 *int32 指针
	MOVL	new+8(FP), AX   // 参数 2,4 字节 int32
	XCHGL	AX, 0(BX)       // 交换指令
	MOVL	AX, ret+16(FP)  // 交换后的 AX(old value) 写入 FP 伪寄存器返回值位
	RET

// 上面的汇编代码等价于如下 Go 代码
// uint32 Xchg(ptr *uint32, new uint32)
// Atomically:
//	old := *ptr
//	*ptr = new
//	return old

atomic.Value

接下来看一下原子数据类型 atomic.Value 的内部实现。

Value 对象

Value 数据类型提供了一致性原子性的 Swap, CAS 等方法,其中

  • Load 方法调用时,Value 的零值返回 nil
  • Store 方法一旦被调用,Value 对象就不能再复制
  • Value 对象一旦使用后,就不能再复制
type Value struct {
    v any
}

ifaceWords 对象

ifaceWords 对象是 Value 对象对应的 数据类型 + 值 的内部抽象表示,虽然 Value 对象相关方法的参数类型是 any,但是内部操作的都是 ifaceWords 对象 (通过类型转换机制)。

type ifaceWords struct {
	typ  unsafe.Pointer // 类型
	data unsafe.Pointer // 值
}

ifaceWords 对象

Value.Load 方法

Load 返回最新设置的值,如果未对该值调用过 Store 方法 (Value 处于零值状态),返回 nil。

func (v *Value) Load() (val any) {
	// 类型转化
	vp := (*ifaceWords)(unsafe.Pointer(v))
	// 原子获取值类型
	typ := LoadPointer(&vp.typ)
    // 从未调用过 Store 方法
	if typ == nil || typ == unsafe.Pointer(&firstStoreInProgress) {
		return nil
	}
	
	// 原子获取值
	data := LoadPointer(&vp.data)
	// 返回值类型转换
	vlp := (*ifaceWords)(unsafe.Pointer(&val))
	// 返回值类型
	vlp.typ = typ
	// 返回值数据值
	vlp.data = data
	return
}

Value.Store 方法

Store 方法设置 Value 对象的值,对于某个特定的 Value 对象,每次调用 Store 方法时都必须使用相同的数据类型值 (例如第一次存储的是 int 类型, 那么之后调用时必须传递 int 类型), 类型不一致时会产生 panic, 传递 nil 参数 ( Store(nil) ) 同样会产生 panic

func (v *Value) Store(val any) {
	if val == nil {
		// 参数为 nil, 直接 panic
		panic("sync/atomic: store of nil value into Value")
	}
	
	// 参数类型转换
	vp := (*ifaceWords)(unsafe.Pointer(v))
	// 返回值类型转换
	vlp := (*ifaceWords)(unsafe.Pointer(&val))
	
	for {
		// 原子获取参数值数据类型
		typ := LoadPointer(&vp.typ)
		
		if typ == nil {
			// 如果类型为 nil, 尝试开始第一次 Store
			// 禁止抢占(避免操作未完成时,被其他 goroutine 抢占),其他 goroutine 可以使用自旋锁来等待完成
			// 同时可以避免 GC 的时候看到 unsafe.Pointer(^uintptr(0)) 这个中间状态的值
			
			runtime_procPin() // 禁止抢占 (具体的实现这里先忽略)
			
			if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(&firstStoreInProgress)) {
				// 如果设置值的类型时失败
				// 说明当前 goroutine 在禁止抢占执行结束前,已经有其他 goroutine 设置完了值类型
				runtime_procUnpin() // 恢复抢占
				continue
			}
			
			// 完成第一次 Store 操作
			StorePointer(&vp.data, vlp.data)
			StorePointer(&vp.typ, vlp.typ)
			
			runtime_procUnpin() // 恢复抢占 (具体的实现这里先忽略)
			return
		}
		
		if typ == unsafe.Pointer(&firstStoreInProgress) {
			// 第一次 Store 操作正在执行中,等待...
			// [待优化项] 因为在第一次 Store 禁用了抢占,所以可以使用自旋锁等待完成
			continue
		}
		
		// 已经完成了第一次 Store 操作, 检查类型并覆盖数据(检查是否和第一次设置的数据类型一致)
		if typ != vlp.typ {
			// 当前数据类型和第一次设置的数据类型不一致,产生 panic
			panic("sync/atomic: store of inconsistently typed value into Value")
		}
		
		StorePointer(&vp.data, vlp.data)
		return
	}
}

Value.Swap 方法

Swap 方法将新值存储到 Value 对象,并返回 Value 对象的旧值,如果 Value 为空, 则返回 nil。 对于某个特定的 Value 对象,每次调用 Swap 方法时都必须使用相同的数据类型值 (例如第一次存储的是 int 类型, 那么之后调用时必须传递 int 类型), 类型不一致时会产生 panic, 传递 nil 参数 ( Swap(nil) ) 同样会产生 panic

func (v *Value) Swap(new any) (old any) {
	if new == nil {
        // 参数为 nil, 直接 panic
		panic("sync/atomic: swap of nil value into Value")
	}
	
    // 当前值类型转换
	vp := (*ifaceWords)(unsafe.Pointer(v))
	// 参数类型转换
	np := (*ifaceWords)(unsafe.Pointer(&new))
	
	for {
		// 原子获取当前值数据类型
		typ := LoadPointer(&vp.typ)
		
		if typ == nil {
            // Value 对象还没有被设置
            // 尝试开始第一次 Store 操作
			
			// 流程和 Store 方法内部类型,这里直接省略掉
			...

			return nil
		}
		
        ...
		
        // 已经完成了第一次 Store 操作, 检查类型并覆盖数据(检查是否和第一次设置的数据类型一致)
		if typ != np.typ {
			// 当前数据类型和第一次设置的数据类型不一致,产生 panic
			panic("sync/atomic: swap of inconsistently typed value into Value")
		}
		
		// 返回值类型转换
		op := (*ifaceWords)(unsafe.Pointer(&old))
		op.typ, op.data = np.typ, SwapPointer(&vp.data, np.data)
		return old
	}
}

Value.CompareAndSwap 方法

CompareAndSwap 执行 CAS 操作,对于某个特定的 Value 对象,每次调用 CompareAndSwap 方法时都必须使用相同的数据类型值 (例如第一次存储的是 int 类型, 那么之后调用时必须传递 int 类型), 类型不一致时会产生 panic, 传递 nil 参数 ( CompareAndSwap(old, nil) ) 同样会产生 panic

func (v *Value) CompareAndSwap(old, new any) (swapped bool) {
	if new == nil {
        // 参数为 nil, 直接 panic
		panic("sync/atomic: compare and swap of nil value into Value")
	}
	
	// 当前值类型转换
	vp := (*ifaceWords)(unsafe.Pointer(v))
	// 参数新值类型转换
	np := (*ifaceWords)(unsafe.Pointer(&new))
	// 参数旧值类型转换
	op := (*ifaceWords)(unsafe.Pointer(&old))
	
	if op.typ != nil && np.typ != op.typ {
		// 新值和旧值数据类型不一致
		panic("sync/atomic: compare and swap of inconsistently typed values")
	}
	
	for {
		// 原子获取当前值数据类型
		typ := LoadPointer(&vp.typ)
		if typ == nil {
			if old != nil {
				// 当前值和参数值类型不一样
				return false
			}

            // 流程和 Store 方法内部类型,这里直接省略掉
			...
			
			return true
		}
		
        ...
		
		if typ != np.typ {
			// 参数新值数据类型和当前值的数据类型不一致,产生 panic
			panic("sync/atomic: compare and swap of inconsistently typed value into Value")
		}
		
		// CompareAndSwapPointer 函数只能确保 vp.data 从获取到之后没有发生变化
		data := LoadPointer(&vp.data)
		
		// 拷贝当前值的变量,然后和参数旧值进行比较 
		var i any
		// 当前值类型
		(*ifaceWords)(unsafe.Pointer(&i)).typ = typ
		// 当前值数据
		(*ifaceWords)(unsafe.Pointer(&i)).data = data
		if i != old {
			// 当前值已经发生变化
			return false
		}
		
		// 调用 CAS 操作 (内部再调用对应的汇编)
		return CompareAndSwapPointer(&vp.data, data, np.data)
	}
}

小结

atomic 实现了同步算法的底层内存原子操作原语,其内部实现主要是汇编 (各个平台对应各自不同的指令,通过编译器链接),使用这些函数时需要更加谨慎, 官方给出的建议是除了特殊的、底层的应用程序外,其他情况最好使用 channel 或其他同步原语来完成 (但是从大多数开源组件实现代码来看,并没有遵守官方的建议)。

标准库还提供了 atomic.Value 原子数据类型,并且为该类型实现了常见的原子操作,如 Load, Store, CAS 等, 在实现类似 对象需要原子操作 这样的功能时可以直接复用该类型,例如标准库 context 包的内部实现中里面就用到了 atomic.Value

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。