golang channel 使用总结


Do not communicate by sharing memory; instead, share memory by communicating.
| 操作 | nil channel | closed channel | not-closed non-nil channel | 
|---|---|---|---|
| close | panic | panic | 成功 close | 
写 ch <- | 一直阻塞 | panic | 阻塞或成功写入数据 | 
读 <- ch | 一直阻塞 | 读取对应类型零值 | 阻塞或成功读取数据 | 
// ok is false when ch is closed
v, ok := <-ch
内部实现
$GOROOT/src/runtime/chan.go 里),维护了 3 个队列:读等待协程队列 recvq,维护了阻塞在读此 channel 的协程列表 写等待协程队列 sendq,维护了阻塞在写此 channel 的协程列表 缓冲数据队列 buf,用环形队列实现,不带缓冲的 channel 此队列 size 则为 0 

当 buf 非空时,此时 recvq 必为空,buf 弹出一个元素给读协程,读协程获得数据后继续执行,此时若 sendq 非空,则从 sendq 中弹出一个写协程转入 running 状态,待写数据入队列 buf ,此时读取操作 <- ch未阻塞;当 buf 为空但 sendq 非空时(不带缓冲的 channel),则从 sendq 中弹出一个写协程转入 running 状态,待写数据直接传递给读协程,读协程继续执行,此时读取操作 <- ch未阻塞;当 buf 为空并且 sendq 也为空时,读协程入队列 recvq 并转入 blocking 状态,当后续有其他协程往 channel 写数据时,读协程才会重新转入 running 状态,此时读取操作 <- ch阻塞。
当队列 recvq 非空时,此时队列 buf 必为空,从 recvq 弹出一个读协程接收待写数据,此读协程此时结束阻塞并转入 running 状态,写协程继续执行,此时写入操作 ch <-未阻塞;当队列 recvq 为空但 buf 未满时,此时 sendq 必为空,写协程的待写数据入 buf 然后继续执行,此时写入操作 ch <-未阻塞;当队列 recvq 为空并且 buf 为满时,此时写协程入队列 sendq 并转入 blokcing 状态,当后续有其他协程从 channel 中读数据时,写协程才会重新转入 running 状态,此时写入操作 ch <-阻塞。
当队列 recvq 非空时,此时 buf 必为空,recvq 中的所有协程都将收到对应类型的零值然后结束阻塞状态; 当队列 sendq 非空时,此时 buf 必为满,sendq 中的所有协程都会产生 panic ,在 buf 中数据仍然会保留直到被其他协程读取。 
使用场景
futures / promises
package main
import (
    "io/ioutil"
    "log"
    "net/http"
)
// RequestFuture, http request promise.
func RequestFuture(url string) <-chan []byte {
    c := make(chan []byte, 1)
    go func() {
        var body []byte
        defer func() {
            c <- body
        }()
        res, err := http.Get(url)
        if err != nil {
            return
        }
        defer res.Body.Close()
        body, _ = ioutil.ReadAll(res.Body)
    }()
    return c
}
func main() {
    future := RequestFuture("https://api.github.com/users/octocat/orgs")
    body := <-future
    log.Printf("reponse length: %d", len(body))
}
条件变量 (condition variable)
strct {} 作为 channel 的类型。一对一通知
pthread_cond_signal() 的功能,用来在一个协程中通知另个某一个协程事件发生:package main
import (
    "fmt"
    "time"
)
func main() {
    ch := make(chan struct{})
    nums := make([]int, 100)
    go func() {
        time.Sleep(time.Second)
        for i := 0; i < len(nums); i++ {
            nums[i] = i
        }
        // send a finish signal
        ch <- struct{}{}
    }()
    // wait for finish signal
    <-ch
    fmt.Println(nums)
}
广播通知
pthread_cond_broadcast() 的功能。利用从已关闭的 channel 读取数据时总是非阻塞的特性,可以实现在一个协程中向其他多个协程广播某个事件发生的通知:package main
import (
    "fmt"
    "time"
)
func main() {
    N := 10
    exit := make(chan struct{})
    done := make(chan struct{}, N)
    // start N worker goroutines
    for i := 0; i < N; i++ {
        go func(n int) {
            for {
                select {
                // wait for exit signal
                case <-exit:
                    fmt.Printf("worker goroutine #%d exit\n", n)
                    done <- struct{}{}
                    return
                case <-time.After(time.Second):
                    fmt.Printf("worker goroutine #%d is working...\n", n)
                }
            }
        }(i)
    }
    time.Sleep(3 * time.Second)
    // broadcast exit signal
    close(exit)
    // wait for all worker goroutines exit
    for i := 0; i < N; i++ {
        <-done
    }
    fmt.Println("main goroutine exit")
}
信号量
package main
import (
    "log"
    "math/rand"
    "time"
)
type Seat int
type Bar chan Seat
func (bar Bar) ServeConsumer(customerId int) {
    log.Print("-> consumer#", customerId, " enters the bar")
    seat := <-bar // need a seat to drink
    log.Print("consumer#", customerId, " drinks at seat#", seat)
    time.Sleep(time.Second * time.Duration(2+rand.Intn(6)))
    log.Print("<- consumer#", customerId, " frees seat#", seat)
    bar <- seat // free the seat and leave the bar
}
func main() {
    rand.Seed(time.Now().UnixNano())
    bar24x7 := make(Bar, 10) // the bar has 10 seats
    // Place seats in an bar.
    for seatId := 0; seatId < cap(bar24x7); seatId++ {
        bar24x7 <- Seat(seatId) // none of the sends will block
    }
    // a new consumer try to enter the bar for each second
    for customerId := 0; ; customerId++ {
        time.Sleep(time.Second)
        go bar24x7.ServeConsumer(customerId)
    }
}
互斥量
互斥量相当于二元信号里,所以 cap 为 1 的 channel 可以当成互斥量使用:
package main
import "fmt"
func main() {
    mutex := make(chan struct{}, 1) // the capacity must be one
    counter := 0
    increase := func() {
        mutex <- struct{}{} // lock
        counter++
        <-mutex // unlock
    }
    increase1000 := func(done chan<- struct{}) {
        for i := 0; i < 1000; i++ {
            increase()
        }
        done <- struct{}{}
    }
    done := make(chan struct{})
    go increase1000(done)
    <-done; <-done
    fmt.Println(counter) // 2000
}
关闭 channel
func isClosed(ch chan int) bool {
    select {
    case <-ch:
        return true
    default:
    }
    return false
}
isClosed() 得到当前 channel 当前还未关闭,如果试图往 channel 里写数据,仍然可能会发生 panic ,因为在调用 isClosed() 后,其他协程可能已经把 channel 关闭了。关闭 channel 时应该注意以下准则:不要在读取端关闭 channel ,因为写入端无法知道 channel 是否已经关闭,往已关闭的 channel 写数据会 panic ; 有多个写入端时,不要再写入端关闭 channle ,因为其他写入端无法知道 channel 是否已经关闭,关闭已经关闭的 channel 会发生 panic ; 如果只有一个写入端,可以在这个写入端放心关闭 channel 。 
sync 包来做关闭 channel 时的协程同步,不过使用起来也稍微复杂些。下面介绍一种优雅些的做法。一写多读
for range 把 channel 中数据遍历完就可以了,当 channel 关闭时,for range 仍然会将 channel 缓冲中的数据全部遍历完然后再退出循环:package main
import (
    "fmt"
    "sync"
)
func main() {
    wg := &sync.WaitGroup{}
    ch := make(chan int, 100)
    send := func() {
        for i := 0; i < 100; i++ {
            ch <- i
        }
        // signal sending finish
        close(ch)
    }
    recv := func(id int) {
        defer wg.Done()
        for i := range ch {
            fmt.Printf("receiver #%d get %d\n", id, i)
        }
        fmt.Printf("receiver #%d exit\n", id)
    }
    wg.Add(3)
    go recv(0)
    go recv(1)
    go recv(2)
    send()
    wg.Wait()
}
多写一读
sync.Once 来解决多个写入端重复关闭 channel 的问题,但更优雅的办法设置一个额外的 channel ,由读取端通过关闭来通知写入端任务完成不要再继续再写入数据了:package main
import (
    "fmt"
    "sync"
)
func main() {
    wg := &sync.WaitGroup{}
    ch := make(chan int, 100)
    done := make(chan struct{})
    send := func(id int) {
        defer wg.Done()
        for i := 0; ; i++ {
            select {
            case <-done:
                // get exit signal
                fmt.Printf("sender #%d exit\n", id)
                return
            case ch <- id*1000 + i:
            }
        }
    }
    recv := func() {
        count := 0
        for i := range ch {
            fmt.Printf("receiver get %d\n", i)
            count++
            if count >= 1000 {
                // signal recving finish
                close(done)
                return
            }
        }
    }
    wg.Add(3)
    go send(0)
    go send(1)
    go send(2)
    recv()
    wg.Wait()
}
多写多读
package main
import (
    "fmt"
    "sync"
    "time"
)
func main() {
    wg := &sync.WaitGroup{}
    ch := make(chan int, 100)
    done := make(chan struct{})
    send := func(id int) {
        defer wg.Done()
        for i := 0; ; i++ {
            select {
            case <-done:
                // get exit signal
                fmt.Printf("sender #%d exit\n", id)
                return
            case ch <- id*1000 + i:
            }
        }
    }
    recv := func(id int) {
        defer wg.Done()
        for {
            select {
            case <-done:
                // get exit signal
                fmt.Printf("receiver #%d exit\n", id)
                return
            case i := <-ch:
                fmt.Printf("receiver #%d get %d\n", id, i)
                time.Sleep(time.Millisecond)
            }
        }
    }
    wg.Add(6)
    go send(0)
    go send(1)
    go send(2)
    go recv(0)
    go recv(1)
    go recv(2)
    time.Sleep(time.Second)
    // signal finish
    close(done)
    // wait all sender and receiver exit
    wg.Wait()
}
总结
转自:ExplorerMan
cnblogs.com/ExMan/p/11710017.html
文章转载:Go开发大全
(版权归原作者所有,侵删)
![]()

点击下方“阅读原文”查看更多
评论
