Go 并发模式
前言
文章代码量较多,如果是初次阅读,建议了解每种模式的基础用法即可,工作中有实际的应用开发场景时,再回来研究代码细节。
经典模式 (pipeline + selector)
这里有一个来自 Go 官方博客的例子,通过管道筛选质数并打印。
第一个版本
package main
import "fmt"
// 生成数字并发送到通道
func generate(ch chan int) {
for i := 2; ; i++ {
ch <- i
}
}
// 通过参数质数过滤 in 通道传递的数字
func filter(in, out chan int, prime int) {
for {
i := <-in
if i%prime != 0 {
out <- i
}
}
}
func main() {
ch := make(chan int)
go generate(ch)
for {
// 不断生成数字和筛选管道
prime := <-ch
fmt.Print(prime, " ")
ch1 := make(chan int)
go filter(ch, ch1, prime)
ch = ch1
}
}
第二个版本
第二个版本在之前的基础上进行了改进:sieve
, generate
, filter
改为工厂函数,创建通道并返回,而且使用了协程的 lambda
函数。
main
函数变得更加短小清晰:调用 sieve()
返回包含质数的通道,然后打印即可。
package main
import (
"fmt"
)
func generate() chan int {
ch := make(chan int)
go func() {
for i := 2; ; i++ {
ch <- i
}
}()
return ch
}
func filter(in chan int, prime int) chan int {
out := make(chan int)
go func() {
for {
if i := <-in; i%prime != 0 {
out <- i
}
}
}()
return out
}
func sieve() chan int {
out := make(chan int)
go func() {
ch := generate()
for {
prime := <-ch
ch = filter(ch, prime)
out <- prime
}
}()
return out
}
func main() {
primes := sieve()
for {
fmt.Println(<-primes)
}
}
FanIn FanOut
Fan-In: 1 个 goroutine 从多个通道读取数据 (一对多) Fan-Out: 多个 goroutine 从 1 个通道读取数据 (多对一)
官网上有一个经典用例 质数求和
,同时用到了这两种模式。
package main
import (
"math"
"sync"
"time"
)
func echo(numbs []int) <-chan int {
out := make(chan int)
go func() {
for _, n := range numbs {
out <- n
}
close(out)
}()
return out
}
// 求和函数
func sum(in <-chan int) <-chan int {
out := make(chan int)
go func() {
res := 0
for n := range in {
res += n
}
out <- res
close(out)
}()
return out
}
func makeRange(min, max int) []int {
a := make([]int, max-min+1)
for i := range a {
a[i] = min + i
}
return a
}
func isPrime(value int) bool {
for i := 2; i <= int(math.Floor(float64(value)/2)); i++ {
if value%i == 0 {
return false
}
}
return value > 1
}
func prime(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
if isPrime(n) {
out <- n
}
}
close(out)
}()
return out
}
func merge(cs []<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
wg.Add(len(cs))
for _, c := range cs {
go func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
nums := makeRange(1, 10)
in := echo(nums)
// Fan-Out
var cs [4]<-chan int
for i := range cs {
cs[i] = sum(prime(in))
}
// Fan-In
out := sum(merge(cs[:]))
println(<-out)
time.Sleep(time.Second) // 等待 out 通道关闭
}
Pipeline
管道 (pipeline)
是由通道连接的一系列阶段,其中每个阶段都是一组运行相同功能的 goroutine
,例如 Linux
中的管道命令:
$ ps –ef | grep systemd | awk '{print $2}'
package main
import "fmt"
func main() {
// 数值生成器管道
generator := func(done <-chan int, nums []int) <-chan int {
intStream := make(chan int)
go func() {
defer close(intStream)
for _, n := range nums {
select {
case <-done:
return
case intStream <- n:
}
}
}()
return intStream
}
// 数值相乘管道
multiply := func(done <-chan int, intStream <-chan int, multiplier int) <-chan int {
multiplyStream := make(chan int)
go func() {
defer close(multiplyStream)
for i := range intStream {
select {
case <-done:
return
case multiplyStream <- i * multiplier:
}
}
}()
return multiplyStream
}
// 数值相加管道
add := func(done <-chan int, intStream <-chan int, addition int) <-chan int {
addedStream := make(chan int)
go func() {
defer close(addedStream)
for i := range intStream {
select {
case <-done:
return
case addedStream <- i + addition:
}
}
}()
return addedStream
}
done := make(chan int)
defer close(done)
intStream := generator(done, []int{1, 2, 3})
// 执行了类似 Linux 中的管道命令: generator | multiply | add
pipeline := add(done, multiply(done, intStream, 2), 1)
for p := range pipeline {
fmt.Println(p)
}
}
Generate
生成者模式 (Generate)
像 yield
一样,生成一系列连续的值。
package main
func Count(start int, end int) chan int {
ch := make(chan int)
go func(ch chan int) {
for i := start; i <= end; i++ {
ch <- i
}
close(ch)
}(ch)
return ch
}
func main() {
for i := range Count(1, 5) {
println(i)
}
}
// $ go run main.go
// 1
// 2
// 3
// 4
// 5
生产者/消费者
package main
import "fmt"
// 生产者
func produce(start, count int, out chan<- int) {
for i := 0; i < count; i++ {
out <- start
start = start + count
}
close(out)
}
// 消费者
func consume(in <-chan int, done chan<- bool) {
for num := range in {
fmt.Printf("%d\n", num)
}
done <- true
}
func main() {
numChan := make(chan int)
done := make(chan bool)
go produce(0, 10, numChan)
go consume(numChan, done)
<-done
}
// $ go run ma1in.go
// 输出如下
// 0
// 10
// 20
// 30
// 40
// 50
// 60
// 70
// 80
// 90
信号量模式
信号量
是一种同步原语,对数量有限的资源的访问进行控制。
接口
type Interface interface {
Acquire() error
Release() error
}
资源信号量
package main
var (
ErrNoTickets = errors.New("semaphore: could not aquire semaphore")
ErrIllegalRelease = errors.New("semaphore: can't release the semaphore without acquiring it first")
)
type implementation struct {
sem chan struct{}
timeout time.Duration
}
// Acquire 请求资源
func (s *implementation) Acquire() error {
select {
case s.sem <- struct{}{}:
return nil
case <-time.After(s.timeout):
return ErrNoTickets
}
}
// Release 释放资源
func (s *implementation) Release() error {
select {
case _ = <-s.sem:
return nil
case <-time.After(s.timeout):
return ErrIllegalRelease
}
return nil
}
func New(tickets int, timeout time.Duration) Interface {
return &implementation{
sem: make(chan struct{}, tickets),
timeout: timeout,
}
}
超时信号
tickets, timeout := 1, 3*time.Second
s := semaphore.New(tickets, timeout)
if err := s.Acquire(); err != nil {
panic(err)
}
// do something
if err := s.Release(); err != nil {
panic(err)
}
无超时信号 (非阻塞)
tickets, timeout := 0, 0
s := semaphore.New(tickets, timeout)
if err := s.Acquire(); err != nil {
if err != semaphore.ErrNoTickets {
panic(err)
}
os.Exit(1)
}
流式计算
package main
import (
"fmt"
"math"
"runtime"
)
const NCPU = 2
func main() {
runtime.GOMAXPROCS(2)
fmt.Println(CalculatePi(5000))
}
func CalculatePi(end int) float64 {
ch := make(chan float64)
for i := 0; i < NCPU; i++ {
// 启动 2 个 goroutine
// 1 个 goroutine 计算 k (0 -> 2500)
// 1 个 goroutine 计算 k (2500 -> 5000)
go term(ch, i*end/NCPU, (i+1)*end/NCPU)
}
result := 0.0
for i := 0; i < NCPU; i++ {
result += <-ch
}
return result
}
// 计算公式展开式
func term(ch chan float64, start, end int) {
result := 0.0
for i := start; i < end; i++ {
x := float64(i)
result += 4 * (math.Pow(-1, x) / (2.0*x + 1.0))
}
ch <- result
}
// $ go run ma1in.go
// 输出如下
// 3.1413926535917938
*/
简单 Master/Worker
对于任何可以建模为 Master-Worker
的问题,各个 Worker
通道和 Master
通信,如果系统是分布式部署的,各个工作节点充当 Worker
,
中央节点Master
和 Worker
之间使用 RPC
等协议进行通信。
package main
func main() {
pending, done := make(chan *Task), make(chan *Task)
go sendWork(pending)
for i := 0; i < N; i++ {
go Worker(pending, done)
}
consumeWork(done)
}
func Worker(in, out chan *Task) {
for {
t := <-in
process(t)
out <- t
}
}
Futures 模式
Futures
模式是指在使用某一个值之前需要先对其进行计算。这时可以在另一个 goroutine
进行该值的计算,到该值真正使用时就已经计算完毕了。
Futures
模式通过闭包和通道可以很容易实现,类似于生成器,不同地方在于 Futures
需要返回一个值。
假设我们有一个矩阵类型,我们需要计算两个矩阵 A 和 B 乘积的逆,首先我们通过函数 Inverse(M)
分别对其进行求逆运算,再将结果相乘。
func InverseProduct(a Matrix, b Matrix) {
a_inv_future := InverseFuture(a)
b_inv_future := InverseFuture(b)
a_inv := <-a_inv_future
b_inv := <-b_inv_future
return Product(a_inv, b_inv)
}
InverseFuture
函数以 goroutine
的形式起了一个闭包,该闭包会将矩阵求逆结果放入到 future
通道中:
func InverseFuture(a Matrix) chan Matrix {
future := make(chan Matrix)
go func() {
future <- Inverse(a)
}()
return future
}
当开发一个计算密集型库时,使用 Futures
模式设计API接口是很有意义的。在你的包使用 Futures
模式,且能保持友好的 API 接口。
此外,Futures
可以通过一个异步的 API 暴露出来。这样就可以用最小的成本将包中的并行计算移到用户代码中。
限制并发请求处理数量
使用带缓冲区的通道很容易实现,其缓冲区容量就是同时处理请求的最大数量。程序中超过 MAXREQS
的请求将不会被同时处理,
因为当 sem
通道表示缓冲区已满时,handle
函数会阻塞且不再处理其他请求,直到某个请求从 sem
通道中被移除。
package main
const MAXREQS = 50
var sem = make(chan int, MAXREQS)
type Request struct {
a, b int
replyc chan int
}
func process(r *Request) {
// do something
}
// 一进一出,一来一回,很巧妙
func handle(r *Request) {
sem <- 1 // doesn't matter what we put in it
process(r)
<-sem // one empty place in the buffer: the next request can start
}
func server(service chan *Request) {
for {
request := <-service
go handle(request)
}
}
func main() {
service := make(chan *Request)
go server(service)
}
状态模式
假设我们需要处理一些数量巨大且互不相关的数据项,它们从一个 in
通道被传递进来,当我们处理完以后又要将它们放入另一个 out
通道,
就像一个工厂流水线一样。处理每个数据项也可能包含许多步骤:Preprocess(预处理) / StepA(步骤A) / StepB(步骤B) / ... / PostProcess(后处理)
。
让每一个处理步骤作为一个 goroutine
独立工作,每一个步骤从上一步的输出通道中获得输入数据。
这种方式仅有极少数时间会被浪费,而大部分时间所有的步骤都在一直执行中。
单纯从流程描述的话,很像设计模式里面的 “状态模式”,核心是下一个数据依赖于上一个数据处理完成,通道的缓冲区大小可以调整优化。
func ParallelProcessData (in <-chan *Data, out chan<- *Data) {
preOut := make(chan *Data, 100)
stepAOut := make(chan *Data, 100)
stepBOut := make(chan *Data, 100)
stepCOut := make(chan *Data, 100)
go PreprocessData(in, preOut)
go ProcessStepA(preOut,StepAOut)
go ProcessStepB(StepAOut,StepBOut)
go ProcessStepC(StepBOut,StepCOut)
go PostProcessData(StepCOut,out)
}
链式调用
package main
// 反向执行过程很像递归
func f(left, right chan int) {
left <- 1 + <-right
}
func main() {
leftmost := make(chan int)
var left, right chan int = nil, leftmost
for i := 0; i < 100000; i++ {
left, right = right, make(chan int)
go f(left, right)
}
right <- 0 // bang!
x := <-leftmost // wait for completion
println(x) // 100000
}
当循环完成之后,一个 0 被写入到 最右边
的通道里,于是 100000
个 goroutine
开始顺序执行。