蛮荆

Go 线程安全 map 方案选型

2023-06-25

概述

Go 语言标准库中的 map 数据类型并不是线程安全的,多个 goroutine 可以并发读取同一个 map, 但是不能并发写入同一个 map, 否则会引发 panic。

为了解决这个问题,实际开发中通常会使用下面的三种方案中的一个或多个:

  1. 通过 map 数据类型 + 锁 (互斥锁, 读写锁)
  2. 标准库内置的 sync.Map 对象 (支持并发读写)
  3. 分段锁

作为补充,本文会顺带对比一下自旋锁和标准库中的互斥锁的性能差异,对于 map 数据类型及其操作原语来说,两者实现的功能保证是一致的, 而且自旋锁更多的应用场景在无锁编程,所以文章末尾的基准测试不包含自旋锁 (当然,感兴趣的读者可以在本文基础上进行修改,自行对比测试结果)。

💡 本文代码较多,对测试过程不感兴趣的读者可以直接跳转到文章末尾看结论。

读写锁和互斥锁

标准库中的读写锁和互斥锁的性能差异及使用场景,在之前的 这篇文章中 已经有基础的说明,本文不再赘述。

自旋锁

基础概念在 死锁、活锁、饥饿、自旋锁 一文中 已经介绍过,这里不再赘述,其中自旋锁操作获取锁的核心代码如下:

// 获取自旋锁
func (sl *spinLock) lock() {
	for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
		// 获取到自旋锁
	}
}

上面的代码直观上很符合自旋锁的语义,只要没有获取到锁,就一直空转 CPU 尝试获取锁,但是这会带来一个问题: CPU 空转带来了很大的资源浪费, 是否可以降低甚至避免这种资源浪费吗?

一个显而易见的方法是在每两次获取锁的操作之间休眠一下,但是这样做会带来两个新的问题:

  1. 延迟增加,本来也许下次获取锁就可以成功,但是现在必须等休眠结束才能继续获取锁
  2. 引起上下文切换,因为当前 goroutine 休眠,根据 GMP 调取器的管理规则,处理器 P 会切换到其他可以运行的 goroutine, 如果当前 P 的 goroutine 队列已经是空的, 那么会给当前 M 关联一个新的处理器,不管是哪种情况发生,都会引起上下文切换

优化版

看起来似乎没有完美的解决方案,笔者前两两天在阅读 ants 的源代码时,看到作者是这样处理自旋锁的:

const maxBackoff = 16

func (sl *spinLock) Lock() {
	backoff := 1
	for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
		for i := 0; i < backoff; i++ {
			runtime.Gosched()
		}
		
		if backoff < maxBackoff {
			backoff <<= 1
		}
	}
}

作者借鉴了 TCP 流量控制中的指数退避理念,每两次获取锁的间隔时间呈指数级别增长,并且在间隔时间内执行 N 次 GMP 调取,当然这是根据该组件的场景特性决定的 (goroutine pool), 在实际项目中实现和使用自旋锁时,也可以根据具体的业务场景来自定义间隔时间内的操作,比如可以执行一个 CPU 密集型的任务,最终的目的只有一个: 尽可能榨干 CPU 资源

基准测试

首先来对标准库中的互斥锁、普通自旋锁、优化版自旋锁做一个简单的基准测试。

// 普通自旋锁实现 --------------------------------------------
type originSpinLock uint32

func (sl *originSpinLock) Lock() {
	for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
		runtime.Gosched()
	}
}

func (sl *originSpinLock) Unlock() {
	atomic.StoreUint32((*uint32)(sl), 0)
}

func NewOriginSpinLock() sync.Locker {
	return new(originSpinLock)
}

// 优化自旋锁实现 --------------------------------------------
type spinLock uint32

const maxBackoff = 16

func (sl *spinLock) Lock() {
	backoff := 1
	for !atomic.CompareAndSwapUint32((*uint32)(sl), 0, 1) {
		for i := 0; i < backoff; i++ {
			runtime.Gosched()
		}
		if backoff < maxBackoff {
			backoff <<= 1
		}
	}
}

func (sl *spinLock) Unlock() {
	atomic.StoreUint32((*uint32)(sl), 0)
}

func NewSpinLock() sync.Locker {
	return new(spinLock)
}

// 标准库的互斥锁
func BenchmarkMutex(b *testing.B) {
	m := sync.Mutex{}
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			m.Lock()
			m.Unlock()
		}
	})
}

// 普通自旋锁
func BenchmarkSpinLock(b *testing.B) {
	spin := NewOriginSpinLock()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			spin.Lock()
			spin.Unlock()
		}
	})
}

// 优化版自旋锁
func BenchmarkBackOffSpinLock(b *testing.B) {
	spin := NewSpinLock()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			spin.Lock()
			spin.Unlock()
		}
	})
}

从测试结果可以看到,优化后的自旋锁相比普通自旋锁和互斥锁,性能有了很大的提高。

// goos: linux
// goarch: amd64
// cpu: Intel(R) Core(TM) i5-8300H CPU @ 2.30GHz
// BenchmarkMutex
// BenchmarkMutex-8                21886387                55.83 ns/op
// BenchmarkSpinLock
// BenchmarkSpinLock-8             46848830                25.81 ns/op
// BenchmarkBackOffSpinLock
// BenchmarkBackOffSpinLock-8      55894545                21.16 ns/op

分段锁

分段锁 (Segmented Locking) 是一种并发控制的技术,它将共享资源划分成多个不重叠的片段,并对每个片段进行独立的加锁,通过这种方法可以减小锁的粒度,提高系统的并发性能。

使用分段锁时,每个线程只需要获取 key 所在的范围片段的锁,而不必像互斥锁那样锁住并独占整个共享资源,有效避免了多个线程因为竞争全局锁而导致的等待和延迟,提高系统的并发性能。

虽然分段锁可以提高系统的并发性能,但同时也会增加锁冲突的概率,并且需要付出额外的开销来维护锁的状态 (互斥锁只需要一把全局锁即可,分段锁每个区间范围都需要一把锁)。

读写锁和分段锁差异

代码实现

笔者在项目中用到的分段锁组件是开源的 concurrent-map, 下面就以该组件的源代码为基础,来分析如何实现一个分段锁,本文选用实现了泛型的 v2 版本。

Map 对象

ConcurrentMap 对象表示实现了分段锁的 Map 对象,内部有两个字段:

  1. 表示区域元素对象的 shares 字段
  2. 表示哈希函数的 sharding 字段
type ConcurrentMap[K comparable, V any] struct {
	shards   []*ConcurrentMapShared[K, V]
	sharding func(key K) uint32
}

GetShard 方法用于计算给定的参数 key 对应的区间元素集合对象并返回。

func (m ConcurrentMap[K, V]) GetShard(key K) *ConcurrentMapShared[K, V] {
	// 优化版: m % n = m & ( n - 1 )
    return m.shards[uint(m.sharding(key))%uint(SHARD_COUNT)]
}

区间元素集合对象

ConcurrentMapShared 对象表示 Map 中某个区间元素集合对象,内部有两个字段:

  1. 用于存储具体元素的 map, 数据结构就是标准库中的 map 类型
  2. 内嵌一个读写锁,用于管理对 map 结构的读写并发控制
type ConcurrentMapShared[K comparable, V any] struct {
	items        map[K]V
	sync.RWMutex
}

操作原语

下面来看一下常用的几个操作原语的代码实现。

1. SET

方法的内部执行分为 4 步:

  1. 通过 key 获取区间元素集合对象
  2. 获取写锁
  3. 写入 key 对应的数据
  4. 释放写锁
func (m ConcurrentMap[K, V]) Set(key K, value V) {
	shard := m.GetShard(key)
	shard.Lock()
	shard.items[key] = value
	shard.Unlock()
}

2. GET

方法的内部执行分为 4 步:

  1. 通过 key 获取区间元素集合对象
  2. 获取读锁
  3. 写入 key 对应的数据
  4. 释放读锁
func (m ConcurrentMap[K, V]) Get(key K) (V, bool) {
	shard := m.GetShard(key)
	shard.RLock()
	val, ok := shard.items[key]
	shard.RUnlock()
	return val, ok
}

3. Has

出了返回值之外,内部实现和 GET 方法实现一致,这里不在赘述。

func (m ConcurrentMap[K, V]) Has(key K) bool {
	shard := m.GetShard(key)
	shard.RLock()
	_, ok := shard.items[key]
	shard.RUnlock()
	return ok
}

4. Remove

方法的内部执行分为 4 步:

  1. 通过 key 获取区间元素集合对象
  2. 获取写锁
  3. 写入 key 对应的数据
  4. 释放写锁
func (m ConcurrentMap[K, V]) Remove(key K) {
	shard := m.GetShard(key)
	shard.Lock()
	delete(shard.items, key)
	shard.Unlock()
}

哈希算法

FNV32 是一种快速哈希函数,采用 32 位哈希值,算法实现非常简单并且具有很高的性能和较低的哈希碰撞率。

值得注意的是,concurrent-map 并没有使用标准库的 "hash/fnv" 方法作为求内部哈希函数实现,而是在组件内部重新实现了一个 fnv32 函数, 但是算法用到的算子常数 FNV_PRIMEFNV_OFFSET_BASIS, concurrent-map 和标准库是保持一致的,感兴趣的读者可以对比一下实现差异。

func strfnv32[K fmt.Stringer](key K) uint32 {
    return fnv32(key.String())
}

func fnv32(key string) uint32 {
    ...
}

除此之外,也可以通过 NewWithCustomShardingFunction 函数在创建 Map 时来指定哈希函数:

func NewWithCustomShardingFunction[K comparable, V any](sharding func(key K) uint32) ConcurrentMap[K, V] {
	return create[K, V](sharding)
}

sync.Map

标准库中的 sync.Map 的底层实现和应用场景在之前的 这篇文章中 已经有基础的说明,本文不再赘述。

基准测试

最后,我们对文章开头提到的三种方案进行基准测试,根据测试结果来总结不同方案的各自适用场景。

下面的测试代码分别对 读多写少、读少写多、读写均等这三种常见的负载场景,进行了性能基准测试:

package maps

import (
	"strconv"
	"sync"
	"testing"

	cmap "github.com/orcaman/concurrent-map/v2"
)

// 线程安全 Map 接口
type ThreadSafeMap interface {
	Get(key string) any
	Set(key string, val any)
}

// -------------------------------------------------------------------
// map 数据类型 + 读写锁实现线程安全的 map
type MutexMap struct {
	sync.RWMutex
	m map[string]any
}

func (m *MutexMap) Get(key string) any {
	m.RLock()
	v, ok := m.m[key]
	m.RUnlock()
	if ok {
		return v
	}
	return nil
}

func (m *MutexMap) Set(key string, val any) {
	m.Lock()
	m.m[key] = val
	m.Unlock()
}

// -------------------------------------------------------------------
// sync.Map 实现线程安全的 map
type SyncMap struct {
	m sync.Map
}

func (s *SyncMap) Get(key string) any {
	v, _ := s.m.Load(key)
	return v
}

func (s *SyncMap) Set(key string, val any) {
	s.m.Store(key, val)
}

// -------------------------------------------------------------------
// 分段锁实现线程安全的 map
type ConcurMap struct {
	m cmap.ConcurrentMap[string, any]
}

func (c *ConcurMap) Get(key string) any {
	v, _ := c.m.Get(key)
	return v
}

func (c *ConcurMap) Set(key string, val any) {
	c.m.Set(key, val)
}

// 基准测试
func benchmark(b *testing.B, m ThreadSafeMap, read, write int) {
	for i := 0; i < b.N; i++ {
		var wg sync.WaitGroup

		// 注意: 这里的读写操作有一部分 key 是重合的

		// 读操作
		for k := 0; k < read*100; k++ {
			wg.Add(1)
			go func(key int) {
				m.Get(strconv.Itoa(i * key))
				wg.Done()
			}(k)
		}

		// 写操作
		for k := 0; k < write*100; k++ {
			wg.Add(1)
			go func(key int) {
				m.Set(strconv.Itoa(i*key), key)
				wg.Done()
			}(k)
		}

		wg.Wait()
	}
}

// 读写比例 9:1
func BenchmarkReadMoreRWMutex(b *testing.B)   { benchmark(b, &MutexMap{m: make(map[string]any)}, 9, 1) }
func BenchmarkReadMoreSyncMap(b *testing.B)   { benchmark(b, &SyncMap{m: sync.Map{}}, 9, 1) }
func BenchmarkReadMoreConcurMap(b *testing.B) { benchmark(b, &ConcurMap{m: cmap.New[any]()}, 9, 1) }

// 读写比例 1:9
func BenchmarkWriteMoreRWMutex(b *testing.B)   { benchmark(b, &MutexMap{m: make(map[string]any)}, 1, 9) }
func BenchmarkWriteMoreSyncMap(b *testing.B)   { benchmark(b, &SyncMap{m: sync.Map{}}, 1, 9) }
func BenchmarkWriteMoreConcurMap(b *testing.B) { benchmark(b, &ConcurMap{m: cmap.New[any]()}, 1, 9) }

// 读写比例 5:5
func BenchmarkEqualRWMutex(b *testing.B)   { benchmark(b, &MutexMap{m: make(map[string]any)}, 5, 5) }
func BenchmarkEqualSyncMap(b *testing.B)   { benchmark(b, &SyncMap{m: sync.Map{}}, 5, 5) }
func BenchmarkEqualConcurMap(b *testing.B) { benchmark(b, &ConcurMap{m: cmap.New[any]()}, 5, 5) }

运行基准测试:

$ go test -count=1 -run='^$' -bench=. -benchtime=3s  -benchmem

输出结果如下:

goos: linux
goarch: amd64
cpu: Intel(R) Core(TM) i5-8300H CPU @ 2.30GHz
BenchmarkReadMoreRWMutex-8                  9107            362827 ns/op           84094 B/op       3001 allocs/op
BenchmarkReadMoreSyncMap-8                  7740            765258 ns/op          128974 B/op       3284 allocs/op
BenchmarkReadMoreConcurMap-8               10000            345271 ns/op           83985 B/op       3000 allocs/op
BenchmarkWriteMoreRWMutex-8                 6212            825778 ns/op          137330 B/op       3656 allocs/op
BenchmarkWriteMoreSyncMap-8                 3352           1155236 ns/op          157766 B/op       6041 allocs/op
BenchmarkWriteMoreConcurMap-8               9480            370214 ns/op          119970 B/op       3653 allocs/op
BenchmarkEqualRWMutex-8                     7108            529450 ns/op          104626 B/op       3249 allocs/op
BenchmarkEqualSyncMap-8                     5360            735393 ns/op          133008 B/op       4601 allocs/op
BenchmarkEqualConcurMap-8                   9548            347809 ns/op          104303 B/op       3250 allocs/op
PASS

从基准测试的输出结果来看,不论是哪种应用场景,结合运行速度还是内存分配,三者的排序都是一致的: 分段锁优于读写锁 + map, 后者优于 sync.Map 。 笔者没有遇到过 100% 的只读或只写操作的应用场景,所以没有做对应的基准测试,不过这里可以猜测一下:

  1. 100% 只读场景下 sync.Map 的性能最好 (不过既然都 100% 只读了, 直接使用 map 类型即可,因为无需加锁,所以性能肯定最高)
  2. 100% 只写场景下 分段锁 的性能最好

感兴趣的读者可以通过调整基准测试代码的百分比参数来验证一下,这里是 concurrent-map 官方的基准测试代码

小结

本文主要介绍了在 Go 语言中如何实现线程安全的 map 的三种方法,并通过三种常见的业务场景对方法进行了性能基准测试,最后,我们来简单总结下三种方法的特点。

方案 优点 不足
读写锁 + map 自定义性最强,性能适中 需要开发实现
sync.Map 标准库自带,开箱即用,方便省心 性能较低, API 参数类型是 any,需要数据类型转换
分段锁 性能最好 标准库未提供,需要引入第三方组件

Reference

扩展阅读

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。