微服务中的熔断算法
2023-07-24 Golang 微服务 Go 源码分析 读代码
雪崩效应
在微服务系统中,整个系统是以一系列功能独立的微服务组成,如果某一个服务,因为流量异常或者内部其他原因,导致响应异常,那么该服务会影响到其下游服务,
从而发生一系列连锁反应,最终导致整个系统崩溃,这就是微服务中的 雪崩效应
。
例如:当前系统中有 A
,B
,C
三个服务,服务 A
是上游,服务 B
是中游,服务 C
是下游。
一旦下游服务 C
变得不可用,积压了大量请求,服务 B
的请求也随之阻塞,资源逐渐耗尽,使得服务 B
也变得不可用。
最后,服务 A
也变为不可用,整个系统链路崩溃。
熔断
熔断
机制是微服务调用链路中的的自我保护机制,当链路中某个服务响应时间过长甚至不可用时,会进行服务 熔断
,快速返回错误响应,停止 级联故障
,避免 雪崩效应
。
熔断、限流、降级区别
限流
是针对服务请求数量的一种自我保护机制,当请求数量超出服务负载时,自动丢弃新的请求,是系统高可用架构的第一步。
服务有了 限流
之后,为什么还需要 熔断
呢? 限流
面向的是上游的服务,而 熔断
面向的是下游的服务。
降级
通过将不重要的服务暂停,提高系统负载能力。例如电商的 秒杀
场景中,可以暂停 用户好友关系
, 用户信息
等服务。
触发条件 | 面向目标 | |
---|---|---|
熔断 | 下游服务不可用 | 下游 |
降级 | 服务自身负载高 | 自身 |
限流 | 上游服务请求多 | 上游 |
hystrix-go
Hystrix 是 Netflix
开源的由 Java
开发的 熔断器
组件,对应的 Go
版本为 afex/hystrix-go,
笔者选择该组件作为研究 熔断
算法代码实现。
三个状态
状态 | 说明 |
---|---|
关闭 | 熔断关闭时所有的请求都会被接收 |
开启 | 熔断开启时所有的请求都会被拒绝 |
半打开 | 熔断开启时一段时间之后,尝试接收一个请求,确定下游服务是否已恢复,如果这个请求正常返回,熔断自动关闭状态,否则熔断回退到开启状态 |
这里需要注意的是: afex/hystrix-go 的实现中并没有 半打开
的状态,也就是说,一旦 熔断
开启后,
只能等待配置的时间之后,才能去主动判定下游服务是否已经恢复,继而恢复请求。笔者认为这个不是重要的部分,如果读者比较介意的话,
可以参考 引用部分 的另外两个 熔断
开源组件。
示例代码
主流程代码
package hystrix
import (
"fmt"
"net/http"
"sync/atomic"
"testing"
"time"
"github.com/afex/hystrix-go/hystrix"
"github.com/gin-gonic/gin"
"github.com/go-resty/resty/v2"
)
func server() {
r := gin.Default()
// 服务请求计数器
var count int64
r.GET("/ping", func(ctx *gin.Context) {
// 模拟服务故障,前两次请求返回错误
if atomic.AddInt64(&count, 1) < 3 {
ctx.String(http.StatusInternalServerError, "pong")
return
}
// 后面的请求正常返回
ctx.String(http.StatusOK, "pong")
})
// 通过 http://localhost:8080/ping 访问
_ = r.Run(":8080")
}
func TestQuickStart(t *testing.T) {
// 启动服务
go server()
hystrix.ConfigureCommand("test-api-ping", hystrix.CommandConfig{
// 执行命令超时时间, 默认值 1 秒
Timeout: 0,
// 最大并发请求量, 默认值 10
MaxConcurrentRequests: 100,
// 熔断开启前需要达到的最小请求数量, 默认值 20
RequestVolumeThreshold: 5,
// 熔断器开启后,重试服务是否恢复的等待时间,默认值 5 秒
// 这里修改为 0.5 秒
SleepWindow: 500,
// 请求错误百分比阈值,超过阈值后熔断开启
ErrorPercentThreshold: 20,
})
for i := 0; i < 20; i++ {
_ = hystrix.Do("test-api-ping", func() error {
resp, _ := resty.New().R().Get("http://localhost:8080/ping")
if resp.IsError() {
return fmt.Errorf("err code: %s", resp.Status())
}
return nil
}, func(err error) error {
fmt.Println("fallback err: ", err)
return err
})
// 每次请求之间休眠 0.1 秒
time.Sleep(100 * time.Millisecond)
}
}
主流程代码逻辑描述
我们通过修改 hystrix
的默认配置,期望达到以下的熔断效果:
- 前两个请求直接返回错误
- 此时错误百分比达到
100%
, 但是还未达到开启熔断
的最小请求数量 (5) - 继续发出请求,接下来的
3
个请求全部成功,此时请求数量达到开启熔断
的最小请求数量 - 开启
熔断
- 继续发出请求,接下来的
5
个请求全部返回fallback error
, 同时每次请求之间间隔0.1 秒
- 此时达到开启
熔断
后,重试服务是否恢复的等待时间 (0.5 秒) - 继续发出请求,接下来的
5
个请求全部成功
运行测试
$ go test -v -count=1 .
# 输出如下
=== RUN TestQuickStart
...
[GIN-debug] Listening and serving HTTP on :8080
# 前两个请求错误
[GIN] 2023/03/03 - 12:02:03 | 500 | 16.7µs | 127.0.0.1 | GET "/ping"
fallback err: err code: 500 Internal Server Error
[GIN] 2023/03/03 - 12:02:03 | 500 | 22.6µs | 127.0.0.1 | GET "/ping"
fallback err: err code: 500 Internal Server Error
[GIN] 2023/03/03 - 12:02:04 | 200 | 33.9µs | 127.0.0.1 | GET "/ping"
[GIN] 2023/03/03 - 12:02:04 | 200 | 20.9µs | 127.0.0.1 | GET "/ping"
[GIN] 2023/03/03 - 12:02:04 | 200 | 22.6µs | 127.0.0.1 | GET "/ping"
# 熔断开启
fallback err: hystrix: circuit open
fallback err: hystrix: circuit open
fallback err: hystrix: circuit open
fallback err: hystrix: circuit open
fallback err: hystrix: circuit open
# 熔断重试
[GIN] 2023/03/03 - 12:02:04 | 200 | 22.6µs | 127.0.0.1 | GET "/ping"
[GIN] 2023/03/03 - 12:02:04 | 200 | 22.3µs | 127.0.0.1 | GET "/ping"
[GIN] 2023/03/03 - 12:02:05 | 200 | 16.3µs | 127.0.0.1 | GET "/ping"
[GIN] 2023/03/03 - 12:02:05 | 200 | 18.9µs | 127.0.0.1 | GET "/ping"
[GIN] 2023/03/03 - 12:02:05 | 200 | 26.5µs | 127.0.0.1 | GET "/ping"
--- PASS: TestQuickStart (1.52s)
PASS
ok Golang-Patterns/hystrix 1.532s
通过测试的数据结果,我们可以看到,熔断
执行流程和上面描述的逻辑一致,接下来,我们研究一下 hystrix-go
的内部实现。
算法实现
熔断配置对象
各字段代表的含义,请参照刚才示例代码中的注释。
type CommandConfig struct {
Timeout int `json:"timeout"`
MaxConcurrentRequests int `json:"max_concurrent_requests"`
RequestVolumeThreshold int `json:"request_volume_threshold"`
SleepWindow int `json:"sleep_window"`
ErrorPercentThreshold int `json:"error_percent_threshold"`
}
创建熔断配置
ConfigureCommand
方法根据参数创建一个 熔断
配置对象,如果对象中的某些字段参数未提供,则适用默认值替代。
func ConfigureCommand(name string, config CommandConfig) {
settingsMutex.Lock()
defer settingsMutex.Unlock()
timeout := DefaultTimeout
if config.Timeout != 0 {
timeout = config.Timeout
}
max := DefaultMaxConcurrent
...
volume := DefaultVolumeThreshold
...
sleep := DefaultSleepWindow
...
errorPercent := DefaultErrorPercentThreshold
...
circuitSettings[name] = &Settings{
...
}
}
熔断器对象
CircuitBreaker
表示单个请求对应的 熔断器
对象,对象可以验证请求是否触发了 熔断
机制,以及是否应该拒绝该请求继续访问。
type CircuitBreaker struct {
Name string
open bool
forceOpen bool
mutex *sync.RWMutex
openedOrLastTestedTime int64
executorPool *executorPool
metrics *metricExchange
}
GetCircuit 函数
GetCircuit
函数根据名称返回对应的 熔断器
对象以及该名称函数调用是否触发了 熔断
机制。
func GetCircuit(name string) (*CircuitBreaker, bool, error) {
circuitBreakersMutex.RLock()
_, ok := circuitBreakers[name]
if !ok {
circuitBreakersMutex.RUnlock()
// 从读写锁切换到写锁
circuitBreakersMutex.Lock()
defer circuitBreakersMutex.Unlock()
// 双重检测,防止在加锁期间其他 goroutine 更新了对象
if cb, ok := circuitBreakers[name]; ok {
// 代码执行到这里,说明在加锁期间其他 goroutine 触发了熔断机制
// 所以第二个返回值返回 false
return cb, false, nil
}
circuitBreakers[name] = newCircuitBreaker(name)
} else {
defer circuitBreakersMutex.RUnlock()
}
return circuitBreakers[name], !ok, nil
}
Do 函数
Do
函数以阻塞的方式运行参数函数,直到函数返回成功或者错误 (包括触发了熔断),具体的执行工作是由 DoC
函数和 GoC
函数完成的。
func Do(name string, run runFunc, fallback fallbackFunc) error {
// 包装 run 参数函数
runC := func(ctx context.Context) error {
return run()
}
var fallbackC fallbackFuncC
if fallback != nil {
// 包装 fallback 参数函数
fallbackC = func(ctx context.Context, err error) error {
return fallback(err)
}
}
return DoC(context.Background(), name, runC, fallbackC)
}
DoC 函数
DoC
函数负责参数的执行前封装工作,Goc
函数负责参数的具体执行工作。
func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error {
// 接收通道缓冲为 1
// 函数的执行结果只可能是下面一种:
// 返回成功
// 返回错误 (包括触发了熔断)
done := make(chan struct{}, 1)
// 将 run 参数函数的执行过程包装一下
// 如果函数返回成功,就发送信号给通道,否则返回错误
r := func(ctx context.Context) error {
err := run(ctx)
if err != nil {
return err
}
done <- struct{}{}
return nil
}
// 将 fallback 参数函数的执行过程包装一下
// 如果函数返回错误,就发送信号给通道
f := func(ctx context.Context, e error) error {
err := fallback(ctx, e)
if err != nil {
return err
}
done <- struct{}{}
return nil
}
// 执行参数函数,委托给 GoC 函数执行
var errChan chan error
if fallback == nil {
errChan = GoC(ctx, name, r, nil)
} else {
errChan = GoC(ctx, name, r, f)
}
select {
case <-done:
// 代码执行到这里,说明函数返回成功
return nil
case err := <-errChan:
// 代码执行到这里,说明函数返回错误
return err
}
}
GoC 函数
GoC
运行参数函数,同时跟踪该函数历史运行情况。如果参数函数触发了 熔断
开启,则必须等待服务恢复。
func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {
cmd := &command{
run: run,
fallback: fallback,
start: time.Now(),
errChan: make(chan error, 1),
finished: make(chan bool, 1),
}
circuit, _, err := GetCircuit(name)
if err != nil {
// 获取该请求对应的熔断器对象发生错误时,直接返回
cmd.errChan <- err
return cmd.errChan
}
...
// 当请求返回时,归还请求 ticket 到池中
returnTicket := func() {
...
cmd.circuit.executorPool.Return(cmd.ticket)
}
// 该对象由两个 goroutine 共享
// 两个 goroutine :
// 一个负责具体的函数执行 (熔断有可能已经开启了)
// 一个负责监听函数执行结果
// 不管哪个先执行完成,报告熔断器相关 metric
returnOnce := &sync.Once{}
go func() {
defer func() { cmd.finished <- true }()
// 熔断已经开启了
// 拒绝当前请求,等待服务恢复
if !cmd.circuit.AllowRequest() {
...
returnOnce.Do(func() {
cmd.errorWithFallback(ctx, ErrCircuitOpen)
})
return
}
// 当后端 (被请求方,一般指上游服务) 服务不稳定时,请求会花费更多时间,但不一定每次都会失败
// 当请求变慢但 QPS 不变时,需要限制并发数量,降低后端服务的负载
cmd.Lock()
select {
case cmd.ticket = <-circuit.executorPool.Tickets:
// 请求量没有超过并发限制
...
cmd.Unlock()
default:
// 请求量超过了并发限制,返回错误
...
returnOnce.Do(func() {
cmd.errorWithFallback(ctx, ErrMaxConcurrency)
})
return
}
runErr := run(ctx) // 执行参数函数
returnOnce.Do(func() {
if runErr != nil {
cmd.errorWithFallback(ctx, runErr)
return
}
cmd.reportEvent("success")
})
}()
go func() {
timer := time.NewTimer(getSettings(name).Timeout)
defer timer.Stop()
select {
case <-cmd.finished:
// 上面的 goroutine 已经上报数据,这里无需再上报
case <-ctx.Done():
// 上下文错误
returnOnce.Do(func() {
cmd.errorWithFallback(ctx, ctx.Err())
})
case <-timer.C:
// 执行超时
returnOnce.Do(func() {
cmd.errorWithFallback(ctx, ErrTimeout)
})
}
}()
return cmd.errChan
}
Do 函数调用链路
最后,我们来看一下 熔断
开启与状态判断机制的内部实现。
AllowRequest 方法
AllowRequest
方法在具体的请求执行之前,先判断 熔断
是否已经开启,当 熔断
开启时,熔断
时间超过等待外部服务恢复时间时返回 ture
。
func (circuit *CircuitBreaker) AllowRequest() bool {
return !circuit.IsOpen() || circuit.allowSingleTest()
}
IsOpen 方法
IsOpen
方法在具体的请求执行之前,根据 熔断
是否以开启确定是否已经拒绝请求执行。
func (circuit *CircuitBreaker) IsOpen() bool {
circuit.mutex.RLock()
o := circuit.forceOpen || circuit.open
circuit.mutex.RUnlock()
if o {
// 熔断已开启
return true
}
if uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold {
// 未达到开启熔断需要的最小请求数量
return false
}
if !circuit.metrics.IsHealthy(time.Now()) {
// 请求失败百分比超过阈值,开启熔断
circuit.setOpen()
return true
}
return false
}
allowSingleTest 方法
allowSingleTest
方法判断 熔断
时间是否已经超过重试服务是否恢复的等待时间。
func (circuit *CircuitBreaker) allowSingleTest() bool {
...
now := time.Now().UnixNano()
openedOrLastTestedTime := atomic.LoadInt64(&circuit.openedOrLastTestedTime)
if circuit.open && now > openedOrLastTestedTime+getSettings(circuit.Name).SleepWindow.Nanoseconds() {
...
return true
}
return false
}
通过上面三个方法实现代码可以看到: 熔断
开启与状态判断是每次请求到来时实时判断的。
小结
本文描述了 熔断
的基本概念以及 熔断
和 限流
、降级
之间的区别,同时借助开源的 afex/hystrix-go 组件源代码,
研究了如何使用 Go
语言实现一个 熔断器
组件,感兴趣的读者可以阅读下列文章,了解下其他开源 熔断器
组件是如何实现的。