从并发模式看 Go channel 使用技巧
最近重看MinIO的源代码,发现纠删码模式下读取数据盘的时候,使用了更简单的并发读取方式,以前看的时候没发现,查了下Git历史记录,发现是19年新改的,新的使用channel做标记的方式的确非常巧妙,简化了代码逻辑,值得我们学习。所以今天就开篇文章,介绍下channel在并发下的两个使用技巧。
赢者为王模式
这种并发模式并不稀奇,相信很多朋友都用到过。它的核心思想就是同时开几个协程做同样的事情,谁先搞定,我们就用谁的结果。在Go语言的channel支持下,我们很容易实现这种并发方式。
假设我们把同一份资源,存储在网络上的5个服务器上(镜像、备份等),然后我们现在需要获取这个资源,我们就可以同时开5个协程,访问这5个服务器上的资源,谁先获取到,我们就用谁的,这样就可以最快速度获取,排除掉网络慢的服务器。
func main() {
txtResult := make(chan string, 5)
go func() {txtResult <- getTxt("res1.flysnow.org")}()
go func() {txtResult <- getTxt("res2.flysnow.org")}()
go func() {txtResult <- getTxt("res3.flysnow.org")}()
go func() {txtResult <- getTxt("res4.flysnow.org")}()
go func() {txtResult <- getTxt("res5.flysnow.org")}()
println(<-txtResult)
}
func getTxt(host string) string{
//省略网络访问逻辑,直接返回模拟结果
//http.Get(host+"/1.txt")
return host+":模拟结果"
}
其中getTxt
没有真实实现,只是一个模拟,但是通过以上示例已经可以说明赢者为王这种并发模式的使用。这种并发模式适合多个协程对同一种资源的读取,更概括的讲就是做同一件事情,只要有一个协程干成了就OK了。这种模式的优点主要有两个:1.可以最大程度减少耗时;提高成功率。
最终成功模式
这种并发模式我们自己可能遇到过,但是可能不是叫这个名字,这个名字是我自己起的,我觉得比较贴切。比如同时并发的从10个文件中成功读取任意5个文件,你可以开启5个协程,也可以开启3个,都随意,但是必须是成功读取了5个才算成功,否则就是失败。
这种模式MinIO也遇到了,它的解决方式就是我在开篇提到的非常好的技巧,现在我们就来介绍这种技巧。在介绍这种技巧前,我们先列举下其他的思路。
第一种思路:
先并发获取,存放起来,然后再一个个判断是否获取成功,如果有的没有成功再重新获取,而且获取的文件不能重复。这种方式是取到结果后进行判断是否成功,然后根据情况再决定是否重新获取,要去重,要判断,业务逻辑比较复杂。
第二种思路:
并发的时候就保证成功,里面可能是个for循环,直到成功为止,然后再返回结果。这种思路缺陷也很明显,如果这个文件损坏,那么就会一直死循环下去,要避免死循环,就要加上重试次数。
而MinIO的实现方式比较巧妙,它也是多协程,但是发现如果有文件读取不成功,他会通过channel的方式标记,换一个文件读取。因为一共10个文件呢,这个不行,换一个,不能在一个文件上等死,只要成功读取5个就可以了。
现在我们看下MinIO的这段代码,代码比较长,我尽可能删除一些无用的,但是为了保证可读性,还是会长一些,大家耐心看完,就学到了。
// Read reads from readers in parallel. Returns p.dataBlocks number of bufs.
func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
newBuf := dst
//省略不太相关代码
var newBufLK sync.RWMutex
//省略无关
//channel开始创建,要发挥作用了。这里记住几个数字:
//readTriggerCh大小是10,p.dataBlocks大小是5
readTriggerCh := make(chan bool, len(p.readers))
for i := 0; i < p.dataBlocks; i++ {
// Setup read triggers for p.dataBlocks number of reads so that it reads in parallel.
readTriggerCh <- true
}
healRequired := int32(0) // Atomic bool flag.
readerIndex := 0
var wg sync.WaitGroup
// readTrigger 为 true, 意味着需要用disk.ReadAt() 读取下一个数据
// readTrigger 为 false, 意味着读取成功了,不再需要读取
for readTrigger := range readTriggerCh {
newBufLK.RLock()
canDecode := p.canDecode(newBuf)
newBufLK.RUnlock()
//判断是否有5个成功的,如果有,退出for循环
if canDecode {
break
}
//读取次数上限,不能大于10
if readerIndex == len(p.readers) {
break
}
//成功了,退出本次读取
if !readTrigger {
continue
}
wg.Add(1)
//并发读取数据
go func(i int) {
defer wg.Done()
//省略不太相关代码
_, err := rr.ReadAt(p.buf[bufIdx], p.offset)
if err != nil {
//省略不太相关代码
// 失败了,标记为true,触发下一个读取.
readTriggerCh <- true
return
}
newBufLK.Lock()
newBuf[bufIdx] = p.buf[bufIdx]
newBufLK.Unlock()
// 成功了,标记为false,不再读取
readTriggerCh <- false
}(readerIndex)
//控制次数,同时用来作为索引获取和存储数据
readerIndex++
}
wg.Wait()
//最终结果判断,如果OK了就正确返回,如果有失败的,返回error信息。
if p.canDecode(newBuf) {
p.offset += p.shardSize
if healRequired != 0 {
return newBuf, errHealRequired
}
return newBuf, nil
}
return nil, errErasureReadQuorum
}
以上代码虽然长,但是我做了注释,也比较容易理解了。现在再对这段逻辑进行解释下:
前提是从10个数据里读取任意5个
初始化的chan大小是10,但是通过for循环只存放了5个true
然后对chan循环读取数据,如果是true就开启go协程获取数据,如果是false就终止这次循环
当前在这之前还会判断下是否已经成功获取了5个,如果是的话,直接跳出整个for循环
通过readerIndex每次尝试获取一个数据,如果成功塞一个false到chan中,如果失败则塞个true
这样不成功的readerIndex不再尝试读取,失败了就通过true标记尝试读取下一个readerIndex
通过chan这种巧妙的方式不断循环,直到成功读取5个,或者把10个数据都读一遍为止
最终再基于是否成功读取到5个数据,做最终的判断,是返回成功数据,还是错误
利用channel来做标记和循环取数据,是一种非常好的方式,简化了代码逻辑,整体看起来非常清晰了,有兴趣的朋友可以看下MinIO原来的代码,感受会更强烈。
小结
以上主要是两种使用channel的技巧,这些技巧一些会靠自己熟能生巧,一些需要看别人的源代码学习,而阅读开源代码是一个非常不错的途径。
推荐阅读
站长 polarisxu
自己的原创文章
不限于 Go 技术
职场和创业经验
Go语言中文网
每天为你
分享 Go 知识
Go爱好者值得关注