从并发模式看 Go channel 使用技巧

共 534字,需浏览 2分钟

 ·

2020-08-09 05:42

最近重看MinIO的源代码,发现纠删码模式下读取数据盘的时候,使用了更简单的并发读取方式,以前看的时候没发现,查了下Git历史记录,发现是19年新改的,新的使用channel做标记的方式的确非常巧妙,简化了代码逻辑,值得我们学习。所以今天就开篇文章,介绍下channel在并发下的两个使用技巧。

赢者为王模式

这种并发模式并不稀奇,相信很多朋友都用到过。它的核心思想就是同时开几个协程做同样的事情,谁先搞定,我们就用谁的结果。在Go语言的channel支持下,我们很容易实现这种并发方式。

假设我们把同一份资源,存储在网络上的5个服务器上(镜像、备份等),然后我们现在需要获取这个资源,我们就可以同时开5个协程,访问这5个服务器上的资源,谁先获取到,我们就用谁的,这样就可以最快速度获取,排除掉网络慢的服务器。

func main() {
    txtResult := make(chan string5)
    go func() {txtResult <- getTxt("res1.flysnow.org")}()
    go func() {txtResult <- getTxt("res2.flysnow.org")}()
    go func() {txtResult <- getTxt("res3.flysnow.org")}()
    go func() {txtResult <- getTxt("res4.flysnow.org")}()
    go func() {txtResult <- getTxt("res5.flysnow.org")}()
    println(<-txtResult)
}

func getTxt(host string) string{
    //省略网络访问逻辑,直接返回模拟结果
    //http.Get(host+"/1.txt")
    return host+":模拟结果"
}

其中getTxt没有真实实现,只是一个模拟,但是通过以上示例已经可以说明赢者为王这种并发模式的使用。这种并发模式适合多个协程对同一种资源的读取,更概括的讲就是做同一件事情,只要有一个协程干成了就OK了。这种模式的优点主要有两个:1.可以最大程度减少耗时;提高成功率。

最终成功模式

这种并发模式我们自己可能遇到过,但是可能不是叫这个名字,这个名字是我自己起的,我觉得比较贴切。比如同时并发的从10个文件中成功读取任意5个文件,你可以开启5个协程,也可以开启3个,都随意,但是必须是成功读取了5个才算成功,否则就是失败。

这种模式MinIO也遇到了,它的解决方式就是我在开篇提到的非常好的技巧,现在我们就来介绍这种技巧。在介绍这种技巧前,我们先列举下其他的思路。

第一种思路:
先并发获取,存放起来,然后再一个个判断是否获取成功,如果有的没有成功再重新获取,而且获取的文件不能重复。这种方式是取到结果后进行判断是否成功,然后根据情况再决定是否重新获取,要去重,要判断,业务逻辑比较复杂。

第二种思路:
并发的时候就保证成功,里面可能是个for循环,直到成功为止,然后再返回结果。这种思路缺陷也很明显,如果这个文件损坏,那么就会一直死循环下去,要避免死循环,就要加上重试次数。

而MinIO的实现方式比较巧妙,它也是多协程,但是发现如果有文件读取不成功,他会通过channel的方式标记,换一个文件读取。因为一共10个文件呢,这个不行,换一个,不能在一个文件上等死,只要成功读取5个就可以了。

现在我们看下MinIO的这段代码,代码比较长,我尽可能删除一些无用的,但是为了保证可读性,还是会长一些,大家耐心看完,就学到了。

// Read reads from readers in parallel. Returns p.dataBlocks number of bufs.
func (p *parallelReader) Read(dst [][]byte) ([][]byte, error) {
    newBuf := dst
    //省略不太相关代码
    var newBufLK sync.RWMutex

    //省略无关
    //channel开始创建,要发挥作用了。这里记住几个数字:
    //readTriggerCh大小是10,p.dataBlocks大小是5
    readTriggerCh := make(chan boollen(p.readers))
    for i := 0; i < p.dataBlocks; i++ {
        // Setup read triggers for p.dataBlocks number of reads so that it reads in parallel.
        readTriggerCh <- true
    }

    healRequired := int32(0// Atomic bool flag.
    readerIndex := 0
    var wg sync.WaitGroup
    // readTrigger 为 true, 意味着需要用disk.ReadAt() 读取下一个数据
    // readTrigger 为 false, 意味着读取成功了,不再需要读取
    for readTrigger := range readTriggerCh {
        newBufLK.RLock()
        canDecode := p.canDecode(newBuf)
        newBufLK.RUnlock()
        //判断是否有5个成功的,如果有,退出for循环
        if canDecode {
            break
        }
        //读取次数上限,不能大于10
        if readerIndex == len(p.readers) {
            break
        }
        //成功了,退出本次读取
        if !readTrigger {
            continue
        }
        wg.Add(1)
        //并发读取数据
        go func(i int) {
            defer wg.Done()
            //省略不太相关代码
            _, err := rr.ReadAt(p.buf[bufIdx], p.offset)
            if err != nil {
                //省略不太相关代码
                // 失败了,标记为true,触发下一个读取.
                readTriggerCh <- true
                return
            }
            newBufLK.Lock()
            newBuf[bufIdx] = p.buf[bufIdx]
            newBufLK.Unlock()
            // 成功了,标记为false,不再读取
            readTriggerCh <- false
        }(readerIndex)
        //控制次数,同时用来作为索引获取和存储数据
        readerIndex++
    }
    wg.Wait()

    //最终结果判断,如果OK了就正确返回,如果有失败的,返回error信息。
    if p.canDecode(newBuf) {
        p.offset += p.shardSize
        if healRequired != 0 {
            return newBuf, errHealRequired
        }
        return newBuf, nil
    }

    return nil, errErasureReadQuorum
}

以上代码虽然长,但是我做了注释,也比较容易理解了。现在再对这段逻辑进行解释下:

  1. 前提是从10个数据里读取任意5个

  2. 初始化的chan大小是10,但是通过for循环只存放了5个true

  3. 然后对chan循环读取数据,如果是true就开启go协程获取数据,如果是false就终止这次循环

  4. 当前在这之前还会判断下是否已经成功获取了5个,如果是的话,直接跳出整个for循环

  5. 通过readerIndex每次尝试获取一个数据,如果成功一个false到chan中,如果失败则塞个true

  6. 这样不成功的readerIndex不再尝试读取,失败了就通过true标记尝试读取下一个readerIndex

  7. 通过chan这种巧妙的方式不断循环,直到成功读取5个,或者把10个数据都读一遍为止

  8. 最终再基于是否成功读取到5个数据,做最终的判断,是返回成功数据,还是错误

利用channel来做标记和循环取数据,是一种非常好的方式,简化了代码逻辑,整体看起来非常清晰了,有兴趣的朋友可以看下MinIO原来的代码,感受会更强烈。

小结

以上主要是两种使用channel的技巧,这些技巧一些会靠自己熟能生巧,一些需要看别人的源代码学习,而阅读开源代码是一个非常不错的途径。




推荐阅读



学习交流 Go 语言,扫码回复「进群」即可


站长 polarisxu

自己的原创文章

不限于 Go 技术

职场和创业经验


Go语言中文网

每天为你

分享 Go 知识

Go爱好者值得关注


浏览 32
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报