手摸手Go 也谈sync.WaitGroup
最近因为工作上的事情更新会相对有点儿慢,这周末又加了天班。然后昨天好好休息了下,顺便翻了下《云雀叫了一整天》,看到一首小诗觉得不错分享给大家。
从前慢
木心
记得早先少年时
大家诚诚恳恳
说一句是一句
清早上火车站
长街黑暗无行人
卖豆浆的小店冒着热气
从前的日色变得慢
车,马,邮件都慢
一生只够爱一个人
从前的锁也好看
钥匙精美有样子
你锁了人家就懂了
小小一诗,人生尽在其中。
好了回归正题,Go sync包目前只剩sync.WaitGroup没分析了,今天起个大早赶上这一篇。
sync.WaitGroup是Go提供的一种允许一个goroutine等待一组goroutine完成任务的机制,类似于Java中的CountDownLatch。主goroutine调用Add方法设置需要等待的goroutine的数量,每个goroutine完成时调用Done方法。与此同时,wait方法用于阻塞主goroutine直到所有其他goroutine执行完毕。
基本使用
利用sync.WaitGroup完成一个多协程任务
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func init() {
rand.Seed(time.Now().Unix())
}
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 3; i++ {
wg.Add(1)
go func() {
time.Sleep(time.Duration(rand.Intn(5)) * time.Second)
fmt.Println(fmt.Sprintf("I'm finish my work at %s", time.Now().Format("2006-01-02 15:04:05")))
wg.Done()
}()
}
wg.Wait()
}
sync.WaitGroup源码分析
数据结构
// 第一次使用后不允许被拷贝
type WaitGroup struct {
noCopy noCopy
//64位值: 高32位为计数器 低32位为等待计数
//64位原子操作要求64位对齐,但是32位编译器无法保证这一点。所以我们分配了12字节,
//其中对齐的8字节作存储state 剩下的4字节存储sema
state1 [3]uint32
}
sync.WaitGroup结构比较简单,只包含一个防止拷贝的noCopy字段和一个长度为3的uint32数组。其核心在于对state1这个字段的操作,其字段含义体现在state()方法:
// 从wg.state1返回指向state和sema的指针
func (wg *WaitGroup) state() (statep *uint64, semap *uint32) {
if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
return (*uint64)(unsafe.Pointer(&wg.state1)), &wg.state1[2]
} else {
return (*uint64)(unsafe.Pointer(&wg.state1[1])), &wg.state1[0]
}
}
可见state1字段存储了statep和semap,至于if-else的逻辑是因为原子操作需要数据8字节对齐,否则程序会panic。故而WaitGroup会选择使用uintptr(unsafe.Pointer(&wg.state1))%8 == 0先判断是否是8字节对齐,如果不是则拿4个字节做下padding。(因为目前大多数平台CPU字长都是8或4字节)关于内存对齐,如果你还不清楚,那你一定是没读过之前那篇《手摸手Go 你的内存对齐了吗》

字段state1剥离出了statep和semap
statep表示当前WaitGroup当前的状态,它的高32位为counter表示计数器,低32位waiters表示Wait等待的goroutine数量semap表示信号量,调用Wait的goroutine会被阻塞到这个信号量上

其核心逻辑如上图,接下来看源码
操作方法
WaitGroup提供了三个方法
func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
Add 负责修改counter值以及释放阻塞在semp信号量的goroutine Done通过调用Add递减counter值 Wait阻塞等待counter值为0为止
Add
Add操作入参delta可正可负,根据delta值更新counter。
当counter为0时,所有等待时阻塞的goroutine会被释放 如果counter为负数 则Add会发生panic
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
//累加计数器
state := atomic.AddUint64(statep, uint64(delta)<<32)
v := int32(state >> 32) //计数器
w := uint32(state) //等待的goroutine数量
if v < 0 {//counter不能为负数
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
// Add不得与Wait同时进行
// 如果看到counter==0,则Wait不会增加等待者数量
// 仍然进行廉价的完整性检查以检测WaitGroup的滥用
if *statep != state { //表明Add和Wait方法同时调用了
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// 重置waiters count 为 0.
*statep = 0
for ; w != 0; w-- {
//唤醒阻塞在semap上的goroutine
runtime_Semrelease(semap, false, 0)
}
}
*statep!=state到这个检查点一定是counter==0并且waiters>0,且*statep!=state就panic,表明sync.WaitGroup不允许在waiters>0未执行完Wait方法过程中调用Add()或Wait()方法修改statep。
总结来说:当counter为零时delta为正数的调用必须在wait方法调用之前发生。当counter大于零时delta为正数或负数时的调用 随时都可能发生。通常,这意味着对Add的调用应该在创建goroutine或要等待的其他事件的语句之前执行。如果使用WaitGroup来等待几个独立的事件集,则必须在所有先前的Wait调用返回之后再进行新的Add调用。
Done
通过调用Add(-1),递减counter值
// Done decrements the WaitGroup counter by one.
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
Wait
Wait 会一直阻塞到WaitGroup的counter为0为止
func (wg *WaitGroup) Wait() {
statep, semap := wg.state()
for {
state := atomic.LoadUint64(statep)
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
// counter为0 则不需要等待
return
}
// 增加等待者数量
if atomic.CompareAndSwapUint64(statep, state, state+1) {
runtime_Semacquire(semap)
if *statep != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}
Wait方法先是从state1中获取statep和semap进入一个for的无限循环, atomic.LoadUint64加载statep,从而获取高32位的counter和低32位的waiters。如果 counter==0,表示无需等待 直接返回如果 counter!=0,尝试将semap+1,如果失败则回到步骤一继续执行如果 atomic.CompareAndSwapUint64(statep, state, state+1)成功,则调用runtime_Semacquire(semap)将当前goroutine阻塞在信号量semap上。检查 *statep != 0则表明Wait方法未执行完毕前,WaitGroup又被复用了,此时会panic。
总结
WaitGroup源码还是比较简单的,通过原子操作state1和信号量来协调goroutine工作。其中state1的设计也可以说是内存对齐的一个最佳实践。通过阅读源码也掌握了使用WaitGroup的正确姿势:
Add()和Done()均可修改WaitGroup的计数数,但是要保证计数不会修改为负数,否则会发生panicWait()方法必须等待全部Add()方法调用完毕之后再调用,否则也可能导致panicWaitGroup是可以重复使用的。但前提是上一次的goroutine都调用Wait完毕后才能继续复用。
