蛮荆

ants Code Reading

2023-06-30

概述

goroutine 使用简单,但并不意味着没有成本。

ants 是一个高性能且低损耗的 goroutine 池,组件内部会创建一个固定容量的 goroutine 池,并且管理和回收池中的 goroutine, 主要应用场景是允许开发者限制并发程序中的 goroutine 数量,更多的特性介绍请参考 Github 主页

示例代码

我们首先通过两个示例程序来直观地感受一下使用 goroutine 池前后的差异。

未使用 goroutine 池

package main

import (
	"log"
	"runtime"
	"sync"
	"sync/atomic"
)

var (
	sum int32
)

func main() {
	var wg sync.WaitGroup

	var i int32
	// 启动 100 个 goroutine 
	for i = 1; i <= 100; i++ {
		wg.Add(1)
		go func(n int32) {
			defer wg.Done()
			atomic.AddInt32(&sum, n)
		}(i)
	}

	log.Printf("goroutines numbers = %d\n", runtime.NumGoroutine())

	wg.Wait()

	log.Printf("sum = %d, want = %d", sum, 5050)
}

运行代码输出结果如下:

goroutines numbers = 101
sum = 5050, want = 5050

程序运行过程中一共启动了 101 个 goroutine, 其中 1 个是 main goroutine, 其他的 100 个是通过 go 关键字启动的 goroutine

使用 goroutine 池

package main

import (
	"log"
	"runtime"
	"sync"
	"sync/atomic"

	"github.com/panjf2000/ants/v2"
)

var (
	sum int32
)

func main() {
	defer ants.Release()

	var wg sync.WaitGroup

	p, err := ants.NewPoolWithFunc(10, func(i interface{}) {
		defer wg.Done()
		atomic.AddInt32(&sum, i.(int32))
	})
	if err != nil {
		log.Fatal(err)
	}
	defer p.Release()

	for i := 1; i <= 100; i++ {
		wg.Add(1)
		_ = p.Invoke(int32(i))
	}

	log.Printf("goroutines numbers = %d\n", runtime.NumGoroutine())

	wg.Wait()

	log.Printf("sum = %d, want = %d", sum, 5050)
}

运行代码输出结果如下:

goroutines numbers = 15
sum = 5050, want = 5050

程序运行过程中一共启动了 15 个 goroutine, 其中 1 个是 main goroutine, 还有 10 个是通过 ants 组件限制数量的 goroutine, 剩余的 4 个是 ants 组件用于管理调度的内部 goroutine

源码解析

通过上面两个示例程序的结果比较,ants 确实能够有效降低和控制 goroutine 的数量,这个功能可以有效解决海量 goroutine 场景下的内存暴涨问题。 性能方面,官方给出的基准测试结果是: ants 的吞吐性能相较于原生 goroutine 可以保持在 2~6 倍的性能压制,而内存消耗则可以达到 10~20 倍的节省优势, 感兴趣的读者可以查看 这里的基准测试

数据结构

UML

ants UML

读者在阅读下面的源代码分析时,可以对照着结构图进行校验。

通用池和专用池

ants 中内置了两种不同模式的 goroutine pool:

1. Pool

ants.Pool 表示通用的 goroutine pool, 也就是每个 goroutine 中可以是不同的任务执行函数。

2. PoolWithFunc

ants.PoolWithFunc 表示专用的 goroutine pool, 也就是每个 goroutine 中是相同的任务执行函数,但是参数可以不同 (上面的示例代码中使用就是这种模式)。

本文以 ants.Pool 模式下的源代码来进行分析。

池对象

Pool 表示组件的核心对象,我们挑选几个关键属性字段来进行说明。

type Pool struct {
	// 池的容量
	// 也就是负责执行 worker 的数量上限
	capacity int32

	// 当前正在执行的 worker 数量
	// 也就是 goroutine 的数量 (每个 worker 启动一个 goroutine)
	running int32

	// 标准库的锁接口
	// 组件内部使用了自旋锁作为具体的实现
	lock sync.Locker

	// 存储闲置的 worker
	workers workerQueue
	
	// 池的状态标识 (标识池是否已关闭)
	state int32

	// 获取闲置 worker 时使用的信号量
	cond *sync.Cond
	
	// 池的配置选项对象
	options *Options
}

配置项

Options 对象表示池对象的属性配置,可以在创建池对象时通过 FUNCTIONAL OPTIONS 模式注入自定义的属性值。

type Options struct {
	// 任务的过期时间
	// 空闲 worker 的最后运行时间与当前时间之差
	// 如果大于这个值,表示该 worker 已过期
	// 定时清理任务会清理过期的 worker 
	ExpiryDuration time.Duration

	// 初始化池的时候是否预分配 worker 队列
	PreAlloc bool
	
	// 调用 pool.Submit 方法阻塞时,最多可以阻塞的 worker 数量
	// 默认值为 0,表示不限制
	MaxBlockingTasks int

	// 为 true 时,pool.Submit 方法永远不会阻塞,也就是 MaxBlockingTasks 属性不起作用
	// 如果 pool.Submit 方法因阻塞而无法完成时,返回 ErrPoolOverload 错误
	Nonblocking bool

	// 任务 goroutine 触发 panic 时的错误捕获及处理函数
	PanicHandler func(interface{})

	// 自定义日志对象
	// 默认使用标准库中的 log 对象
	Logger Logger

	// 任务 goroutine 是否常驻内存
	DisablePurge bool
}

func WithExpiryDuration(options Options) Option {}
func WithPreAlloc(options Options) Option {}
...

worker 接口

worker 接口是任务执行者的抽象表示,简单地理解就是一个专门干活的 goroutine, 该接口提供了 5 个操作语义:

方法名称 语义
run 启动一个 goroutine 执行任务
finish 任务已经完成,退出 goroutine
lastUsedTime 最后的任务执行时间
inputFunc 提交一个任务到 goroutine
inputParam 任务参数

ants.Pool 池对应的 worker 接口实现对象是 goWorker

type goWorker struct {
	// 对应的池对象
	pool *Pool

	// 需要执行的任务函数
	task chan func()

	// 最后的任务执行时间
	lastUsed time.Time
}

下面是 goWorker 对象对于接口的具体实现:

func (w *goWorker) run() {
	// 通知 goroutine 池: 运行 goroutine 数量 + 1
	w.pool.addRunning(1)
	
	// 启动一个 goroutine 
	go func() {
		defer func() {
			// 通知 goroutine 池: 运行 goroutine 数量 - 1
			w.pool.addRunning(-1)
			// 将当前 goWorker 对象放入对象池
			// 可以提升后续获取 goWorker 对象操作的性能
			w.pool.workerCache.Put(w)
			if p := recover(); p != nil {
				// panic 处理
				if ph := w.pool.options.PanicHandler; ph != nil {
					ph(p)
				} else {
					w.pool.options.Logger.Printf("worker exits from panic: %v\n%s\n", p, debug.Stack())
				}
			}
			// 发送闲置 goWorker 对象信号
			w.pool.cond.Signal()
		}()

		for f := range w.task {
			if f == nil {
				// 如果执行函数为 nil (这里的 nil 充当了 goroutine 的退出信号)
				// 直接退出即可
				return
			}
			// 执行具体的函数
			f()
			// 将当前 goWorker 对象放入 worker 队列
			// 最大程度地复用当前 goWorker 对象,同时避免 goroutine 的上下文切换带来的开销
			if ok := w.pool.revertWorker(w); !ok {
				// 如果放入队列失败,可能是由于:
				// 1. Pool 池对象已关闭
				// 2. Pool 池对象当前运行的 goroutine 数量已到达限制
				// 此时直接退出当前 goroutine 即可
				return
			}
		}
	}()
}

// 向任务 channel 发送一个 nil, 充当了 goroutine 的退出信号
func (w *goWorker) finish() {
	w.task <- nil
}

// 返回 worker 最后的任务执行时间
func (w *goWorker) lastUsedTime() time.Time {
	return w.lastUsed
}

// 向 worker 提交一个新的任务
func (w *goWorker) inputFunc(fn func()) {
	w.task <- fn
}

// 因为 Pool.Submit 提交任务的原型为 func (p *Pool) Submit(task func()) error {}
// 所以 goWorker 对象中的任务函数没有参数值
func (w *goWorker) inputParam(interface{}) {
	panic("unreachable")
}

worker 队列接口

workerQueue 接口表示存放 worker 的队列容器接口,和一般的容器类接口相似, 该接口提供了 6 个操作语义:

方法名称 语义
len 队列中 worker 数量
isEmpty 队列是否为空
insert 将一个 worker 放入队列
detach 从队列中取出一个 worker
refresh 检测过期的 worker 并返回
reset 重置队列为空

组件内部给出了两种队列实现方案,分别是基于 Stack (栈)Ring Queue (环形队列),两种方案的内部实现和常规数据结构算法实现基本一致,这里不再赘述。

方法

池的初始化

func NewPool(size int, options ...Option) (*Pool, error) {
	// Pool 的配置项加载和默认值处理
	opts := loadOptions(options...)
	
	...

	// 初始化 Pool 对象
	p := &Pool{
		capacity: int32(size),
		lock:     syncx.NewSpinLock(),
		options:  opts,
	}
	
	// 注册 worker 对象池
	p.workerCache.New = func() interface{} {
		return &goWorker{
			pool: p,
			task: make(chan func(), workerChanCap),
		}
	}
	
	// 根据配置决定是否预分配 worker 队列
	if p.options.PreAlloc {
        // 如果使用预分配
        // 使用 Ring Queue 作为队列的数据结构
		p.workers = newWorkerArray(queueTypeLoopQueue, size)
	} else {
		// 如果不使用预分配
		// 使用 Stack 作为队列的数据结构
		p.workers = newWorkerArray(queueTypeStack, 0)
	}
	
	... 

	// 启动一个 goroutine, 清除过期 worker
	p.goPurge()

	return p, nil
}

池的初始化操作完成后,接下来就是等待调用方提交具体的任务函数。

提交任务到 Pool

调用方提交任务函数到池中的时候,会尝试获取一个可用的 worker 来执行任务函数。

func (p *Pool) Submit(task func()) error {
	if p.IsClosed() {
		// 如果 Pool 已关闭, 直接返回
		return ErrPoolClosed
	}
	if w := p.retrieveWorker(); w != nil {
		// 获取到了可用的 worker
		// 直接将参数任务提交给 worker 即可
		w.inputFunc(task)
		return nil
	}
	// 没有获取到可用的 worker
	return ErrPoolOverload
}

获取可用的 worker

retrieveWorker 方法会根据配置项,采用不同的策略获取可用的 worker 。

func (p *Pool) retrieveWorker() (w worker) {
	spawnWorker := func() {
		// 从对象池中获取一个 worker
		w = p.workerCache.Get().(*goWorker)
		// 获取到 worker 之后直接运行
		w.run()
	}
	
	p.lock.Lock()
	
	// 首先尝试从队列获取 worker
	w = p.workers.detach()
	if w != nil { 
		// 从队列中获取到了 worker
		p.lock.Unlock()
	} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
		// 如果队列为空并且满足以下两个条件任意一个:
		//    1. Pool 没有容量限制
		//    2. 当前 worker 数量没有达到 Pool 的容量限制
		// 直接创建一个新的 worker 
		p.lock.Unlock()
		spawnWorker()
	} else {
		// 如果上述条件都不满足
		// 只能等待其他执行完的 worker 归还到队列中,然后再获取
		
		if p.options.Nonblocking {
			// 如果 Pool 的任务提交模式是非阻塞的
			// 直接返回
			p.lock.Unlock()
			return
		}
	retry:
		if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
			// 如果等待 worker 的 goroutine 数量超过限制
			// 直接返回
			p.lock.Unlock()
			return
		}

		// 等待 worker 的 goroutine 数量 + 1
		p.addWaiting(1)
		// 等待可用的 worker 信号
		p.cond.Wait()
        // 等待 worker 的 goroutine 数量 - 1
		p.addWaiting(-1)

		if w = p.workers.detach(); w == nil {
			// 如果接收到了信号,但是依然没有从队列中获取到 worker
			// 说明队列中的 worker 被其他 goroutine 抢先一步获取到了 
			if p.Free() > 0 {
				// 此时如果还有没有达到 Pool 的容量限制
				// 直接创建一个新的 worker 并返回
				p.lock.Unlock()
				spawnWorker()
				return
			}
			goto retry
		}
		p.lock.Unlock()
	}
	return
}

定时清除过期 worker

func (p *Pool) purgeStaleWorkers(ctx context.Context) {
	// 根据配置项创建定时器
	ticker := time.NewTicker(p.options.ExpiryDuration)

	defer func() {
		// 终止定时器
		ticker.Stop()
		atomic.StoreInt32(&p.purgeDone, 1)
	}()

	for {
		select {
		case <-ctx.Done():
			// 退出信号
			return
		case <-ticker.C:
			// 执行清理操作
		}

		var isDormant bool
		p.lock.Lock()
		// 检测过期的 worker
		staleWorkers := p.workers.refresh(p.options.ExpiryDuration)
		n := p.Running()
		isDormant = n == 0 || n == len(staleWorkers)
		p.lock.Unlock()
		
		// 停止过期 worker 的执行 
		for i := range staleWorkers {
			staleWorkers[i].finish()
			staleWorkers[i] = nil
		}

		// 如果当前没有运行的 worker, 所有的 worker 都已过期
		// 但是与此同时还有正在提交任务的 goroutine 在等待可用的 worker
		// 直接发送信号通知
		if isDormant && p.Waiting() > 0 {
			p.cond.Broadcast()
		}
	}
}

辅助方法

// 返回阻塞在提交任务的 goroutine
func (p *Pool) Waiting() int {}

// 返回 Pool 的容量
func (p *Pool) Cap() int {}

// 返回 Pool 是否已关闭
func (p *Pool) IsClosed() bool {}

...

小结

本文从源代码的角度,剖析了知名的开源组件 ants, 着重分析了 Pool, worker, worker 队列 三个核心对象的设计与实现, 组件代码逻辑可以参照官方的流程图。

图片来源: https://github.com/panjf2000/ants

组件本身存在的问题

可以参考 TiDB 大佬的文章 中的 Section: “跟 goroutine pool 或者 worker pool 有什么区别?”。

Reference

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。