蛮荆

微服务中的熔断算法

2023-07-24

雪崩效应

在微服务系统中,整个系统是以一系列功能独立的微服务组成,如果某一个服务,因为流量异常或者内部其他原因,导致响应异常,那么该服务会影响到其下游服务, 从而发生一系列连锁反应,最终导致整个系统崩溃,这就是微服务中的 雪崩效应

例如:当前系统中有 ABC 三个服务,服务 A 是上游,服务 B 是中游,服务 C 是下游。 一旦下游服务 C 变得不可用,积压了大量请求,服务 B 的请求也随之阻塞,资源逐渐耗尽,使得服务 B 也变得不可用。 最后,服务 A 也变为不可用,整个系统链路崩溃。

图片来源: https://klotzandrew.com/blog/api-health-checks-for-graceful-or-cascading-failure

熔断

熔断 机制是微服务调用链路中的的自我保护机制,当链路中某个服务响应时间过长甚至不可用时,会进行服务 熔断,快速返回错误响应,停止 级联故障,避免 雪崩效应

熔断、限流、降级区别

限流 是针对服务请求数量的一种自我保护机制,当请求数量超出服务负载时,自动丢弃新的请求,是系统高可用架构的第一步。 服务有了 限流 之后,为什么还需要 熔断 呢? 限流 面向的是上游的服务,而 熔断 面向的是下游的服务。

降级 通过将不重要的服务暂停,提高系统负载能力。例如电商的 秒杀 场景中,可以暂停 用户好友关系, 用户信息 等服务。

触发条件 面向目标
熔断 下游服务不可用 下游
降级 服务自身负载高 自身
限流 上游服务请求多 上游

hystrix-go

HystrixNetflix 开源的由 Java 开发的 熔断器 组件,对应的 Go 版本为 afex/hystrix-go, 笔者选择该组件作为研究 熔断 算法代码实现。

三个状态

状态 说明
关闭 熔断关闭时所有的请求都会被接收
开启 熔断开启时所有的请求都会被拒绝
半打开          熔断开启时一段时间之后,尝试接收一个请求,确定下游服务是否已恢复,如果这个请求正常返回,熔断自动关闭状态,否则熔断回退到开启状态

图片来源: https://medium.com/@narengowda/what-is-circuitbreaking-in-microservices-2053f4f66882

这里需要注意的是: afex/hystrix-go 的实现中并没有 半打开 的状态,也就是说,一旦 熔断 开启后, 只能等待配置的时间之后,才能去主动判定下游服务是否已经恢复,继而恢复请求。笔者认为这个不是重要的部分,如果读者比较介意的话, 可以参考 引用部分 的另外两个 熔断 开源组件。

示例代码

主流程代码

package hystrix

import (
	"fmt"
	"net/http"
	"sync/atomic"
	"testing"
	"time"

	"github.com/afex/hystrix-go/hystrix"
	"github.com/gin-gonic/gin"
	"github.com/go-resty/resty/v2"
)

func server() {
	r := gin.Default()

	// 服务请求计数器
	var count int64

	r.GET("/ping", func(ctx *gin.Context) {
		// 模拟服务故障,前两次请求返回错误
		if atomic.AddInt64(&count, 1) < 3 {
			ctx.String(http.StatusInternalServerError, "pong")
			return
		}
		// 后面的请求正常返回
		ctx.String(http.StatusOK, "pong")
	})

	// 通过 http://localhost:8080/ping 访问
	_ = r.Run(":8080")
}

func TestQuickStart(t *testing.T) {
	// 启动服务
	go server()

	hystrix.ConfigureCommand("test-api-ping", hystrix.CommandConfig{
		// 执行命令超时时间, 默认值 1 秒
		Timeout: 0,

		// 最大并发请求量, 默认值 10
		MaxConcurrentRequests: 100,

		// 熔断开启前需要达到的最小请求数量, 默认值 20
		RequestVolumeThreshold: 5,

		// 熔断器开启后,重试服务是否恢复的等待时间,默认值 5 秒
		// 这里修改为 0.5 秒
		SleepWindow: 500,

		// 请求错误百分比阈值,超过阈值后熔断开启
		ErrorPercentThreshold: 20,
	})
	
	for i := 0; i < 20; i++ {
		_ = hystrix.Do("test-api-ping", func() error {
			resp, _ := resty.New().R().Get("http://localhost:8080/ping")
			if resp.IsError() {
				return fmt.Errorf("err code: %s", resp.Status())
			}
			return nil
		}, func(err error) error {
			fmt.Println("fallback err: ", err)
			return err
		})

		// 每次请求之间休眠 0.1 秒
		time.Sleep(100 * time.Millisecond)
	}
}

主流程代码逻辑描述

我们通过修改 hystrix 的默认配置,期望达到以下的熔断效果:

  • 前两个请求直接返回错误
  • 此时错误百分比达到 100%, 但是还未达到开启 熔断 的最小请求数量 (5)
  • 继续发出请求,接下来的 3 个请求全部成功,此时请求数量达到开启 熔断 的最小请求数量
  • 开启 熔断
  • 继续发出请求,接下来的 5 个请求全部返回 fallback error, 同时每次请求之间间隔 0.1 秒
  • 此时达到开启 熔断 后,重试服务是否恢复的等待时间 (0.5 秒)
  • 继续发出请求,接下来的 5 个请求全部成功

运行测试

$ go test -v -count=1 .

# 输出如下
=== RUN   TestQuickStart

...

[GIN-debug] Listening and serving HTTP on :8080

# 前两个请求错误
[GIN] 2023/03/03 - 12:02:03 | 500 |        16.7µs |       127.0.0.1 | GET      "/ping"
fallback err:  err code: 500 Internal Server Error
[GIN] 2023/03/03 - 12:02:03 | 500 |        22.6µs |       127.0.0.1 | GET      "/ping"
fallback err:  err code: 500 Internal Server Error
[GIN] 2023/03/03 - 12:02:04 | 200 |        33.9µs |       127.0.0.1 | GET      "/ping"
[GIN] 2023/03/03 - 12:02:04 | 200 |        20.9µs |       127.0.0.1 | GET      "/ping"
[GIN] 2023/03/03 - 12:02:04 | 200 |        22.6µs |       127.0.0.1 | GET      "/ping"

# 熔断开启
fallback err:  hystrix: circuit open
fallback err:  hystrix: circuit open
fallback err:  hystrix: circuit open
fallback err:  hystrix: circuit open
fallback err:  hystrix: circuit open

# 熔断重试
[GIN] 2023/03/03 - 12:02:04 | 200 |        22.6µs |       127.0.0.1 | GET      "/ping"
[GIN] 2023/03/03 - 12:02:04 | 200 |        22.3µs |       127.0.0.1 | GET      "/ping"
[GIN] 2023/03/03 - 12:02:05 | 200 |        16.3µs |       127.0.0.1 | GET      "/ping"
[GIN] 2023/03/03 - 12:02:05 | 200 |        18.9µs |       127.0.0.1 | GET      "/ping"
[GIN] 2023/03/03 - 12:02:05 | 200 |        26.5µs |       127.0.0.1 | GET      "/ping"
--- PASS: TestQuickStart (1.52s)
PASS
ok      Golang-Patterns/hystrix 1.532s

通过测试的数据结果,我们可以看到,熔断 执行流程和上面描述的逻辑一致,接下来,我们研究一下 hystrix-go 的内部实现。

算法实现

熔断配置对象

各字段代表的含义,请参照刚才示例代码中的注释。

type CommandConfig struct {
	Timeout                int `json:"timeout"`
	MaxConcurrentRequests  int `json:"max_concurrent_requests"`
	RequestVolumeThreshold int `json:"request_volume_threshold"`
	SleepWindow            int `json:"sleep_window"`
	ErrorPercentThreshold  int `json:"error_percent_threshold"`
}

创建熔断配置

ConfigureCommand 方法根据参数创建一个 熔断 配置对象,如果对象中的某些字段参数未提供,则适用默认值替代。

func ConfigureCommand(name string, config CommandConfig) {
    settingsMutex.Lock()
    defer settingsMutex.Unlock()

	timeout := DefaultTimeout
	if config.Timeout != 0 {
		timeout = config.Timeout
	}

	max := DefaultMaxConcurrent
    ...

	volume := DefaultVolumeThreshold
    ...

	sleep := DefaultSleepWindow
    ...

	errorPercent := DefaultErrorPercentThreshold
    ...

	circuitSettings[name] = &Settings{
        ...
	}
}

熔断器对象

CircuitBreaker 表示单个请求对应的 熔断器 对象,对象可以验证请求是否触发了 熔断 机制,以及是否应该拒绝该请求继续访问。

type CircuitBreaker struct {
	Name                   string
	open                   bool
	forceOpen              bool
	mutex                  *sync.RWMutex
	openedOrLastTestedTime int64

	executorPool *executorPool
	metrics      *metricExchange
}

GetCircuit 函数

GetCircuit 函数根据名称返回对应的 熔断器 对象以及该名称函数调用是否触发了 熔断 机制。

func GetCircuit(name string) (*CircuitBreaker, bool, error) {
	circuitBreakersMutex.RLock()
	_, ok := circuitBreakers[name]
	if !ok {
		circuitBreakersMutex.RUnlock()
		// 从读写锁切换到写锁
		circuitBreakersMutex.Lock()
		defer circuitBreakersMutex.Unlock()
        // 双重检测,防止在加锁期间其他 goroutine 更新了对象
		if cb, ok := circuitBreakers[name]; ok {
			// 代码执行到这里,说明在加锁期间其他 goroutine 触发了熔断机制
			// 所以第二个返回值返回 false
			return cb, false, nil
		}
		circuitBreakers[name] = newCircuitBreaker(name)
	} else {
		defer circuitBreakersMutex.RUnlock()
	}

	return circuitBreakers[name], !ok, nil
}

Do 函数

Do 函数以阻塞的方式运行参数函数,直到函数返回成功或者错误 (包括触发了熔断),具体的执行工作是由 DoC 函数和 GoC 函数完成的。

func Do(name string, run runFunc, fallback fallbackFunc) error {
	// 包装 run 参数函数
	runC := func(ctx context.Context) error {
		return run()
	}
	var fallbackC fallbackFuncC
	if fallback != nil {
		// 包装 fallback 参数函数
		fallbackC = func(ctx context.Context, err error) error {
			return fallback(err)
		}
	}
	return DoC(context.Background(), name, runC, fallbackC)
}

DoC 函数

DoC 函数负责参数的执行前封装工作,Goc 函数负责参数的具体执行工作。

func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error {
	// 接收通道缓冲为 1
	// 函数的执行结果只可能是下面一种: 
	//  返回成功
	//  返回错误 (包括触发了熔断)
	done := make(chan struct{}, 1)

	// 将 run 参数函数的执行过程包装一下
	// 如果函数返回成功,就发送信号给通道,否则返回错误
	r := func(ctx context.Context) error {
		err := run(ctx)
		if err != nil {
			return err
		}

		done <- struct{}{}
		return nil
	}

    // 将 fallback 参数函数的执行过程包装一下
    // 如果函数返回错误,就发送信号给通道
	f := func(ctx context.Context, e error) error {
		err := fallback(ctx, e)
		if err != nil {
			return err
		}

		done <- struct{}{}
		return nil
	}

	// 执行参数函数,委托给 GoC 函数执行
	var errChan chan error
	if fallback == nil {
		errChan = GoC(ctx, name, r, nil)
	} else {
		errChan = GoC(ctx, name, r, f)
	}

	select {
	case <-done:
		// 代码执行到这里,说明函数返回成功
		return nil
	case err := <-errChan:
        // 代码执行到这里,说明函数返回错误
		return err
	}
}

GoC 函数

GoC 运行参数函数,同时跟踪该函数历史运行情况。如果参数函数触发了 熔断 开启,则必须等待服务恢复。

func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {
	cmd := &command{
		run:      run,
		fallback: fallback,
		start:    time.Now(),
		errChan:  make(chan error, 1),
		finished: make(chan bool, 1),
	}
	
	circuit, _, err := GetCircuit(name)
	if err != nil {
		// 获取该请求对应的熔断器对象发生错误时,直接返回
		cmd.errChan <- err
		return cmd.errChan
	}
	
    ...
	
	// 当请求返回时,归还请求 ticket 到池中
	returnTicket := func() {
        ...
		cmd.circuit.executorPool.Return(cmd.ticket)
	}
	
	// 该对象由两个 goroutine 共享
	// 两个 goroutine : 
	//  一个负责具体的函数执行 (熔断有可能已经开启了)
	//  一个负责监听函数执行结果
	// 不管哪个先执行完成,报告熔断器相关 metric
	returnOnce := &sync.Once{}

	go func() {
		defer func() { cmd.finished <- true }()

		// 熔断已经开启了
		// 拒绝当前请求,等待服务恢复
		if !cmd.circuit.AllowRequest() {
			...
			returnOnce.Do(func() {
				cmd.errorWithFallback(ctx, ErrCircuitOpen)
			})
			return
		}

		// 当后端 (被请求方,一般指上游服务) 服务不稳定时,请求会花费更多时间,但不一定每次都会失败
		// 当请求变慢但 QPS 不变时,需要限制并发数量,降低后端服务的负载
		cmd.Lock()
		select {
		case cmd.ticket = <-circuit.executorPool.Tickets:
			// 请求量没有超过并发限制
			...
			cmd.Unlock()
		default:
            // 请求量超过了并发限制,返回错误
			...
			returnOnce.Do(func() {
				cmd.errorWithFallback(ctx, ErrMaxConcurrency)
			})
			return
		}

		runErr := run(ctx)  // 执行参数函数
		returnOnce.Do(func() {
			if runErr != nil {
				cmd.errorWithFallback(ctx, runErr)
				return
			}
			cmd.reportEvent("success")
		})
	}()

	go func() {
		timer := time.NewTimer(getSettings(name).Timeout)
		defer timer.Stop()

		select {
		case <-cmd.finished:
			// 上面的 goroutine 已经上报数据,这里无需再上报
		case <-ctx.Done():
			// 上下文错误
			returnOnce.Do(func() {
				cmd.errorWithFallback(ctx, ctx.Err())
			})
		case <-timer.C:
			// 执行超时
			returnOnce.Do(func() {
				cmd.errorWithFallback(ctx, ErrTimeout)
			})
		}
	}()

	return cmd.errChan
}

Do 函数调用链路

Do 函数调用链路


最后,我们来看一下 熔断 开启与状态判断机制的内部实现。


AllowRequest 方法

AllowRequest 方法在具体的请求执行之前,先判断 熔断 是否已经开启,当 熔断 开启时,熔断 时间超过等待外部服务恢复时间时返回 ture

func (circuit *CircuitBreaker) AllowRequest() bool {
	return !circuit.IsOpen() || circuit.allowSingleTest()
}

IsOpen 方法

IsOpen 方法在具体的请求执行之前,根据 熔断 是否以开启确定是否已经拒绝请求执行。

func (circuit *CircuitBreaker) IsOpen() bool {
	circuit.mutex.RLock()
	o := circuit.forceOpen || circuit.open
	circuit.mutex.RUnlock()

	if o {
		// 熔断已开启
		return true
	}

	if uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold {
		// 未达到开启熔断需要的最小请求数量 
		return false
	}

	if !circuit.metrics.IsHealthy(time.Now()) {
		// 请求失败百分比超过阈值,开启熔断
		circuit.setOpen()
		return true
	}

	return false
}

allowSingleTest 方法

allowSingleTest 方法判断 熔断 时间是否已经超过重试服务是否恢复的等待时间。

func (circuit *CircuitBreaker) allowSingleTest() bool {
	...

	now := time.Now().UnixNano()
	openedOrLastTestedTime := atomic.LoadInt64(&circuit.openedOrLastTestedTime)
	if circuit.open && now > openedOrLastTestedTime+getSettings(circuit.Name).SleepWindow.Nanoseconds() {
		...
		
		return true
	}

	return false
}

通过上面三个方法实现代码可以看到: 熔断 开启与状态判断是每次请求到来时实时判断的。

AllowRequest 方法调用链路

小结

本文描述了 熔断 的基本概念以及 熔断限流降级 之间的区别,同时借助开源的 afex/hystrix-go 组件源代码, 研究了如何使用 Go 语言实现一个 熔断器 组件,感兴趣的读者可以阅读下列文章,了解下其他开源 熔断器 组件是如何实现的。

Reference

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。