sync.Map Code Reading
2023-04-27 Golang 并发编程 Go 源码分析 读代码
概述
sync.Map 提供了并发安全的 map
操作,数据结构语义类似 map[any]any
,多个 goroutine
并发操作时无需加锁。
官方的建议是大多数情况下,应该使用普通 map
类型,并完成对应的锁和并发控制以保证安全性,这样可以使类型拥有更好的安全性和可维护性。
sync.Map
类型是针对两种特殊的场景进行优化的:
- 当指定的
key
只被写入一次,但是被读取多次,例如不断增长的缓存类应用 - 当多个
goroutine
同时读和写,但是每个goroutine
覆盖到的是不同的key
(类似分段锁的概念)
在这两种场景下,与使用 map + 锁
相比,使用 sync.Map
可以大大降低锁的竞争。
本文来探究一下 sync.Map
的内部实现,文件路径为 $GOROOT/src/sync/map.go
,笔者的 Go 版本为 go1.19 linux/amd64
。
数据结构
Map 对象
// Map 零值是空的并且可以使用
// Map 一旦使用后,便不能再复制
// Loads, stores, and deletes 通过延迟操作来均摊成本,从而达到常数运行时间
// 在 Go 内存模型术语中,Map 确保写操作在任何观察其变动的读操作之前同步,其中读和写的定义如下
// Load, LoadAndDelete 是读操作
// Delete, LoadAndDelete, Store 是写操作
// 当调用 LoadOrStore 方法返回值 loaded 为 false 时,LoadOrStore 是写操作,否则是读操作
type Map struct {
// 当写 read map 或者读写 dirty map 时,需要加互斥锁
mu Mutex
// read 字段包含了一部分 map 内容,这部分内容并发访问安全(无论是否持有互斥锁)
// read 字段本身对 load 而言永远是安全的,但执行 store 操作时,必须持有互斥锁
// read 是只读的数据结构,访问无需加锁,sync.Map 优先访问 read 字段
// read 中存储的数据值可以在不持有互斥锁的情况下并发更新
// 但更新一个之前被标记为 expunged 的元素需要持有互斥锁,将该元素拷贝到 dirty map 并标记 unexpunged (expunged 逆操作)
read atomic.Value // readOnly
// dirty map 包含了访问时需要持有互斥锁的元素
// 为了确保 dirty map 中的元素能够快速地移动到 read map
// 它也包含了那些 read map 中标记为 (non-expunged) 的元素
// 新添加的元素会优先放入 dirty map
// 被标记为 expunged 的元素不会存储在 dirty map 中
// 被标记为 expunged 的元素如果要存储新的值,需要先执行 unexpunged (expunged 逆操作) 添加到 dirty map, 然后再更新值
// 如果 dirty map == nil, 下一个对 map 的写入将通过浅拷贝一个空 map 来初始化它,忽略过期的元素
dirty map[any]*entry
// 一旦发生足够多的 misses 次数,超过拷贝 dirty map 的成本,dirty map 会被合并进 read map(unamended 状态下)
// 并且下一次的存储操作会生成一个新的 dirty map
misses int
}
readOnly 对象
readOnly
对象表示 Map.read 字段中的只读数据 (注意: 这里的只读指的是逻辑只读,底层的数据还是可以修改的),
其中 m 字段用来表示具体的只读数据,如果某个 key 存在于 dirty 字段中,却不存在于 m 字段中,amended 字段等于 true。
type readOnly struct {
m map[any]*entry
amended bool
}
expunged 类型
expunged
类型表示一个任意类型数据的指针,主要作用是作为删除标志,标记从 dirty map 中删除的元素。
var expunged = unsafe.Pointer(new(any))
entry 对象
entry
对象表示任意 key
对应的数据,这里同样使用指针来表示。
type entry struct {
// p 指向 entry 对应的数据
// 如果 p == nil, 说明对应的 entry 已经被删除
// 此时,m.dirty == nil 或 m.dirty[k] 指向该 entry
// 如果 p == expunged, 说明对应的 entry 已经被删除
// 此时,m.dirty != nil 且 m.dirty 中不存在 entry
// 其他情况代表 entry 是合法的值并且存在于 m.read.m[key]
// 如果 m.dirty != nil, entry 同时也会存在于 m.dirty[key] 中
// 删除 entry 时执行 CAS 操作替换为 nil (并不进行实际删除)
// 当 m.dirty 后续创建时 (dirty 提升后第一次新建 entry)
// 会对 entry 执行 CAS 操作,由 nil 替换为 expunged, 且不设置 m.dirty[key] 的值
// 一个 entry 对应的值可以用 CAS 操作来更新,前提是 p != expunged
// 如果 p == expunged, entry 对应的值只能在首次赋值 m.dirty[key] = e 之后更新
// 这样查找操作可以通过 dirty map 来找到这个 entry
p unsafe.Pointer // *interface{}
}
func newEntry(i any) *entry {
return &entry{p: unsafe.Pointer(&i)}
}
// 从 entry 中原子性加载实际的数据值
func (e *entry) load() (value any, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == nil || p == expunged {
return nil, false
}
return *(*any)(p), true
}
获取操作
Load 方法
Load
方法会优先读取 read map
(无需加锁),如果没有找到对应的元素,会加锁尝试从 dirty map
中读取。
// 获取 map 中 key 对应的值,如果不存在返回 nil, 第二个返回值表示 key 值是否存在于 map 中
func (m *Map) Load(key any) (value any, ok bool) {
// 优先从 read map 中读取 (无锁操作)
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
// 如果 read 没有对应的 key,并且 amended 字段标识 dirty map 存在 read map 中不存在的 key
// 则加锁尝试从 dirty.map 中获取
if !ok && read.amended {
m.mu.Lock()
// double check 是避免在加锁期间,dirty map 提升为 read map
// 如果已经发生合并,可以避免报告一个不必要的 miss
// 因为足够多的 miss 数会将 dirty map 提升为 read map
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
e, ok = m.dirty[key]
// 无论元素是否存在 dirty map 中,都记录 1 次 miss
// 在 dirty map 被提升到 read map 之前,这个 key 对应的值会一直从 dirty map 中获取
// 方法内部可能将 dirty map 提升到 read map
m.missLocked()
}
m.mu.Unlock()
}
if !ok {
return nil, false
}
// 如果 key 存在,加载对应的值
return e.load()
}
设置操作
Store 方法
Store
方法用于设置 key 和对应的值。
func (m *Map) Store(key, value any) {
// 如果 key 对应的值存在于 read map 中,尝试直接修改
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok && e.tryStore(&value) {
return
}
m.mu.Lock()
// double check 是避免在加锁期间,dirty map 提升为 read map
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// 如果 key 对应的值存在于 read map 中,但 p == expunged, 说明 dirty != nil 并且 key 对应的值不存在于 dirty map 中
// 先将 p 的状态由 expunged 改为 nil
// dirty map 新建 key
// 更新 entry.p = value (read map 和 dirty map 指向同一个entry)
m.dirty[key] = e
}
// 如果 key 对应的值存在于 read map 中,但 p != expunged, 直接更新 entry
// 此时 m.dirty == nil 或 m.dirty[key] == e
e.storeLocked(&value)
} else if e, ok := m.dirty[key]; ok {
// 如果 key 对应的值不存在于 read map 中,但存在于 dirty map 中,直接写入更新 entry (read map 中依然不存在)
e.storeLocked(&value)
} else {
if !read.amended {
// 如果 read map 和 dirty map 都不存在该值
// 如果 dirty map == nil, 创建 dirty map, 并从 read map 中拷贝未删除的元素
// 更新 amended 字段,标识 dirty map 中存在 read map 中不存在的 key
// 将 K => V 写入 dirty map, read map 不变
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
}
// Tips: hot path 不使用 defer 释放锁
m.mu.Unlock()
}
tryStore 方法
tryStore
方法尝试直接更新 entry, 如果 entry == expunged, 返回 false。
func (e *entry) tryStore(i *any) bool {
for {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return false
}
if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
return true
}
}
}
unexpungeLocked 方法
unexpungeLocked
方法用于取消 entry 的 expunged 标记。
func (e *entry) unexpungeLocked() (wasExpunged bool) {
return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}
storeLocked 方法
storeLocked
方法用于直接将值存入 entry。
func (e *entry) storeLocked(i *any) {
atomic.StorePointer(&e.p, unsafe.Pointer(i))
}
LoadOrStore 方法
LoadOrStore
方法用于获取 key 对应的值,如果 key 对应的值存在,直接返回 (此时第二个返回值为 true),否则就将 key 设置为参数值然后返回 (此时第二个返回值为 false)。
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool) {
// 如果命中就直接返回,避免锁操作
read, _ := m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
actual, loaded, ok := e.tryLoadOrStore(value)
if ok {
return actual, loaded
}
}
m.mu.Lock()
// double check 是避免在加锁期间,dirty map 提升为 read map
read, _ = m.read.Load().(readOnly)
if e, ok := read.m[key]; ok {
if e.unexpungeLocked() {
// 如果 key 对应的值存在于 read map 中,但 p == expunged, 说明 dirty != nil 并且 key 对应的值不存在于 dirty map 中
// 先将 p 的状态由 expunged 改为 nil
// dirty map 新建 key
// 更新 entry.p = value (read map 和 dirty map 指向同一个entry)
m.dirty[key] = e
}
actual, loaded, _ = e.tryLoadOrStore(value)
} else if e, ok := m.dirty[key]; ok {
// 如果 key 对应的值不存在于 read map 中,但存在于 dirty map 中,直接写入更新 entry (read map 中依然不存在)
actual, loaded, _ = e.tryLoadOrStore(value)
// 记录 1 次 miss
// 方法内部可能将 dirty map 提升到 read map
m.missLocked()
} else {
if !read.amended {
// 如果 read map 和 dirty map 都不存在该值
// 如果 dirty map == nil, 创建 dirty map, 并从 read map 中拷贝未删除的元素
// 更新 amended 字段,标识 dirty map 中存在 read map 中不存在的 key
// 将 K => V 写入 dirty map, read map 不变
m.dirtyLocked()
m.read.Store(readOnly{m: read.m, amended: true})
}
m.dirty[key] = newEntry(value)
actual, loaded = value, false
}
m.mu.Unlock()
return actual, loaded
}
tryLoadOrStore 方法
tryLoadOrStore
方法用于原子性地加载或存储一个 entry 对应的值,前提是 entry 未被标记为 expunged, 如果 entry 被标记为 expunged, 函数不做任何修改,第二个返回值为 false。
func (e *entry) tryLoadOrStore(i any) (actual any, loaded, ok bool) {
p := atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*any)(p), true, true
}
ic := i
for {
if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
return i, false, true
}
p = atomic.LoadPointer(&e.p)
if p == expunged {
return nil, false, false
}
if p != nil {
return *(*any)(p), true, true
}
}
}
删除操作
Delete 方法
// 删除 key 对应的值
func (m *Map) Delete(key any) {
m.LoadAndDelete(key)
}
LoadAndDelete 方法
LoadAndDelete
方法是 Delete
方法的具体实现,和 Load
方法的机制一样,优先从 read map
中删除 (无需加锁),如果没有找到对应的元素,
会加锁尝试从 dirty map
中删除,第二个返回值表示 key 对应的值是否存在。
func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
read, _ := m.read.Load().(readOnly)
e, ok := read.m[key]
if !ok && read.amended {
m.mu.Lock()
// double check 是避免在加锁期间,dirty map 提升为 read map
read, _ = m.read.Load().(readOnly)
e, ok = read.m[key]
if !ok && read.amended {
// 如果 key 对应的值不存在于 read map 中,但存在于 dirty map 中,直接删除
e, ok = m.dirty[key]
delete(m.dirty, key)
// 记录 1 次 miss
// 在 dirty map 被提升到 read map 之前,这个 key 对应的值会一直从 dirty map 中获取
m.missLocked()
}
m.mu.Unlock()
}
if ok {
// key 对应的值存在于 read map 中,直接删除
return e.delete()
}
return nil, false
}
delete 方法
delete
方法负责 entry
的具体删除。
func (e *entry) delete() (value any, ok bool) {
for {
p := atomic.LoadPointer(&e.p)
// 如果 p == nil, 说明对应的 entry 已经被删除
// 如果 p == expunged, 说明对应的 entry 已经被删除
if p == nil || p == expunged {
return nil, false
}
// 这里是数据真正被删除的代码
if atomic.CompareAndSwapPointer(&e.p, p, nil) {
return *(*any)(p), true
}
}
}
遍历操作
Range 方法
如果全部的数据都在 read map
中,无需加锁,直接读取即可,否则会先加锁,将 dirty map
中的数据全部加载到 read map
,
然后重置 dirty map
和 misses
计数器,这样可以避免遍历过程中多次访问 dirty map
时导致的加锁和性能影响。
// Range 方法按照每个 key 和 value 在 map 中出现的顺序依次调用参数函数 f
// 如果函数 f 返回 false, range 停止遍历
// Range 方法是无序遍历,但是保证每个 key 只会被访问一次
// 如果某个 key 对应的 value 被并发地更新或者删除了,Range 可能返回修改前或修改后的值
// Range 方法不阻塞接收者的其他方法,甚至回调函数 f 本身也可以调用 Map 的任何方法
// 不论 Map 中有多少元素,Range 时间复杂度都是 O(N)
// 即使函数 f 在一定的调用次数之后返回 false 也一样
func (m *Map) Range(f func(key, value any) bool) {
// 在调用 Range 时,能够遍历 map 里面所有的 key
// 如果 read.amended == false
// 说明 dirty map 中没有元素,直接遍历 read map 就可以
read, _ := m.read.Load().(readOnly)
if read.amended {
// m.dirty 包含 read.m 中不存在的 key, 不过影响不大,毕竟 Range 时间复杂度为 O(N)
// 假设调用方不会中断遍历过程,对于 Range 的调用必然会分阶段完整地拷贝整个 map
// 这时候可以将 dirty map 提升到 read map 就可以提升性能
m.mu.Lock()
// double check 是避免在加锁期间,dirty map 提升为 read map
read, _ = m.read.Load().(readOnly)
if read.amended {
read = readOnly{m: m.dirty}
m.read.Store(read)
m.dirty = nil
m.misses = 0
}
// Tips: hot path 不使用 defer 释放锁
m.mu.Unlock()
}
// 此时 dirty map 和 read map 已经合并
// 普通 map 遍历
for k, e := range read.m {
v, ok := e.load()
if !ok {
continue
}
if !f(k, v) {
break
}
}
}
miss 计数
missLocked 方法
missLocked
方法用于增加 Map
操作过程中的 misses 计数,一旦发生足够多的 misses 次数,超过拷贝 dirty map 的成本,直接将 dirty map 赋值给 read map。
func (m *Map) missLocked() {
m.misses++
// 成本计算: miss 次数小于 dirty map 长度
if m.misses < len(m.dirty) {
return
}
// 提升过程很简单,直接将 dirty map 赋值给 read map
// 提升完成之后,amended == false, m.dirty == nil
m.read.Store(readOnly{m: m.dirty})
m.dirty = nil
m.misses = 0
}
数据拷贝
dirtyLocked 方法
dirtyLocked
方法用于当 dirty map 等于 nil 时,将 read map 元素拷贝到 dirty map。
func (m *Map) dirtyLocked() {
if m.dirty != nil {
return
}
read, _ := m.read.Load().(readOnly)
m.dirty = make(map[any]*entry, len(read.m))
for k, e := range read.m {
// 1. 将所有为 nil 的元素标记为 expunged
// 2. 只拷贝没有标记为 expunged 的元素
if !e.tryExpungeLocked() {
m.dirty[k] = e
}
}
}
tryExpungeLocked 方法
tryExpungeLocked
方法用于将等于 nil 的 entry 标记为 expunged。
func (e *entry) tryExpungeLocked() (isExpunged bool) {
p := atomic.LoadPointer(&e.p)
for p == nil {
if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
return true
}
p = atomic.LoadPointer(&e.p)
}
return p == expunged
}
相关问题
expunged 的使用场景
调用 Store
, LoadOrStore
方法存入新的元素时,如果 read map
和 dirty map
中都不存在该元素,会调用 dirtyLocked
方法将 read map
中所有未标记为 expunged
的元素复制到 dirty map
中。
expunged 标记可以去除吗?
不行,expunged
作为一个 过渡状态标识
是必要的。
- 如果没有
expunged
, 删除元素时就没有办法实现软删除
,这就导致必须对read map
加锁,性能直接退化到了普通 map + 互斥锁
的方案 - 实现不了
软删除
, 只能将元素设置为nil
, 这就导致即使dirty map
提升为read map
, 但是里面被删除的垃圾数据依然会保留 (这里的保留指的是key
相关数据,元素删除时就被置为nil
了),最终导致内存占用过多 - 调用
Store
方法存入新的元素时,没有办法区分元素是不存在还是已经被删除 (因为两者都是nil
),造成read map
和dirty map
数据不同步 (read map
里面有dirty map
不存在的数据), 那么在dirty map
提升为read map
时,就会导致数据丢失
为什么需要双重锁检测?
在 Map
操作相关方法中,可以看到下面这行代码往往会出现两次:
read, _ := m.read.Load().(readOnly)
这是为了避免在加锁期间,dirty map
提升为 read map
,所以获取锁操作后需要再次检查 key
是否存在于 read map
中。
为什么没有实现 Len
方法?
- 实现
Len
方法需要统计readOnly
和dirty
两个字段的数据,这样就需要引入锁机制,导致性能下降,还会额外增加代码实现复杂度 - 针对
sync.Map
的并发操作会导致其数据变化很快,Len
方法难以保证时效性
为什么适用的场景是读多写少?
参考 Load
方法和 Store
方法的注释。
小结
从应用层面来说,针对读多写少的场景,sync.Map
提供了并发安全的 map
操作,包对外提供的方法非常简洁,同时可以避免 普通 map + 互斥锁
实现方案中,
获取锁和释放锁等繁琐和容易出错的地方,有两个小的细节需要注意:
- 包对外提供的方法参数类型都是
any
, 在使用具体的数据类型时,需要转换 - 包没有提供
Len
方法,如果需要获取元素个数,需要调用Range
方法计算
从内部底层实现来说,sync.Map
通过 entry
类型将 map
的变化和元素的变化进行隔离的同时, 还保证了底层数据的复用 (read map
和 dirty map
指向同一个 entry
),
read
, dirty
2 个字段加原子操作的紧密配合,非常简洁优雅地实现了 sync.Map
根据读写比例动态变化和转换内部字段数据结构,
尤其是 read
和 dirty
两个字段保持部分内存结构布局一致并且通过指针直接转换,这些都是值得我们学习的编码技巧。
图片来源: http://russellluo.com/2017/06/go-sync-map-diagram.html