漏桶算法和令牌桶算法
2023-07-20 软件工程 微服务 Go 源码分析 读代码
概述
限流的目的 是通过对并发请求进行限速来保护系统,请求一旦达到限制速率,就可以选择性地拒绝服务,例如下面的几种方案:
- 提示用户系统繁忙,请稍后重试
- 提示用户进入排队等待
- 跳转到指定页面
- ….
漏桶
算法和 令牌桶
算法是 接口限流设计
中常用的两种算法,本文通过两个常用的开源组件,研究下两者的区别和具体代码实现。
漏桶
漏桶
算法将 服务的请求量限额
比喻为一个一直装满水的桶,每隔固定时间向外漏 N
滴水。如果请求方接到了这滴水,就可以继续请求服务,如果没有接到,就需要等待下一滴水。
也就是说,不管请求量有多少,单位时间内请求额度 (漏水流出的容量) 是固定的。
算法实现
笔者选择的组件是由 Uber
开源的 uber-go/ratelimit 作为研究 漏桶
算法代码实现,版本为 v0.2.0
。
示例代码
package main
import (
"fmt"
"go.uber.org/ratelimit"
"time"
)
func main() {
// 每秒速率限制为 1000 个请求
// 平均每个请求 1 ms
rl := ratelimit.New(1000)
prev := time.Now()
for i := 0; i < 5; i++ {
now := rl.Take()
// 打印每个请求距离上个请求的时间
fmt.Println(i, now.Sub(prev))
prev = now
}
}
// $ go run main.go
// 输出如下
// 0 1ms
// 1 1ms
// 2 1ms
// 3 1ms
// 4 1ms
接口
Limiter
用来进行限流,可能存在多个 goroutine
并发的情况。
type Limiter interface {
// Take 方法调用时可能会阻塞以确保限流被满足
Take() time.Time
}
Clock
通过时间来计算请求速率限制 (内部实现是在标准库的 time
包上面封装了一层,便于在测试中 Mock)。
type Clock interface {
Now() time.Time
Sleep(time.Duration)
}
两种实现
对于 Limiter
接口,ratelimit
组件的内部通过实现了两种方案,分别基于 互斥锁
和基于 atomic (无锁编程)
,
本文主要基于 互斥锁
方案的代码进行分析,对 无锁
方案感兴趣的读者可以自行阅读源代码,如果对 无锁
概念比较模糊,
建议阅读 扩展列表的文章。
除此之外,还有一个表示不限流的 unlimited
也实现了 Limiter
接口。
限流器配置对象
type config struct {
clock Clock
slack int // 浮动请求数量
per time.Duration // 单个时间周期 (默认为 1 秒)
}
// 通过 FUNCTIONAL OPTIONS 模式设置配置信息
func buildConfig(opts []Option) config {
// 先给出默认配置
c := config{
clock: clock.New(),
slack: 10,
per: time.Second,
}
// 根据参数自定义配置
for _, opt := range opts {
opt.apply(&c)
}
return c
}
限流器对象
type mutexLimiter struct {
sync.Mutex
last time.Time // 上次请求时间
sleepFor time.Duration // 当前请求休眠时间
perRequest time.Duration // 单个请求时间片
maxSlack time.Duration // 最大浮动请求时间
clock Clock
}
newMutexBased
通过 FUNCTIONAL OPTIONS
模式创建一个 限流器
。
func newMutexBased(rate int, opts ...Option) *mutexLimiter {
// 根据参数构造配置信息
config := buildConfig(opts)
// 通过配置信息得到限流的单个生命周期 (默认为 1 秒)
// 单个声明周期 / 最大请求速率 = 单个请求时间片
// 例如 rate = 1000, config.per = 1 秒
// 意味着单个请求时间片为 1 毫秒
perRequest := config.per / time.Duration(rate)
// maxSlack 表示最大请求浮动时间
// 最大请求浮动时间 = -1 * 浮动请求数量 * perRequest
// 例如 config.slack = 10, perRequest = 1 毫秒, maxSlack = -10 毫秒
l := &mutexLimiter{
perRequest: perRequest,
maxSlack: -1 * time.Duration(config.slack) * perRequest,
clock: config.clock,
}
return l
}
实现 Limiter 接口
Take
方法可能会在并发调用时阻塞,保证每个请求的时间片均为 config.per / rate
(也就是 perRequst
)。
func (t *mutexLimiter) Take() time.Time {
// 加锁
t.Lock()
defer t.Unlock()
now := t.clock.Now()
// 第一个请求到达时,初始化最后访问时间
if t.last.IsZero() {
t.last = now
return t.last
}
// sleepFor 根据 perRequest 时间片和最后的请求花费的时间,来计算当前请求需要的休眠时间
// 由于距离上次请求的时间可能比 perRequest 时间片要多
// 因此 t.perRequest - now.Sub(t.last) 得出的结果可能为负数
t.sleepFor += t.perRequest - now.Sub(t.last)
// sleepFor 不能为负数 (如果执行到这里,sleepFor 依然为负数,说明距离上次请求的时间太久远了)
// 因为这意味着会将之前的请求速率进行累加,从而突破了最大请求速率限制,这不符合漏桶算法的规则
// 举例可能比较形象:在过去的 10 分钟,漏桶里面一直没有水,但是现在桶里面的水突然满了,但是漏桶本身的流速 (规则) 并不会收到影响
if t.sleepFor < t.maxSlack {
// 如果 sleepFor 比 maxSlack 还要小,直接以 maxSlack 为准
// 也就是说,即使距离上次请求的时间很久,面对突增的流量,我们最多给到 maxSlack 个时间片 (作为缓冲)
t.sleepFor = t.maxSlack
}
if t.sleepFor > 0 {
// 休眠
t.clock.Sleep(t.sleepFor)
// 更新上次访问时间为休眠后时间
t.last = now.Add(t.sleepFor)
t.sleepFor = 0
} else {
// 更新上次访问时间为当前时间
t.last = now
}
return t.last
}
小结
漏桶
的不足之处在于其速率限制是固定的,即使请求容量允许的情况下,仍然无法处理流量突增的场景。例如 漏桶
请求容量为 10W, 速率限制为 1W,
某一个时刻流量突增,有 10W 个用户请求,但是只能有 1W 用户正常请求,剩余用户只能等待,此时就需要 令牌桶
算法来解决。
令牌桶
令牌桶
算法定期向桶中添加令牌,令牌的数目可以按照需要消耗的资源进行相应的调整,请求服务时需要从桶中获取令牌,
如果获取到令牌,可以正常访问,如果没有获取到令牌,可以选择等待,或者放弃。
算法实现
笔者选择的组件是 juju/ratelimit 作为研究 令牌桶
算法代码实现,版本为 v1.0.2
。
示例代码
package main
import (
"fmt"
"github.com/juju/ratelimit"
"time"
)
func main() {
// 每秒向桶内添加 1 个 token
// 桶的 token 容量为 10 个
b := ratelimit.NewBucketWithQuantum(time.Second, 10, 1)
fmt.Printf("bucket capacity = %d\n", b.Capacity()) // 获取桶的令牌容量
fmt.Printf("bucket rate = %f\n", b.Rate()) // 获取添加令牌速率
fmt.Printf("bucket available = %d\n", b.Available()) // 获取桶当前可用令牌数量
n := b.TakeAvailable(5) // 获取 5 个令牌
fmt.Printf("tokens number is %d\n", n)
fmt.Printf("bucket available = %d\n", b.Available()) // 获取桶当前可用令牌数量
time.Sleep(3 * time.Second)
fmt.Printf("bucket available = %d\n", b.Available()) // 获取桶当前可用令牌数量
}
运行代码
$ go run main.go
# 输出如下
bucket capacity = 10
bucket rate = 1.000000
bucket available = 10
tokens number is 5
bucket available = 5
bucket available = 8
从代码的输出结果中可以看到,初始化时 令牌桶
的容量为 10,取走 5 个令牌之后,可用的剩下 5 个,然后程序休眠 3 秒,
因为每秒放入一个令牌,所以最终 令牌桶
的可用令牌数量为 8 个。
接口
Clock
通过时间来计算请求速率限制。
type Clock interface {
Now() time.Time
Sleep(d time.Duration)
}
realClock
内部实现是在标准库的 time
包上面封装了一层。
type realClock struct{}
func (realClock) Now() time.Time {
return time.Now()
}
func (realClock) Sleep(d time.Duration) {
time.Sleep(d)
}
令牌桶对象
type Bucket struct {
clock Clock
startTime time.Time // 令牌桶开始计时时间
capacity int64 // 令牌桶容量
quantum int64 // 定期添加令牌数量
fillInterval time.Duration // 定期间隔时间
mu sync.Mutex // 互斥锁
availableTokens int64 // 令牌桶内的令牌总数量
latestTick int64 // 上次获取令牌时间片
}
创建令牌桶
func NewBucketWithQuantum(fillInterval time.Duration, capacity, quantum int64) *Bucket {
return NewBucketWithQuantumAndClock(fillInterval, capacity, quantum, nil)
}
func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {
// 参数检测
...
return &Bucket{
clock: clock,
startTime: clock.Now(),
latestTick: 0,
fillInterval: fillInterval,
capacity: capacity,
quantum: quantum,
availableTokens: capacity,
}
}
创建新的 令牌桶
时,会初始化 startTime
字段为当前时间,latestTick
时间片为 0, availableTokens
字段和 capacity
字段一致。
获取令牌桶容量
func (tb *Bucket) Capacity() int64 {
return tb.capacity
}
因为 令牌桶
的容量在创建时就初始化完成且不可修改,所以直接返回 capacity
字段即可。
获取令牌桶添加速率
Rate
方法返回 每秒
向 令牌桶
添加令牌的个数,计算时会将秒转换为纳秒单位。
func (tb *Bucket) Rate() float64 {
return 1e9 * float64(tb.quantum) / float64(tb.fillInterval)
}
currentTick
方法返回从 令牌桶
开始计时到当前的时间片。
func (tb *Bucket) currentTick(now time.Time) int64 {
return int64(now.Sub(tb.startTime) / tb.fillInterval)
}
获取桶内可用令牌数量
Available
方法返回 令牌桶
内可用的的令牌个数,因为存在并发的情况,所以 函数返回值不可作为调用获取令牌方法时的参数。
func (tb *Bucket) Available() int64 {
return tb.available(tb.clock.Now())
}
获取令牌
TakeAvailable
方法从 令牌桶
内获取指定数量的令牌 (无阻塞),返回获取到的令牌数量,如果返回 0, 说明没有可用的令牌。
func (tb *Bucket) TakeAvailable(count int64) int64 {
tb.mu.Lock() // 获取令牌期间加锁
defer tb.mu.Unlock()
return tb.takeAvailable(tb.clock.Now(), count)
}
takeAvailable
方法是获取令牌的具体实现,新增了一个便于测试的 time.Time
参数。
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
...
// 根据时间片调整桶内可用令牌数量
tb.adjustavailableTokens(tb.currentTick(now))
if tb.availableTokens <= 0 {
// 没有可用的令牌
return 0
}
if count > tb.availableTokens {
// 获取令牌数量大于可用的令牌数量时,返回可用令牌数量
count = tb.availableTokens
}
// 减少桶内可用令牌数量
tb.availableTokens -= count
return count
}
adjustavailableTokens
方法根据时间片调整桶内可用令牌数量。
func (tb *Bucket) adjustavailableTokens(tick int64) {
// 更新桶的上次请求时间片
lastTick := tb.latestTick
tb.latestTick = tick
if tb.availableTokens >= tb.capacity {
// 桶内可用令牌大于等于桶的容量,直接返回
return
}
// (tick - lastTick) * tb.quantum = 自上次请求后应该新增的令牌数量
// 更新桶内令牌数量
tb.availableTokens += (tick - lastTick) * tb.quantum
if tb.availableTokens > tb.capacity {
// 如果桶内可用令牌大于桶的容量,以桶的容量为准
tb.availableTokens = tb.capacity
}
return
}
小结
通过对源代码的实现分析,可以看到 juju/ratelimit
的功能实现非常简洁,最重要的是,没有使用额外的 goroutine
去完成 定期向桶内添加令牌
这个操作,
而是在获取时去实时计算,简化了内部实现逻辑的同时,非常便于扩展和测试。
总结
本文通过对两个高质量 (Github starts > 2500) 的开源组件的源代码进行分析,认识并理解了 漏桶
和 令牌桶
的差别以及具体实现。
从算法的实现角度来讲,两个算法的核心都基于 时间片
,漏桶
算法中的 时间片
作为单个请求的时间间隔,
而 令牌桶
算法中的 时间片
作为定期向桶内添加令牌的时间基准。
漏桶
算法和 令牌桶
算法看起来很像,不过还是有一定区别。漏桶
流出的速率固定,而 令牌桶
只要在桶中有令牌,那就可以获取并请求服务。
也就是说,令牌桶
允许一定程度的并发超过其速率限制,例如某一个时刻流量突增,有 10W 个用户请求,只要令牌桶中有 10W 个令牌,那么这 10W 个请求全都会放过去,
但是 漏桶
不能超过其速率限制。令牌桶
在桶中没有令牌的情况下会退化为 漏桶
算法 (只能定期添加,速率变成了固定的)。
依然存在的问题
无论是 漏桶
算法还是 令牌桶
算法,都需要在初始化时指定一个固定值作为桶的 “容量参数”,但在现代的微服务架构中,一个服务的负载能力往往是会不断变化的:
- 随着新增代码带来的变化
- 随着服务依赖的下游性能变化而变化
- 随着服务部署所在节点 (CPU/磁盘) 性能变化而变化
- 随着服务部署节点数变化而变化
- 随着业务需求变化而变化
- 随着时间段变化而变化
即使参数值是从配置中心获取的,但是依然无法动态修改,而且参数值依赖开发/运维人员的个人经验判断,无法自动化这个流程意味着容易出现误操作,从而引发 Bug。
写到这里,笔者脑海中浮现的第一个解决方案是:结合 K8S 的自动扩容, 对单个 Pod 中的服务进行压测,然后根据压测结果计算出合理的漏桶和令牌痛的参数值,保存在 Pod 配置文件或者服务配置中心, 在服务初始化时从配置中读取参数值然后注入到漏桶或令牌桶即可。
更多方案欢迎大家在评论区积极讨论 :-)