蛮荆

sync.Pool Code Reading

2023-06-15

概述

sync.Pool 是 Go 语言标准库中的一个并发安全的对象池,可以用来缓存那些需要重复创建和销毁的对象,从而避免频繁地进行内存分配和回收,降低内存和 GC 压力。

需要注意的是: 任何存储在对象池中的元素可能会被随时删除,如果元素是一个资源类的引用,并且该资源仅在对象池中被引用 (没有其他地方引用了),那么当该元素被对象池删除时,其指向的资源同时也会被释放。

内部实现

sync.Pool 的使用方法相信读者已经熟练掌握,本文主要来探究一下底层源代码实现,文件路径为 $GOROOT/src/sync/pool.go,笔者的 Go 版本为 go1.19 linux/amd64

💡 sync.Pool 的源代码中细节非常之多,为了阅读体验和效率,笔者几乎没有删减代码,而且也基本对每行代码都做了对应的注解和上下文联系,这是本文的特色,请读者留意。

数据结构

全局变量

var (
	// 锁
	allPoolsMu Mutex

	// 全局的所有缓存池
	allPools []*Pool

	// victim cache 缓存池
	oldPools []*Pool
)

数据结构图

这里假设 runtime.GOMAXPROCS() = 4, 处理器 P 的数量为 4 个,读者在阅读下面的源代码探究过程时,可以对照着结构图进行分析。

sync.Pool 数据结构

缓存池对象

sync.Pool 包的核心对象,所有的操作都是基于该对象进行的。

// Pool 一旦使用后,便不能再复制

// 在 Go 内存模型术语中,调用 Put(x) 方法在调用 Get 方法之前同步 
// 在 Go 内存模型术语中,调用 New 方法在调用 Get 方法之前同步
type Pool struct {
    // noCopy 可以添加到 struct 中,实现 "首次使用之后,无法被复制" 的功能,主要服务于 `go vet`
	// 假设一个缓存池对象 A 被对象 B 拷贝了,接着 A 被清空了,B 里面的缓存对象指针指向的对象将会不可控
	noCopy noCopy

	// 指向固定长度的数组,数组长度为处理器 P 的个数,转换后其实就是 [P]poolLocal 数组
	//    实际的底层数据结构是切片,不过下文中统一用数组描述,读者不必在意这个细节
	// 访问时根据处理器 P 的 ID (作为索引) 去访问
	// 优化点: 多个 goroutine 使用同一个缓存池时,可以减少竞争,提高性能
	//        类似于分段锁中降低锁粒度的设计理念
	local     unsafe.Pointer
	// local 数组的长度
	localSize uintptr
	
    // 上一轮的 local, 内容语义和 local 一致
	// 新一轮 GC 到来时,更新为当前 local 的值
	victim     unsafe.Pointer
	
	// 上一轮的 localSize, 内容语义和 localSize 一致
	// 新一轮 GC 到来时,更新为当前 localSize 的值
	victimSize uintptr

	// 创建对象的函数
	New func() any  
}

这里引用下维基百科关于 victim cache 的描述:

所谓受害者缓存(Victim Cache),是 CPU 硬件处理缓存的一种技术,是一个与直接匹配或低相联缓存并用的、容量很小的全相联缓存。 当一个数据块被逐出缓存时,并不直接丢弃,而是暂先进入受害者缓存。如果受害者缓存已满,就替换掉其中一项。当进行缓存标签匹配时, 在与索引指向标签匹配的同时,并行查看受害者缓存,如果在受害者缓存发现匹配,就将其此数据块与缓存中的不匹配数据块做交换,同时返回给处理器。

简单通俗地来说,就是已经失效的缓存先不清除,保留一段时间,如果保留时间内该缓存又被用到了,就重新启用,如果保留时间内一直没有被用到,就清除。

poolLocal 对象

每个处理器 P 都有一个 poolLocal 对象,GetPut 方法会优先操作当前处理器的对象池。

type poolLocal struct {
	poolLocalInternal

	// CPU Cache 是距离 CPU 最近的 Cache,如果能充分利用,会极大提升程序性能
	// 防止伪共享,凑齐 128 bytes 的整数倍 (这个小技巧非常值得学习)
	
	// 什么是CPU 伪共享?
	//   CPU CacheLine 通常是以 64 byte 或 128 byte 为单位
	//   在缓存池场景中,各个 P 的 poolLocal 以数组形式存储在一起
	//   假设 CPU CacheLine 为 128 byte,而 poolLocal 不足 128 byte 时
	//   CacheLine 将会带上其他 P 的 poolLocal 的内存数据,以凑齐一个整块的 CacheLine
	//   如果这时两个相邻的 P 同时在两个不同的 CPU 核上运行,将会同时去覆盖刷新 CacheLine 
	//   造成 CacheLine 的反复失效,那 CPU Cache 就失去了作用
	
	//   例如 两个相邻但是不同的处理器 P (PA, PB) 被分配在同一个 CacheLine
	//   此时 PA 要修改, PB 也要修改 (两者去竞争 同一个 CacheLine)
	//   当 PA 被修改时,缓存系统强制 PB 所在 CPU 核的 CacheLine 失效
	//   当 PB 被修改时,缓存系统强制 PA 所在 CPU 核的 CacheLine 失效
	//   最终导致 PA 和 PB 所在 CPU 核的 CacheLine 失效,降低性能
	
	// 如何避免 CPU 伪共享?
	//   将需要独立访问的变量放在不同的 CacheLine 中
	//   保证和 CacheLine 内存对齐
	
	// Linux 查看 CacheLine 单位大小
	// $ cat /sys/devices/system/cpu/cpu1/cache/index0/coherency_line_size
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

poolLocalInternal 对象

poolLocalInternal 对象表示每个处理器 P 的本地对象池。

type poolLocalInternal struct {
    // 私有变量,只能由当前处理器操作
	private any
    // 共享变量,当前处理器可以执行 pushHead/popHead 操作,其他处理器只能执行 popTail 操作
	shared  poolChain 
}

Go 1.13 版本开始,shared 字段的数据结构修改为 单个生产者/多个消费者 双端无锁环形队列,当前处理器 P 可以执行 pushHead/popHead 操作, 其他处理器 P 只能执行 popTail 操作。

单个生产者:当前处理器 P 上面运行的 goroutine 执行 Put 方法时,将对象放入队列,并且只能放在队列头部,但是其他处理器 P 上运行的 goroutine 不能放入。 由于每个处理器 P 在任意时刻只有一个 goroutine 运行,所以无需加锁。

多个消费者分两种角色

  1. 在当前处理器 P 上运行的 goroutine,执行 Get 方法时,从队列头部取对象,由于每个处理器 P 在任意时刻只有一个 goroutine 运行,所以无需加锁
  2. 在其他处理器 P 上运行的 goroutine,执行 Get 方法时,如果该处理器 P 没有缓存对象,就到别的处理器 P 的队列上窃取。 此时窃取者 goroutine 只能从队列尾部取对象,因为同时可能有多个窃取者 goroutine 窃取同一个处理器 P 的队列, 所以用 CAS 来实现无锁队列功能

按照这种设计,poolDequeue.pushHeadpoolDequeue.popTail 存在竞争 (可能同时有多个 goroutine 同时操作), 而 poolDequeue.pushHeadpoolDequeue.popHead 不存在竞争 (只能有一个 goroutine 操作)。

  • poolDequeue.pushHead: 将对象添加到队列头部
  • poolDequeue.popHead : 从队列头部获取对象
  • poolDequeue.popTail : 从队列尾部获取对象

poolChain 对象

poolChain 对象表示 poolDequeue 数据类型的双端环形队列链表,每个节点表示的队列长度是后驱节点队列长度的两倍, 如果当前所有的节点队列满了,就创建一个新的队列 (长度是当前头节点队列长度的 2 倍),然后挂载到头节点。

// 队列节点示意图
// --------------------------------------------------------------------------
// | 节点 1, size: 64 | 节点 2, size: 32 | 节点 3, size: 16 | 节点 4, size: 8 |
// --------------------------------------------------------------------------
type poolChain struct {
	// head 表示头节点队列,只能由生产者操作,不存在竞争
	head *poolChainElt

	// tail 表示尾节点队列,由多个消费者操作,存在竞争
	tail *poolChainElt
}

type poolChainElt struct {
	poolDequeue
	
	// next 由生产者原子性写入,由消费者原子性读取
	// 值只会从 nil 转换为非 nil
	
	// prev 由消费者原子性写入,由生产者原子性读取
	// 值只会从非 nil 转换为 nil
	next, prev *poolChainElt
}

为什么 poolChain 的数据结构是链表 + ring buffer (环形队列) 呢?

因为使用 ring buffer 数据结构的优点非常适用于 sync.Pool 对象池的使用场景。

  1. 预先分配好内存并且分配的元素内存可复用,避免了数据迁移
  2. 作为底层数据结构的数组是连续内存结构,非常利于 CPU Cache, 在访问 poolDequeue 队列中的某个元素时,其附近的元素可能被加载到同一个 Cache Line 中,访问速度更快
  3. 更高效的出队和入队操作,因为环形队列是首尾相连的,避免了普通队列中队首和队尾频繁变动的问题

poolDequeue 对象

poolDequeue 对象是一个由 单个生产者/多个消费者 模式组成的固定大小的无锁队列。单个生产者可以从队列头部执行 pushpop 操作, 多个消费者只能从队列尾部执行 pop 操作。

type poolDequeue struct {
	// 经典的字段合并使用方法
    // 高 32 位 是 head, 指向下一个存放对象的索引 
    // 低 32 位 是 tail, 指向队列中最早 (下一个读取) 的对象索引
	
	// 索引区间 tail <= i < head, 表示消费者可以操作的索引区域
	// 消费者可以在该区间不断获取对象,直至获取到的对象为 nil
	headTail uint64

	// vals 表示队列元素容器,大小必须为 2 的 N 次幂
	// 容器会在初始化时指定容量,实现数据元素内存预初始化
	vals []eface
}

为什么要将 headtail 合并到一个变量里面?

因为这样可以进行原子操作,完成两个字段的 lock free (无锁编程) 优化。

例如:当队列中仅剩一个对象时,如果多个处理器 P 同时访问队列,如果没有进行并发限制,两个处理器 P 都可能获取到对象,这显然是不符合预期的。 那么在不引入互斥锁的前提下,sync.Pool 是如何实现临界区数据控制的呢? sync.Pool 利用了 atomic 包的提供的 CAS 操作,并发情况下两个处理器 P 都可能获取到对象,但是最终只会有一个处理器 P CAS 操作成功, 另外一个处理器操作失败,在更新 headtail 两个字段的时候,也是通过 CAS + 位运算 进行操作的。

小结

通过对源代码中的数据结构进行分析,我们可以看到内部隐藏了非常多的设计技巧和对应的基础理论知识,接下来开始阅读构建于数据结构之上的具体算法。

这里再放一张数据结构图,方便读者结合算法代码进行分析。

sync.Pool 数据结构

对象归还

我们首先来看下对象归还流程,也就是如何把一个对象放入缓存池的某个队列中,从 Pool.Put 方法开始追踪代码。

func (p *Pool) Put(x any) {
    ...
	
	l, _ := p.pin()
	if l.private == nil {
		// 优先设置私有变量
		l.private = x  
	} else {
		// 其次设置共享变量
		l.shared.pushHead(x)
	}
    
	...
}

func (c *poolChain) pushHead(val any) {
	d := c.head
	if d == nil {
		// 初始化头节点
		// 对象池元素数量从 8 个开始,必须为 2 的 N 次幂
		const initSize = 8 
		d = new(poolChainElt)
		d.vals = make([]eface, initSize)
		c.head = d
		storePoolChainElt(&c.tail, d)
	}
	
	if d.pushHead(val) {
		// 如果对象成功加入队列,直接返回
		return
	}

	// 如果当前队列已满,分配一个新的队列 (长度是当前队列的 2 倍)
	newSize := len(d.vals) * 2
	if newSize >= dequeueLimit {
		// 队列长度最大为 1073741824
		newSize = dequeueLimit
	}

	// 初始化新的队列
	d2 := &poolChainElt{prev: d}
	d2.vals = make([]eface, newSize)
	// 将头节点指向到新的队列
	c.head = d2
	// 将新的队列添加到链表中
	storePoolChainElt(&d.next, d2)
    // 将对象添加到新的队列
	d2.pushHead(val)
}

poolDequeue.pushHead 方法将一个对象加入到队列中,如果队列已满,返回 false,该方法必须由 单个生产者 操作。

func (d *poolDequeue) pushHead(val any) bool {
	ptrs := atomic.LoadUint64(&d.headTail)
	head, tail := d.unpack(ptrs)
	if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
        // 说明队列已满 (tail 索引 + 当前队列元素个数) == head 索引
		return false
	}
	slot := &d.vals[head&uint32(len(d.vals)-1)]

	// 检测索引位置的对象是否和 popTail 方法操作冲突
	typ := atomic.LoadPointer(&slot.typ)
	if typ != nil {
		// 有其他 goroutine 正在调用 popTail 方法操作当前位置的对象
		// 所以队列实际上已满
		return false
	}
	
	// 执行到这里,typ == nil 
	// 说明即使存在 popTail 方法操作当前位置的对象,操作也已经结束了,冲突解除
	if val == nil {
		val = dequeueNil(nil)
	}
	// 使用归还的对象填充索引位置
	*(*any)(unsafe.Pointer(slot)) = val

	// head 索引 + 1
	atomic.AddUint64(&d.headTail, 1<<dequeueBits)
	return true
}

获取对象

接下来探究从缓存池中获取对象的流程,从 Pool.Get 方法开始追踪代码。

func (p *Pool) Get() any {
	l, pid := p.pin()
    // 首先尝试从当前处理器的私有变量获取对象
	x := l.private 
	// 从私有变量获取后,及时将私有变量置为 nil
	l.private = nil
	
	if x == nil {
		// 私有变量没有获取到对象,尝试从共享变量获取
		x, _ = l.shared.popHead()
		if x == nil {
			// 当前处理器 P 没有对象,尝试从其他处理器窃取
			x = p.getSlow(pid)
		}
	}

	// 如果从所有处理器的缓存池都没有获取到对象,并且 New 方法不为 nil
	// 那就调用 New 方法创建一个对象返回
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}

func (c *poolChain) popHead() (any, bool) {
	d := c.head
	for d != nil {
        // 从队列头部开始获取对象
		if val, ok := d.popHead(); ok {  
			return val, ok
		}
		// 将当前队列前驱节点作为接下来要遍历的队列
		d = loadPoolChainElt(&d.prev)
	}
	return nil, false
}

poolDequeue.popHead 方法从队列头部删除一个对象并返回,如果队列为空,返回 false, 该方法必须由 单个生产者 操作。

func (d *poolDequeue) popHead() (any, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		// 高 32 bit 是 head
		// 低 32 bit 是 tail
		head, tail := d.unpack(ptrs)
		if tail == head {
			// 头尾相等,说明队列为空
			return nil, false
		}
		
		// head 索引指向下一个新对象的索引位置,所以使用前先减 1
		head--
		ptrs2 := d.pack(head, tail)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// CAS 操作成功,移除头部的对象,此时 head 指向 head - 1
			// vals 切片的长度是 2 的 N 次幂,因此 len(d.vals)-1 之后,低的 N 位全是 1
			// 和 head 进行与运算之后,可以获取到对象的索引下标
			// 例如: 切片长度 = 32, len(d.vals)-1 = 31
			//      head = 5, 索引下标 = 5 & 31 = 5
			//      head = 25, 索引下标 = 25 & 31 = 25
			slot = &d.vals[head&uint32(len(d.vals)-1)]
			break
		}
	}
	
	val := *(*any)(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
        // 获取到的对象是 nil
		val = nil
	}
	// 重置索引位置的元素
	*slot = eface{}
	// 返回获取到的对象
	return val, true
}

Pool.getSlow 方法用于当前处理器 P 没有对象时,尝试从其他处理器 P 窃取对象。

func (p *Pool) getSlow(pid int) any {
	// 原子加载 localSize 字段
	size := runtime_LoadAcquintptr(&p.localSize) 
	locals := p.local                            
	
	// 尝试从其他处理器获取对象
	for i := 0; i < int(size); i++ {
		// 注意这里定位处理器 P 的索引计算方式
		// pid+i+1 是为了忽略当前处理器 P
		l := indexLocal(locals, (pid+i+1)%int(size))
        // 从队列尾部获取,减少并发冲突
		if x, _ := l.shared.popTail(); x != nil { 
			return x
		}
	}

	// 如果从其他处理器没有获取到对象,尝试从 victim 缓存中获取 (也就是上一轮对象池中的对象)
	// 这样做可以尽可能地复用对象
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		// 当前处理器不存在于 victim 中,可能原因如下:
		// 1. victim 已经被标记为空
		// 2. 当前处理器比 victim 的长度要大,属于 "后来创建的"
		// 此时直接返回即可,否则会发生处理器 pid 索引越界错误
		return nil
	}

	// 下面开始尝试从 victim 中获取对象
	
    // 尝试从尾部处理器的私有变量获取对象
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	
	// 尝试从其他处理器获取对象
	for i := 0; i < int(size); i++ {
        // 注意这里定位处理器 P 的索引计算方式和刚才的不同
		// 这里不需要忽略任何处理器
		l := indexLocal(locals, (pid+i)%int(size))
		// 从队列尾部获取,减少并发冲突
		if x, _ := l.shared.popTail(); x != nil { 
			return x
		}
	}
	
	// 将 victim 缓存标记为空,后续请求直接返回,避免多余的查询
	atomic.StoreUintptr(&p.victimSize, 0)

	return nil
}

func (c *poolChain) popTail() (any, bool) {
	d := loadPoolChainElt(&c.tail)
	if d == nil {
		// 如果链表尾部节点为 nil, 直接返回
		return nil, false
	}

	for {
		// 在队列尾部节点出队之前,提前加载 d.next 指针很重要 (删除链表节点的边界条件)
		// 因为 d 节点的尾元素出队之后,d 节点可能会变为 nil, 这样永远无法找到 d.next 节点了
		d2 := loadPoolChainElt(&d.next)

		if val, ok := d.popTail(); ok {
			return val, ok
		}

		if d2 == nil {
			// 如果 next 节点都变成 nil 了,说明队列已经空了
			return nil, false
		}
		
		// 代码执行到这里,说明尾部节点 (d) 为 nil
		// 这时就可以将其删除了,防止下次调用 popTail 时发生错误 (误以为队列已空)
		if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
			// 删除 d2 的前驱节点,也就是 d (因为此时前驱节点已经没有数据了)
			storePoolChainElt(&d2.prev, nil)
		}
		d = d2
	}
}

poolDequeue.popTail 方法从队列尾部删除一个对象并返回,如果队列为空,返回 false, 该方法必须由 多个生产者 操作。

func (d *poolDequeue) popTail() (any, bool) {
	var slot *eface
	for {
		ptrs := atomic.LoadUint64(&d.headTail)
		head, tail := d.unpack(ptrs)
		if tail == head {
            // 头尾相等,说明队列为空
			return nil, false
		}
		
		ptrs2 := d.pack(head, tail+1)
		if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
			// CAS 操作成功,移除尾部的对象,此时 tail 指向 tail + 1
			slot = &d.vals[tail&uint32(len(d.vals)-1)]
			break
		}
	}
	
	val := *(*any)(unsafe.Pointer(slot))
	if val == dequeueNil(nil) {
		val = nil
	}
	
	// 通过重置元素为 nil 的方式通知 pushHead 方法 (因为这两个方法存在并发操作同一位置元素的可能)
	// 当前位置的元素已经操作完成 (pushHead 方法操作当前元素时会检测否和 popTail 方法冲突)
	// 先重置 val, 再重置 typ
	slot.val = nil
	atomic.StorePointer(&slot.typ, nil)

	return val, true
}

流程图

注意当前 P 和 其他 P 的区别:

注意当前 P 和 其他 P 的区别

辅助方法

pin

pin 方法绑定当前 goroutine 到处理器 P 并禁止抢占,返回一个 poolLocal 对象指针和处理器 P 的 ID。

func (p *Pool) pin() (*poolLocal, int) {
	pid := runtime_procPin()
    // 原子加载 localSize 字段
	s := runtime_LoadAcquintptr(&p.localSize) 
	l := p.local      
	if uintptr(pid) < s {
		// 如果 pid 小于 local 数组的长度
		// 说明对应的 poolLocal 对象已经创建,直接返回即可
		return indexLocal(l, pid), pid
	}
	
	// 代码执行到这里,一般是因为两种原因:
	// 1. 缓存池还未创建
	// 2. 处理器 P 的数量被动态调整了
	return p.pinSlow()
}

pinSlow

pinSlow 方法创建一个新的 poolLocal 对象并返回。

func (p *Pool) pinSlow() (*poolLocal, int) {
	runtime_procUnpin()
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	pid := runtime_procPin()
	
	// 加锁完成,就不需要原子性加载了
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
		// 双重检测
		// local 数组已经发生变化 (加锁期间被其他线程修改)
		// 直接返回即可
		return indexLocal(l, pid), pid
	}
	if p.local == nil {
		allPools = append(allPools, p)
	}
	
	// 使用处理器 P 的数量作为数组长度
	size := runtime.GOMAXPROCS(0)
	// 初始化新的 local 数组
	local := make([]poolLocal, size)
    // 原子更新 local 字段
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0]))
    // 原子更新 localSize 字段
	runtime_StoreReluintptr(&p.localSize, uintptr(size))     
	return &local[pid], pid
}

indexLocal

indexLocal 方法根据索引参数,返回 local 数组中对应的 poolLocal 对象。

func indexLocal(l unsafe.Pointer, i int) *poolLocal {
	lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
	return (*poolLocal)(lp)
}

缓存池 GC 过程

sync.Pool 包文件中有一个 init 函数,内部注册了 GC 执行方法。

func init() {
    runtime_registerPoolCleanup(poolCleanup)
}

poolCleanup 方法在缓存池的清理过程中,并不会直接释放池对象,而是会将其放入 victim 中,等到下一轮清理时再释放。 这样可以防止缓存池被直接释放后,变为冷启动时面对突然暴涨的对象请求造成的性能抖动,通过将缓存池放入 victim 中,可以起到避免 GC 毛刺、平滑过渡的作用

func poolCleanup() {
	// 函数会在 GC 过程中的 STW 阶段被调用

	// 清理 victim 缓存对象池
	for _, p := range oldPools {
		p.victim = nil
		p.victimSize = 0
	}

	// 将当前所有缓存池对象移动到 victim 缓存池对象
	for _, p := range allPools {
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}

	// 将全局缓存池移动到全局 victim 缓存池
	// 将全局 victim 重置为 nil
	oldPools, allPools = allPools, nil
}

小结

通过学习 sync.Pool 的源代码,我们可以深入理解和学习到的高性能编程设计理念和技巧:

  • noCopy 机制
  • CPU CacheLine 伪共享、内存对齐
  • poolDequeue.headTail 字段合并设计,压缩、解压、CAS 操作、索引定位等
  • 每个处理器 P 持有一个对象池,最大限度降低并发冲突
  • 私有变量/共享变量
  • 单生产者/多消费者模式实现 “读写分离”
  • 双端队列的出队顺序 (当前处理器 P 从队列头部操作,其他处理器 P 从队列尾部操作),最大限度降低并发冲突
  • 无锁编程
  • 对象窃取机制
  • 垃圾回收时的新旧对象交替使用,类似分代垃圾回收的设计理念

Reference

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。