蛮荆

Go 并发模式

2023-04-18

前言

文章代码量较多,如果是初次阅读,建议了解每种模式的基础用法即可,工作中有实际的应用开发场景时,再回来研究代码细节

经典模式 (pipeline + selector)

这里有一个来自 Go 官方博客的例子,通过管道筛选质数并打印。

质数筛选器

第一个版本

package main

import "fmt"

// 生成数字并发送到通道
func generate(ch chan int) {
    for i := 2; ; i++ {
        ch <- i
    }
}

// 通过参数质数过滤 in 通道传递的数字
func filter(in, out chan int, prime int) {
    for {
        i := <-in 
        if i%prime != 0 {
            out <- i 
        }
    }
}

func main() {
    ch := make(chan int) 
    go generate(ch)      
	
    for {
		// 不断生成数字和筛选管道
        prime := <-ch
        fmt.Print(prime, " ")
        ch1 := make(chan int)
        go filter(ch, ch1, prime)
        ch = ch1
    }
}

第二个版本

第二个版本在之前的基础上进行了改进:sieve, generate, filter 改为工厂函数,创建通道并返回,而且使用了协程的 lambda 函数。 main 函数变得更加短小清晰:调用 sieve() 返回包含质数的通道,然后打印即可。

package main

import (
    "fmt"
)

func generate() chan int {
    ch := make(chan int)
    go func() {
        for i := 2; ; i++ {
            ch <- i
        }
    }()
    return ch
}

func filter(in chan int, prime int) chan int {
    out := make(chan int)
    go func() {
        for {
            if i := <-in; i%prime != 0 {
                out <- i
            }
        }
    }()
    return out
}

func sieve() chan int {
    out := make(chan int)
    go func() {
        ch := generate()
        for {
            prime := <-ch
            ch = filter(ch, prime)
            out <- prime
        }
    }()
    return out
}

func main() {
    primes := sieve()
    for {
        fmt.Println(<-primes)
    }
}

质数筛选器动图

FanIn FanOut

Fan-In: 1 个 goroutine 从多个通道读取数据 (一对多) Fan-Out: 多个 goroutine 从 1 个通道读取数据 (多对一)

图片来源: https://jguer.space/posts/go-fanout-context/

官网上有一个经典用例 质数求和,同时用到了这两种模式。

package main

import (
	"math"
	"sync"
	"time"
)

func echo(numbs []int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range numbs {
			out <- n
		}
		close(out)
	}()
	return out
}

// 求和函数
func sum(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		res := 0
		for n := range in {
			res += n
		}
		out <- res
		close(out)
	}()
	return out
}

func makeRange(min, max int) []int {
	a := make([]int, max-min+1)
	for i := range a {
		a[i] = min + i
	}
	return a
}

func isPrime(value int) bool {
	for i := 2; i <= int(math.Floor(float64(value)/2)); i++ {
		if value%i == 0 {
			return false
		}
	}
	return value > 1
}

func prime(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			if isPrime(n) {
				out <- n
			}
		}
		close(out)
	}()
	return out
}

func merge(cs []<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	wg.Add(len(cs))

	for _, c := range cs {
		go func(c <-chan int) {
			for n := range c {
				out <- n
			}
			wg.Done()
		}(c)
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	nums := makeRange(1, 10)
	in := echo(nums)

	// Fan-Out
	var cs [4]<-chan int
	for i := range cs {
		cs[i] = sum(prime(in))
	}

	// Fan-In
	out := sum(merge(cs[:]))
	println(<-out)

	time.Sleep(time.Second) // 等待 out 通道关闭
}

Pipeline

管道 (pipeline) 是由通道连接的一系列阶段,其中每个阶段都是一组运行相同功能的 goroutine,例如 Linux 中的管道命令:

$ ps –ef | grep systemd | awk '{print $2}'

图片来源: https://medium.com/@eric.g.yuan/go-concurrency-patterns-pipeline-2845d84bd92d

package main

import "fmt"

func main() {
    // 数值生成器管道
	generator := func(done <-chan int, nums []int) <-chan int {
		intStream := make(chan int)

		go func() {
			defer close(intStream)

			for _, n := range nums {
				select {
				case <-done:
					return
				case intStream <- n:
				}
			}
		}()

		return intStream
	}

	// 数值相乘管道
	multiply := func(done <-chan int, intStream <-chan int, multiplier int) <-chan int {
		multiplyStream := make(chan int)

		go func() {
			defer close(multiplyStream)

			for i := range intStream {
				select {
				case <-done:
					return
				case multiplyStream <- i * multiplier:
				}
			}

		}()

		return multiplyStream
	}

	// 数值相加管道
	add := func(done <-chan int, intStream <-chan int, addition int) <-chan int {
		addedStream := make(chan int)

		go func() {
			defer close(addedStream)

			for i := range intStream {
				select {
				case <-done:
					return
				case addedStream <- i + addition:
				}
			}

		}()

		return addedStream
	}

	done := make(chan int)
	defer close(done)

	intStream := generator(done, []int{1, 2, 3})
	// 执行了类似 Linux 中的管道命令: generator | multiply | add
	pipeline := add(done, multiply(done, intStream, 2), 1)

	for p := range pipeline {
		fmt.Println(p)
	}
}

Generate

生成者模式 (Generate)yield 一样,生成一系列连续的值。

package main

func Count(start int, end int) chan int {
    ch := make(chan int)

    go func(ch chan int) {
        for i := start; i <= end; i++ {
            ch <- i
        }

		close(ch)
	}(ch)

	return ch
}

func main() {
	for i := range Count(1, 5) {
		println(i)
	}
}

// $ go run main.go
// 1
// 2
// 3
// 4
// 5

生产者/消费者

package main

import "fmt"

// 生产者
func produce(start, count int, out chan<- int) {
	for i := 0; i < count; i++ {
		out <- start
		start = start + count
	}
	close(out)
}

// 消费者
func consume(in <-chan int, done chan<- bool) {
	for num := range in {
		fmt.Printf("%d\n", num)
	}
	done <- true
}

func main() {
	numChan := make(chan int)
	done := make(chan bool)
	go produce(0, 10, numChan)
	go consume(numChan, done)

	<-done
}

// $ go run ma1in.go
// 输出如下
// 0
// 10
// 20
// 30
// 40
// 50
// 60
// 70
// 80
// 90

信号量模式

信号量 是一种同步原语,对数量有限的资源的访问进行控制。

接口

type Interface interface {
	Acquire() error
	Release() error
}

资源信号量

package main

var (
	ErrNoTickets      = errors.New("semaphore: could not aquire semaphore")
	ErrIllegalRelease = errors.New("semaphore: can't release the semaphore without acquiring it first")
)

type implementation struct {
	sem     chan struct{}
	timeout time.Duration
}

// Acquire 请求资源
func (s *implementation) Acquire() error {
	select {
	case s.sem <- struct{}{}:
		return nil
	case <-time.After(s.timeout):
		return ErrNoTickets
	}
}

// Release 释放资源
func (s *implementation) Release() error {
	select {
	case _ = <-s.sem:
		return nil
	case <-time.After(s.timeout):
		return ErrIllegalRelease
	}

	return nil
}

func New(tickets int, timeout time.Duration) Interface {
	return &implementation{
		sem:     make(chan struct{}, tickets),
		timeout: timeout,
	}
}

超时信号

tickets, timeout := 1, 3*time.Second
s := semaphore.New(tickets, timeout)

if err := s.Acquire(); err != nil {
    panic(err)
}

// do something

if err := s.Release(); err != nil {
    panic(err)
}

无超时信号 (非阻塞)

tickets, timeout := 0, 0
s := semaphore.New(tickets, timeout)

if err := s.Acquire(); err != nil {
    if err != semaphore.ErrNoTickets {
        panic(err)
    }
	
    os.Exit(1)
}

流式计算

π 的计算公式

package main

import (
	"fmt"
	"math"
	"runtime"
)

const NCPU = 2

func main() {
	runtime.GOMAXPROCS(2)
	fmt.Println(CalculatePi(5000))
}

func CalculatePi(end int) float64 {
	ch := make(chan float64)

	for i := 0; i < NCPU; i++ {
		// 启动 2 个 goroutine
		// 1 个 goroutine 计算 k (0 -> 2500)
		// 1 个 goroutine 计算 k (2500 -> 5000)
		go term(ch, i*end/NCPU, (i+1)*end/NCPU)
	}

	result := 0.0
	for i := 0; i < NCPU; i++ {
		result += <-ch
	}
	return result
}

// 计算公式展开式
func term(ch chan float64, start, end int) {
	result := 0.0
	for i := start; i < end; i++ {
		x := float64(i)
		result += 4 * (math.Pow(-1, x) / (2.0*x + 1.0))
	}
	ch <- result
}

// $ go run ma1in.go
// 输出如下
// 3.1413926535917938
*/

简单 Master/Worker

对于任何可以建模为 Master-Worker 的问题,各个 Worker 通道和 Master 通信,如果系统是分布式部署的,各个工作节点充当 Worker, 中央节点MasterWorker 之间使用 RPC 等协议进行通信。

package main

func main() {
    pending, done := make(chan *Task), make(chan *Task)
    go sendWork(pending)       
	
    for i := 0; i < N; i++ {   
        go Worker(pending, done)
    }
	
    consumeWork(done)          
}

func Worker(in, out chan *Task) {
    for {
        t := <-in
        process(t)
        out <- t
    }
}

Futures 模式

Futures 模式是指在使用某一个值之前需要先对其进行计算。这时可以在另一个 goroutine 进行该值的计算,到该值真正使用时就已经计算完毕了。 Futures 模式通过闭包和通道可以很容易实现,类似于生成器,不同地方在于 Futures 需要返回一个值。

假设我们有一个矩阵类型,我们需要计算两个矩阵 A 和 B 乘积的逆,首先我们通过函数 Inverse(M) 分别对其进行求逆运算,再将结果相乘。

func InverseProduct(a Matrix, b Matrix) {
    a_inv_future := InverseFuture(a)   
    b_inv_future := InverseFuture(b)   
    a_inv := <-a_inv_future
    b_inv := <-b_inv_future
    return Product(a_inv, b_inv)
}

InverseFuture 函数以 goroutine 的形式起了一个闭包,该闭包会将矩阵求逆结果放入到 future 通道中:

func InverseFuture(a Matrix) chan Matrix {
    future := make(chan Matrix)
    go func() {
        future <- Inverse(a)
    }()
    return future
}

当开发一个计算密集型库时,使用 Futures 模式设计API接口是很有意义的。在你的包使用 Futures 模式,且能保持友好的 API 接口。 此外,Futures 可以通过一个异步的 API 暴露出来。这样就可以用最小的成本将包中的并行计算移到用户代码中。

限制并发请求处理数量

使用带缓冲区的通道很容易实现,其缓冲区容量就是同时处理请求的最大数量。程序中超过 MAXREQS 的请求将不会被同时处理, 因为当 sem 通道表示缓冲区已满时,handle 函数会阻塞且不再处理其他请求,直到某个请求从 sem 通道中被移除。

package main

const MAXREQS = 50

var sem = make(chan int, MAXREQS)

type Request struct {
    a, b   int
    replyc chan int
}

func process(r *Request) {
    // do something
}

// 一进一出,一来一回,很巧妙
func handle(r *Request) {
    sem <- 1 // doesn't matter what we put in it
    process(r)
    <-sem // one empty place in the buffer: the next request can start
}

func server(service chan *Request) {
    for {
        request := <-service
        go handle(request)
    }
}

func main() {
    service := make(chan *Request)
    go server(service)
}

状态模式

假设我们需要处理一些数量巨大且互不相关的数据项,它们从一个 in 通道被传递进来,当我们处理完以后又要将它们放入另一个 out 通道, 就像一个工厂流水线一样。处理每个数据项也可能包含许多步骤:Preprocess(预处理) / StepA(步骤A) / StepB(步骤B) / ... / PostProcess(后处理)

让每一个处理步骤作为一个 goroutine 独立工作,每一个步骤从上一步的输出通道中获得输入数据。 这种方式仅有极少数时间会被浪费,而大部分时间所有的步骤都在一直执行中。

单纯从流程描述的话,很像设计模式里面的 “状态模式”,核心是下一个数据依赖于上一个数据处理完成,通道的缓冲区大小可以调整优化。

func ParallelProcessData (in <-chan *Data, out chan<- *Data) {
    preOut := make(chan *Data, 100)
    stepAOut := make(chan *Data, 100)
    stepBOut := make(chan *Data, 100)
    stepCOut := make(chan *Data, 100)
	
    go PreprocessData(in, preOut)
    go ProcessStepA(preOut,StepAOut)
    go ProcessStepB(StepAOut,StepBOut)
    go ProcessStepC(StepBOut,StepCOut)
    go PostProcessData(StepCOut,out)
}

链式调用

package main

// 反向执行过程很像递归
func f(left, right chan int) {
	left <- 1 + <-right
}

func main() {
	leftmost := make(chan int)
	var left, right chan int = nil, leftmost

	for i := 0; i < 100000; i++ {
		left, right = right, make(chan int)
		go f(left, right)
	}

	right <- 0      // bang!
	x := <-leftmost // wait for completion
	println(x)      // 100000
}

当循环完成之后,一个 0 被写入到 最右边 的通道里,于是 100000goroutine 开始顺序执行。

Reference

转载申请

本作品采用 知识共享署名 4.0 国际许可协议 进行许可,转载时请注明原文链接,图片在使用时请保留全部内容,商业转载请联系作者获得授权。