并发控制模式:orDone的两种实现
Go语言精选
共 5002字,需浏览 11分钟
·
2021-09-07 12:34
文章目录
方式一 递归
方式二 利用反射
性能差异
orDone
是一种并发控制模式,旨在多任务场景下实现,有一个任务成功返回即立即结束等待。
今天我们来看下两种不同的实现方式:
方式一 递归
利用二分法递归, 将所有待监听信号的chan
都select
起来,
当有第一个chan
返回时,close orDone
来通知读取方已有第一个任务返回
代码如下比较直观:
// 传入多个并发chan,返回是否结束的 orDone chan
func Or(channels ...<-chan interface{}) <-chan interface{} {
// 只有零个或者1个chan
switch len(channels) {
case 0:
// 返回nil, 让读取阻塞等待
return nil
case 1:
return channels[0]
}
orDone := make(chan interface{})
go func() {
// 返回时利用close做结束信号的广播
defer close(orDone)
// 利用select监听第一个chan的返回
switch len(channels) {
case 2: // 直接select
select {
case <-channels[0]:
case <-channels[1]:
}
default: // 二分法递归处理
m := len(channels) / 2
select {
case <-Or(channels[:m]...):
case <-Or(channels[m:]...):
}
}
}()
return orDone
}
方式二 利用反射
这里要用到reflect.SelectCase
, 他可以描述一种select
的case
,
来指明其接受的是chan
的读取或发送
type SelectCase struct {
Dir SelectDir // direction of case
Chan Value // channel to use (for send or receive)
Send Value // value to send (for send)
}
有了这个,就可以之间遍历,不用递归来实现有限的select case
构造
最后用reflect.Select(cases)
监听信号就可以了,代码如下:
func OrInReflect(channels ...<-chan interface{}) <-chan interface{} {
// 只有0个或者1个
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
orDone := make(chan interface{})
go func() {
defer close(orDone)
// 利用反射构建SelectCase,这里是读取
var cases []reflect.SelectCase
for _, c := range channels {
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(c),
})
}
// 随机选择一个可用的case
reflect.Select(cases)
}()
return orDone
}
性能差异
这两种都可以支持大量chan
的信号监听,那性能差异大么
虽说递归开销肯定不小,反射也不一定效率高,拿个压测来试试吧
先构造一下大量并发chan
func repeat(
done <-chan interface{},
// 外部传入done控制是否结束
values ...interface{},
) <-chan interface{} {
valueStream := make(chan interface{})
go func() {
// 返回时释放
defer close(valueStream)
for {
for _, v := range values {
select {
case <-done:
return
case valueStream <- v:
}
}
}
}()
return valueStream
}
然后压测
func BenchmarkOr(b *testing.B) {
done := make(chan interface{})
defer close(done)
num := 100
streams := make([]<-chan interface{}, num)
for i := range streams {
streams[i] = repeat(done, []int{1, 2, 3})
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
<-Or(streams...)
}
}
func BenchmarkOrInReflect(b *testing.B) {
// 代码类似
}
跑了下结果如下:
goos: darwin
goarch: amd64
pkg: github.com/NewbMiao/Dig101-Go/concurrency/channel/schedule/orDone
BenchmarkOr-12 31815 38136 ns/op 9551 B/op 99 allocs/op
BenchmarkOrInReflect-12 55797 21755 ns/op 25232 B/op 112 allocs/op
可以看出,大量并发chan
场景下, 反射使用内存更多些,但速度更快。
推荐阅读
评论