errgroup:并发任务 goroutine 的传播控制
共 11168字,需浏览 23分钟
·
2021-08-29 07:27
Go 语言为我们提供了丰富的并发原语,且大多数都位于 sync 包下。但其实 Go 官方在实验库 golang.org/x/sync
下还有其他的原语补充,今天我们来探讨一下该库下的原语之一:errgroup。在学习 errgroup 之前,我们先回顾一下 WaitGroup 存在什么不足。
初识 errgroup
在 sync.WaitGroup 源码解析一文中,我们知道 WaitGroup 主要用于控制任务组下的并发子任务。它的具体做法就是,子任务 goroutine 执行前通过 Add
方法添加任务数目,子任务 goroutine 结束时调用 Done
标记已完成任务数,主任务 goroutine 通过 Wait
方法等待所有的任务完成后才能执行后续逻辑。
1package main
2
3import (
4 "net/http"
5 "sync"
6)
7
8func main() {
9 var wg sync.WaitGroup
10 var urls = []string{
11 "http://www.golang.org/",
12 "http://www.baidu.com/",
13 "http://www.noexist11111111.com/",
14 }
15 for _, url := range urls {
16 wg.Add(1)
17 go func(url string) {
18 defer wg.Done()
19 resp, err := http.Get(url)
20 if err != nil {
21 return
22 }
23 resp.Body.Close()
24 }(url)
25 }
26 wg.Wait()
27}
在以上示例代码中,我们通过三个 goroutine 去并发的请求 url,直到所有的子任务 goroutine 均完成访问,主任务 goroutine 下的 wg.Wait
才会停止阻塞。
但在实际的项目代码中,子任务 goroutine 的执行并不总是顺风顺水,它们也许会产生 error。而 WaitGroup 并没有告诉我们在子 goroutine 发生错误时,如何将其抛给主任务 groutine。
这个时候可以考虑使用 errgroup
1package main
2
3import (
4 "fmt"
5 "net/http"
6
7 "golang.org/x/sync/errgroup"
8)
9
10func main() {
11 var urls = []string{
12 "http://www.golang.org/",
13 "http://www.baidu.com/",
14 "http://www.noexist11111111.com/",
15 }
16 g := new(errgroup.Group)
17 for _, url := range urls {
18 url := url
19 g.Go(func() error {
20 resp, err := http.Get(url)
21 if err != nil {
22 fmt.Println(err)
23 return err
24 }
25 fmt.Printf("get [%s] success: [%d] \n", url, resp.StatusCode)
26 return resp.Body.Close()
27 })
28 }
29 if err := g.Wait(); err != nil {
30 fmt.Println(err)
31 } else {
32 fmt.Println("All success!")
33 }
34}
结果如下
1get [http://www.baidu.com/] success: [200]
2Get "http://www.noexist11111111.com/": dial tcp 120.240.95.35:80: i/o timeout
3Get "http://www.golang.org/": dial tcp 172.217.160.81:80: i/o timeout
4Get "http://www.noexist11111111.com/": dial tcp 120.240.95.35:80: i/o timeout
可以看到,执行获取 www.golang.org 和 www.noexist11111111.com 两个 url 的子 groutine 均发生了错误,在主任务 goroutine 中成功捕获到了第一个错误信息。
除了 拥有 WaitGroup 的控制能力 和 错误传播 的功能之外,errgroup 还有最重要的 context 反向传播机制,我们来看一下它的设计。
errgroup 源码解析
errgroup 的设计非常精练,全部代码如下
1type Group struct {
2 cancel func()
3
4 wg sync.WaitGroup
5
6 errOnce sync.Once
7 err error
8}
9
10func WithContext(ctx context.Context) (*Group, context.Context) {
11 ctx, cancel := context.WithCancel(ctx)
12 return &Group{cancel: cancel}, ctx
13}
14
15func (g *Group) Wait() error {
16 g.wg.Wait()
17 if g.cancel != nil {
18 g.cancel()
19 }
20 return g.err
21}
22
23func (g *Group) Go(f func() error) {
24 g.wg.Add(1)
25
26 go func() {
27 defer g.wg.Done()
28
29 if err := f(); err != nil {
30 g.errOnce.Do(func() {
31 g.err = err
32 if g.cancel != nil {
33 g.cancel()
34 }
35 })
36 }
37 }()
38}
可以看到,errgroup 的实现依靠于结构体 Group,它通过封装 sync.WaitGroup,继承了 WaitGroup 的特性,在 Go()
方法中新起一个子任务 goroutine,并在 Wait()
方法中通过 sync.WaitGroup 的 Wait
进行阻塞等待。
同时 Group 利用 sync.Once 保证了它有且仅会保留第一个子 goroutine 错误。
最后,Group 通过嵌入 context.WithCancel 方法产生的 cancel
函数(对于 Context 不熟悉的读者,推荐阅读 理解Context机制 一文),能够在子 goroutine 发生错误时,及时通过调用 cancle 函数,将 Context 的取消信号及时传播出去。当然,这一特性需要用户代码的配合。
errgroup 上下文取消
在 errgroup 的文档(https://pkg.go.dev/golang.org/x/sync@v0.0.0-20210220032951-036812b2e83c/errgroup#example-Group-Pipeline)中,它基于 Go 官方文档的 pipeline( https://blog.golang.org/pipelines) ,实现了一个任务组 goroutine 中上下文取消(Context cancelation)演示的示例。但该 Demo 的前提知识略多,本文这里基于其思想,提供一个易于理解的使用示例。
1package main
2
3import (
4 "context"
5 "fmt"
6
7 "golang.org/x/sync/errgroup"
8)
9
10func main() {
11
12 g, ctx := errgroup.WithContext(context.Background())
13 dataChan := make(chan int, 20)
14
15 // 数据生产端任务子 goroutine
16 g.Go(func() error {
17 defer close(dataChan)
18 for i := 1; ; i++ {
19 if i == 10 {
20 return fmt.Errorf("data 10 is wrong")
21 }
22 dataChan <- i
23 fmt.Println(fmt.Sprintf("sending %d", i))
24 }
25 })
26
27 // 数据消费端任务子 goroutine
28 for i := 0; i < 3; i++ {
29 g.Go(func() error {
30 for j := 1; ; j++ {
31 select {
32 case <-ctx.Done():
33 return ctx.Err()
34 case number := <-dataChan:
35 fmt.Println(fmt.Sprintf("receiving %d", number))
36 }
37 }
38 })
39 }
40
41 // 主任务 goroutine 等待 pipeline 结束数据流
42 err := g.Wait()
43 if err != nil {
44 fmt.Println(err)
45 }
46 fmt.Println("main goroutine done!")
47}
在以上示例中,我们模拟了一个数据传送管道。在数据的生产与消费任务集中,有四个子任务 goroutine:一个生产数据的 goroutine,三个消费数据的 goroutine。当数据生产方存在错误数据时(数据等于 10 ),我们停止数据的生产与消费,并将错误抛出,回到 main goroutine 的执行逻辑中。
可以看到,因为 errgroup 中的 Context cancle 函数的嵌入,我们在子任务 goroutine 中也能反向控制任务上下文。
程序的某一次运行,输出结果如下
1receiving 1
2sending 1
3sending 2
4sending 3
5sending 4
6sending 5
7sending 6
8sending 7
9sending 8
10sending 9
11receiving 4
12receiving 5
13receiving 6
14receiving 2
15receiving 7
16receiving 3
17receiving 8
18receiving 9
19data 10 is wrong
20main goroutine done!
总结
errgroup 是 Go 官方的并发原语补充库,相对于标准库中提供的原语而言,显得没那么核心。这里总结一下 errgroup 的特性。
继承了 WaitGroup 的功能
错误传播:能够返回任务组中发生的第一个错误,但有且仅能返回该错误
context 信号传播:如果子任务 goroutine 中有循环逻辑,则可以添加 ctx.Done 逻辑,此时通过 context 的取消信号,提前结束子任务执行。
推荐阅读