蛮荆

sync.Map Code Reading

2023-04-27

概述

sync.Map 提供了并发安全的 map 操作,数据结构语义类似 map[any]any,多个 goroutine 并发操作时无需加锁。

官方的建议是大多数情况下,应该使用普通 map 类型,并完成对应的锁和并发控制以保证安全性,这样可以使类型拥有更好的安全性和可维护性。

sync.Map 类型是针对两种特殊的场景进行优化的

  1. 当指定的 key 只被写入一次,但是被读取多次,例如不断增长的缓存类应用
  2. 当多个 goroutine 同时读和写,但是每个 goroutine 覆盖到的是不同的 key (类似分段锁的概念)

在这两种场景下,与使用 map + 锁 相比,使用 sync.Map 可以大大降低锁的竞争。

本文来探究一下 sync.Map 的内部实现,文件路径为 $GOROOT/src/sync/map.go,笔者的 Go 版本为 go1.19 linux/amd64

数据结构

Map 对象

// Map 零值是空的并且可以使用
// Map 一旦使用后,便不能再复制
// Loads, stores, and deletes 通过延迟操作来均摊成本,从而达到常数运行时间
// 在 Go 内存模型术语中,Map 确保写操作在任何观察其变动的读操作之前同步,其中读和写的定义如下
//     Load, LoadAndDelete 是读操作
//     Delete, LoadAndDelete, Store 是写操作
//     当调用 LoadOrStore 方法返回值 loaded 为 false 时,LoadOrStore 是写操作,否则是读操作
type Map struct {
	// 当写 read map 或者读写 dirty map 时,需要加互斥锁
	mu Mutex
	
	// read 字段包含了一部分 map 内容,这部分内容并发访问安全(无论是否持有互斥锁)
	// read 字段本身对 load 而言永远是安全的,但执行 store 操作时,必须持有互斥锁
	// read 是只读的数据结构,访问无需加锁,sync.Map 优先访问 read 字段
	// read 中存储的数据值可以在不持有互斥锁的情况下并发更新
	//     但更新一个之前被标记为 expunged 的元素需要持有互斥锁,将该元素拷贝到 dirty map 并标记 unexpunged (expunged 逆操作)
	read atomic.Value // readOnly
	
	// dirty map 包含了访问时需要持有互斥锁的元素
	// 为了确保 dirty map 中的元素能够快速地移动到 read map
	// 它也包含了那些 read map 中标记为 (non-expunged) 的元素
	// 新添加的元素会优先放入 dirty map
	
	// 被标记为 expunged 的元素不会存储在 dirty map 中
	// 被标记为 expunged 的元素如果要存储新的值,需要先执行 unexpunged (expunged 逆操作) 添加到 dirty map, 然后再更新值 
	
	// 如果 dirty map == nil, 下一个对 map 的写入将通过浅拷贝一个空 map 来初始化它,忽略过期的元素
	dirty map[any]*entry
	
	// 一旦发生足够多的 misses 次数,超过拷贝 dirty map 的成本,dirty map 会被合并进 read map(unamended 状态下)
	// 并且下一次的存储操作会生成一个新的 dirty map
	misses int
}

readOnly 对象

readOnly 对象表示 Map.read 字段中的只读数据 (注意: 这里的只读指的是逻辑只读,底层的数据还是可以修改的), 其中 m 字段用来表示具体的只读数据,如果某个 key 存在于 dirty 字段中,却不存在于 m 字段中,amended 字段等于 true

type readOnly struct {
	m       map[any]*entry
	amended bool
}

expunged 类型

expunged 类型表示一个任意类型数据的指针,主要作用是作为删除标志,标记从 dirty map 中删除的元素

var expunged = unsafe.Pointer(new(any))

entry 对象

entry 对象表示任意 key 对应的数据,这里同样使用指针来表示。

type entry struct {
	// p 指向 entry 对应的数据
	
	// 如果 p == nil, 说明对应的 entry 已经被删除
	//     此时,m.dirty == nil 或 m.dirty[k] 指向该 entry
	// 如果 p == expunged, 说明对应的 entry 已经被删除
	//     此时,m.dirty != nil 且 m.dirty 中不存在 entry
	// 其他情况代表 entry 是合法的值并且存在于 m.read.m[key]
	//     如果 m.dirty != nil, entry 同时也会存在于 m.dirty[key] 中
	
	// 删除 entry 时执行 CAS 操作替换为 nil (并不进行实际删除)
	// 当 m.dirty 后续创建时 (dirty 提升后第一次新建 entry)
	// 会对 entry 执行 CAS 操作,由 nil 替换为 expunged, 且不设置 m.dirty[key] 的值
	
	// 一个 entry 对应的值可以用 CAS 操作来更新,前提是 p != expunged
	// 如果 p == expunged, entry 对应的值只能在首次赋值 m.dirty[key] = e 之后更新
	// 这样查找操作可以通过 dirty map 来找到这个 entry
	p unsafe.Pointer // *interface{}
}

func newEntry(i any) *entry {
    return &entry{p: unsafe.Pointer(&i)}
}

// 从 entry 中原子性加载实际的数据值
func (e *entry) load() (value any, ok bool) {
	p := atomic.LoadPointer(&e.p)
	if p == nil || p == expunged {
		return nil, false
	}
	return *(*any)(p), true
}

获取操作

Load 方法

Load 方法会优先读取 read map (无需加锁),如果没有找到对应的元素,会加锁尝试从 dirty map 中读取。

// 获取 map 中 key 对应的值,如果不存在返回 nil, 第二个返回值表示 key 值是否存在于 map 中
func (m *Map) Load(key any) (value any, ok bool) {
	// 优先从 read map 中读取 (无锁操作)
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	
	// 如果 read 没有对应的 key,并且 amended 字段标识 dirty map 存在 read map 中不存在的 key
	// 则加锁尝试从 dirty.map 中获取
	if !ok && read.amended {
		m.mu.Lock()
		// double check 是避免在加锁期间,dirty map 提升为 read map
		// 如果已经发生合并,可以避免报告一个不必要的 miss 
		// 因为足够多的 miss 数会将 dirty map 提升为 read map
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		
		if !ok && read.amended {
			e, ok = m.dirty[key]
			// 无论元素是否存在 dirty map 中,都记录 1 次 miss
			// 在 dirty map 被提升到 read map 之前,这个 key 对应的值会一直从 dirty map 中获取

			// 方法内部可能将 dirty map 提升到 read map
			m.missLocked()  
		}
		m.mu.Unlock()
	}
	if !ok {
		return nil, false
	}
	
	// 如果 key 存在,加载对应的值
	return e.load()
}

设置操作

Store 方法

Store 方法用于设置 key 和对应的值。

func (m *Map) Store(key, value any) {
	// 如果 key 对应的值存在于 read map 中,尝试直接修改
	read, _ := m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok && e.tryStore(&value) {
		return
	}

	m.mu.Lock()
	// double check 是避免在加锁期间,dirty map 提升为 read map
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		if e.unexpungeLocked() {
			// 如果 key 对应的值存在于 read map 中,但 p == expunged, 说明 dirty != nil 并且 key 对应的值不存在于 dirty map 中
			//    先将 p 的状态由 expunged 改为 nil 
			//    dirty map 新建 key
			//    更新 entry.p = value (read map 和 dirty map 指向同一个entry)
			m.dirty[key] = e
		}
        // 如果 key 对应的值存在于 read map 中,但 p != expunged, 直接更新 entry
		// 此时 m.dirty == nil 或 m.dirty[key] == e
		e.storeLocked(&value)
	} else if e, ok := m.dirty[key]; ok {
		// 如果 key 对应的值不存在于 read map 中,但存在于 dirty map 中,直接写入更新 entry (read map 中依然不存在)
		e.storeLocked(&value)
	} else {
		if !read.amended {
			// 如果 read map 和 dirty map 都不存在该值
			//     如果 dirty map == nil, 创建 dirty map, 并从 read map 中拷贝未删除的元素
			//     更新 amended 字段,标识 dirty map 中存在 read map 中不存在的 key
			//     将 K => V 写入 dirty map, read map 不变
			m.dirtyLocked()
			m.read.Store(readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value)
	}
    // Tips: hot path 不使用 defer 释放锁
	m.mu.Unlock()   
}

tryStore 方法

tryStore 方法尝试直接更新 entry, 如果 entry == expunged, 返回 false。

func (e *entry) tryStore(i *any) bool {
	for {
		p := atomic.LoadPointer(&e.p)
		if p == expunged {
			return false
		}
		if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
			return true
		}
	}
}

unexpungeLocked 方法

unexpungeLocked 方法用于取消 entry 的 expunged 标记。

func (e *entry) unexpungeLocked() (wasExpunged bool) {
	return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

storeLocked 方法

storeLocked 方法用于直接将值存入 entry。

func (e *entry) storeLocked(i *any) {
	atomic.StorePointer(&e.p, unsafe.Pointer(i))
}

LoadOrStore 方法

LoadOrStore 方法用于获取 key 对应的值,如果 key 对应的值存在,直接返回 (此时第二个返回值为 true),否则就将 key 设置为参数值然后返回 (此时第二个返回值为 false)。

func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool) {
	// 如果命中就直接返回,避免锁操作
	read, _ := m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		actual, loaded, ok := e.tryLoadOrStore(value)
		if ok {
			return actual, loaded
		}
	}

	m.mu.Lock()
	// double check 是避免在加锁期间,dirty map 提升为 read map
	read, _ = m.read.Load().(readOnly)
	if e, ok := read.m[key]; ok {
		if e.unexpungeLocked() {
            // 如果 key 对应的值存在于 read map 中,但 p == expunged,  说明 dirty != nil 并且 key 对应的值不存在于 dirty map 中
            //    先将 p 的状态由 expunged 改为 nil 
            //    dirty map 新建 key
            //    更新 entry.p = value (read map 和 dirty map 指向同一个entry)
			m.dirty[key] = e
		}
		actual, loaded, _ = e.tryLoadOrStore(value)
	} else if e, ok := m.dirty[key]; ok {
        // 如果 key 对应的值不存在于 read map 中,但存在于 dirty map 中,直接写入更新 entry (read map 中依然不存在)
		actual, loaded, _ = e.tryLoadOrStore(value)
		// 记录 1 次 miss
        // 方法内部可能将 dirty map 提升到 read map
		m.missLocked()
	} else {
		if !read.amended {
			// 如果 read map 和 dirty map 都不存在该值
			//     如果 dirty map == nil, 创建 dirty map, 并从 read map 中拷贝未删除的元素
			//     更新 amended 字段,标识 dirty map 中存在 read map 中不存在的 key
			//     将 K => V 写入 dirty map, read map 不变
			m.dirtyLocked()
			m.read.Store(readOnly{m: read.m, amended: true})
		}
		m.dirty[key] = newEntry(value)
		actual, loaded = value, false
	}
	m.mu.Unlock()

	return actual, loaded
}

tryLoadOrStore 方法

tryLoadOrStore 方法用于原子性地加载或存储一个 entry 对应的值,前提是 entry 未被标记为 expunged, 如果 entry 被标记为 expunged, 函数不做任何修改,第二个返回值为 false。

func (e *entry) tryLoadOrStore(i any) (actual any, loaded, ok bool) {
	p := atomic.LoadPointer(&e.p)
	if p == expunged {
		return nil, false, false
	}
	if p != nil {
		return *(*any)(p), true, true
	}
	
	ic := i
	for {
		if atomic.CompareAndSwapPointer(&e.p, nil, unsafe.Pointer(&ic)) {
			return i, false, true
		}
		p = atomic.LoadPointer(&e.p)
		if p == expunged {
			return nil, false, false
		}
		if p != nil {
			return *(*any)(p), true, true
		}
	}
}

删除操作

Delete 方法

// 删除 key 对应的值
func (m *Map) Delete(key any) {
	m.LoadAndDelete(key)
}

LoadAndDelete 方法

LoadAndDelete 方法是 Delete 方法的具体实现,和 Load 方法的机制一样,优先从 read map 中删除 (无需加锁),如果没有找到对应的元素, 会加锁尝试从 dirty map 中删除,第二个返回值表示 key 对应的值是否存在。

func (m *Map) LoadAndDelete(key any) (value any, loaded bool) {
	read, _ := m.read.Load().(readOnly)
	e, ok := read.m[key]
	
	if !ok && read.amended {
		m.mu.Lock()
		// double check 是避免在加锁期间,dirty map 提升为 read map
		read, _ = m.read.Load().(readOnly)
		e, ok = read.m[key]
		if !ok && read.amended {
            // 如果 key 对应的值不存在于 read map 中,但存在于 dirty map 中,直接删除
			e, ok = m.dirty[key]
			delete(m.dirty, key)
			
			// 记录 1 次 miss
			// 在 dirty map 被提升到 read map 之前,这个 key 对应的值会一直从 dirty map 中获取
			m.missLocked()
		}
		m.mu.Unlock()
	}
	if ok {
		// key 对应的值存在于 read map 中,直接删除
		return e.delete()
	}
	return nil, false
}

delete 方法

delete 方法负责 entry 的具体删除。

func (e *entry) delete() (value any, ok bool) {
	for {
		p := atomic.LoadPointer(&e.p)
		// 如果 p == nil, 说明对应的 entry 已经被删除
		// 如果 p == expunged, 说明对应的 entry 已经被删除
		if p == nil || p == expunged {
			return nil, false
		}
		// 这里是数据真正被删除的代码
		if atomic.CompareAndSwapPointer(&e.p, p, nil) {
			return *(*any)(p), true
		}
	}
}

遍历操作

Range 方法

如果全部的数据都在 read map 中,无需加锁,直接读取即可,否则会先加锁,将 dirty map 中的数据全部加载到 read map, 然后重置 dirty mapmisses 计数器,这样可以避免遍历过程中多次访问 dirty map 时导致的加锁和性能影响。

// Range 方法按照每个 key 和 value 在 map 中出现的顺序依次调用参数函数 f
// 如果函数 f 返回 false, range 停止遍历

// Range 方法是无序遍历,但是保证每个 key 只会被访问一次
// 如果某个 key 对应的 value 被并发地更新或者删除了,Range 可能返回修改前或修改后的值
// Range 方法不阻塞接收者的其他方法,甚至回调函数 f 本身也可以调用 Map 的任何方法

// 不论 Map 中有多少元素,Range 时间复杂度都是 O(N)
// 即使函数 f 在一定的调用次数之后返回 false 也一样
func (m *Map) Range(f func(key, value any) bool) {
	// 在调用 Range 时,能够遍历 map 里面所有的 key
	// 如果 read.amended == false
	//  说明 dirty map 中没有元素,直接遍历 read map 就可以
	read, _ := m.read.Load().(readOnly)
	
	if read.amended {
		// m.dirty 包含 read.m 中不存在的 key, 不过影响不大,毕竟 Range 时间复杂度为 O(N)
		// 假设调用方不会中断遍历过程,对于 Range 的调用必然会分阶段完整地拷贝整个 map
		// 这时候可以将 dirty map 提升到 read map 就可以提升性能
		m.mu.Lock()
		// double check 是避免在加锁期间,dirty map 提升为 read map
		read, _ = m.read.Load().(readOnly)  
		if read.amended {
			read = readOnly{m: m.dirty}
			m.read.Store(read)
			m.dirty = nil
			m.misses = 0
		}
        // Tips: hot path 不使用 defer 释放锁
		m.mu.Unlock()   
	}
	
	// 此时 dirty map 和 read map 已经合并
	// 普通 map 遍历
	for k, e := range read.m {
		v, ok := e.load()
		if !ok {
			continue
		}
		if !f(k, v) {
			break
		}
	}
}

miss 计数

missLocked 方法

missLocked 方法用于增加 Map 操作过程中的 misses 计数,一旦发生足够多的 misses 次数,超过拷贝 dirty map 的成本,直接将 dirty map 赋值给 read map。

func (m *Map) missLocked() {
	m.misses++
	// 成本计算: miss 次数小于 dirty map 长度
	if m.misses < len(m.dirty) {
		return
	}
	// 提升过程很简单,直接将 dirty map 赋值给 read map
	// 提升完成之后,amended == false, m.dirty == nil
	m.read.Store(readOnly{m: m.dirty})
	m.dirty = nil
	m.misses = 0
}

数据拷贝

dirtyLocked 方法

dirtyLocked 方法用于当 dirty map 等于 nil 时,将 read map 元素拷贝到 dirty map。

func (m *Map) dirtyLocked() {
	if m.dirty != nil {
		return
	}

	read, _ := m.read.Load().(readOnly)
	m.dirty = make(map[any]*entry, len(read.m))
	for k, e := range read.m {
		// 1. 将所有为 nil 的元素标记为 expunged
		// 2. 只拷贝没有标记为 expunged 的元素
		if !e.tryExpungeLocked() {
			m.dirty[k] = e
		}
	}
}

tryExpungeLocked 方法

tryExpungeLocked 方法用于将等于 nil 的 entry 标记为 expunged。

func (e *entry) tryExpungeLocked() (isExpunged bool) {
	p := atomic.LoadPointer(&e.p)
	for p == nil {
		if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
			return true
		}
		p = atomic.LoadPointer(&e.p)
	}
	return p == expunged
}

相关问题

expunged 的使用场景

调用 Store, LoadOrStore 方法存入新的元素时,如果 read mapdirty map 中都不存在该元素,会调用 dirtyLocked 方法将 read map 中所有未标记为 expunged 的元素复制到 dirty map 中。

expunged 标记可以去除吗?

不行,expunged 作为一个 过渡状态标识 是必要的。

  1. 如果没有 expunged, 删除元素时就没有办法实现 软删除,这就导致必须对 read map 加锁,性能直接退化到了 普通 map + 互斥锁 的方案
  2. 实现不了 软删除, 只能将元素设置为 nil, 这就导致即使 dirty map 提升为 read map, 但是里面被删除的垃圾数据依然会保留 (这里的保留指的是 key 相关数据,元素删除时就被置为 nil 了),最终导致内存占用过多
  3. 调用 Store 方法存入新的元素时,没有办法区分元素是不存在还是已经被删除 (因为两者都是 nil),造成 read mapdirty map 数据不同步 (read map 里面有 dirty map 不存在的数据), 那么在 dirty map 提升为 read map 时,就会导致数据丢失

为什么需要双重锁检测?

Map 操作相关方法中,可以看到下面这行代码往往会出现两次:

read, _ := m.read.Load().(readOnly)

这是为了避免在加锁期间,dirty map 提升为 read map,所以获取锁操作后需要再次检查 key 是否存在于 read map 中。

为什么没有实现 Len 方法?

  • 实现 Len 方法需要统计 readOnlydirty 两个字段的数据,这样就需要引入锁机制,导致性能下降,还会额外增加代码实现复杂度
  • 针对 sync.Map 的并发操作会导致其数据变化很快,Len 方法难以保证时效性

为什么适用的场景是读多写少?

参考 Load 方法和 Store 方法的注释。

小结

从应用层面来说,针对读多写少的场景,sync.Map 提供了并发安全的 map 操作,包对外提供的方法非常简洁,同时可以避免 普通 map + 互斥锁 实现方案中, 获取锁和释放锁等繁琐和容易出错的地方,有两个小的细节需要注意:

  1. 包对外提供的方法参数类型都是 any, 在使用具体的数据类型时,需要转换
  2. 包没有提供 Len 方法,如果需要获取元素个数,需要调用 Range 方法计算

从内部底层实现来说,sync.Map 通过 entry 类型将 map 的变化和元素的变化进行隔离的同时, 还保证了底层数据的复用 (read mapdirty map 指向同一个 entry), read, dirty 2 个字段加原子操作的紧密配合,非常简洁优雅地实现了 sync.Map 根据读写比例动态变化和转换内部字段数据结构, 尤其是 readdirty 两个字段保持部分内存结构布局一致并且通过指针直接转换,这些都是值得我们学习的编码技巧。

图片来源: http://russellluo.com/2017/06/go-sync-map-diagram.html

图片来源: http://russellluo.com/2017/06/go-sync-map-diagram.html

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。