Go:说说fanIn和fanOut模式

Go语言精选

共 7931字,需浏览 16分钟

 ·

2021-09-13 20:16

文章目录

  • fanIn

    • 协程版

    • 递归版

    • 反射版

  • fanOut

    • 同步版

    • 协程异步版

    • 反射版


今天回顾下常用的两种channel应用模式: fanInfanOut,

分别对应了,对一组相同类型chan的合并和广播。

fanIn

将全部输入chan都聚合到一个out chan中,在全部聚合完成后,关闭out chan.

协程版

func fanIn(chans ...<-chan interface{}) <-chan interface{} {
 out := make(chan interface{})

 go func() {
  var wg sync.WaitGroup
  wg.Add(len(chans))
  for _, ch := range chans {
   go func(ch <-chan interface{}) {
    for v := range ch {
     out <- v
    }
    wg.Done()
   }(ch)
  }
  // 等待协程全部结束
  wg.Wait()
  close(out)
 }()
 return out
}

这里用waitGroup是防止关闭out时还有写入(out <- v),避免panic

递归版

2 分递归并合并。

其中合并mergeTwo主要用了nil chan对读写均阻塞。

chan关闭时,设置为nil,阻塞读取。

func fanInRecur(chans ...<-chan interface{}) <-chan interface{} {
 switch len(chans) {
 case 0:
  c := make(chan interface{})
  close(c)
  // 无可聚合chan,返回一个已关闭chan,可读不可写
  return c
 case 1:
  return chans[0]
 case 2:
  return mergeTwo(chans[0], chans[1])
 default:
  // 一分为二,递归
  m := len(chans) / 2
  return mergeTwo(
   fanInRecur(chans[:m]...),
   fanInRecur(chans[m:]...))
 }
}

func mergeTwo(a, b <-chan interface{}) <-chan interface{} {
 c := make(chan interface{})
 go func() {
  defer close(c)
  for a != nil || b != nil { // 只要还有可读的chan
   select {
   case v, ok := <-a:
    if !ok { // a 已关闭,设置为nil
     a = nil
     continue
    }
    c <- v
   case v, ok := <-b:
    if !ok { // b 已关闭,设置为nil
     b = nil
     continue
    }
    c <- v
   }
  }
 }()
 return c
}

反射版

利用reflect.SelectCase构造批量可Select的发送chan

func fanInReflect(chans ...<-chan interface{}) <-chan interface{} {
 out := make(chan interface{})
 go func() {
  defer close(out)
  // 构造SelectCase slice
  var cases []reflect.SelectCase
  for _, c := range chans {
   cases = append(cases, reflect.SelectCase{
    Dir:  reflect.SelectRecv,
    Chan: reflect.ValueOf(c),
   })
  }

  // 循环,从cases中选择一个可用的
  for len(cases) > 0 {
   i, v, ok := reflect.Select(cases)
   if !ok {
    // 此channel已经close, 从切片移除
    cases = append(cases[:i], cases[i+1:]...)
    continue
   }
   out <- v.Interface()
  }
 }()
 return out
}

附上压测数据

性能对比

fanOut

同步版

最直观的方式,直接向每一个chan都同步发送一遍 返回前关闭这组chan, 即不再写入

func fanOut(ch <-chan interface{}, out []chan interface{}) {
 go func() {
  defer func() { // 退出时关闭所有的输出chan
   for i := range out {
    close(out[i])
   }
  }()

  for v := range ch { // 从输入chan中读取数据
   v := v
   for i := range out {
    i := i
    out[i] <- v // 放入到输出chan中,同步方式
   }
  }
 }()
}

协程异步版

发送这里用起协程的方式,实现异步,发送操作耗时情况下无需阻塞等待

可是有个问题,不知道你看出来没。

func fanOut(ch <-chan interface{}, out []chan interface{}) {
 go func() {
  defer func() { // 退出时关闭所有的输出chan
   for i := range out {
    close(out[i])
   }
  }()

  for v := range ch { // 从输入chan中读取数据
   v := v
   for i := range out {
    i := i
    // 协程异步
    go func(){}
      out[i] <- v
    }()
   }
  }
 }()
}

乍一看好像没什么问题, 但退出时关闭时,很可能发送的协程写入还没完成,

毕竟这里out之前写入的要有人读才能继续写。

这里加waitGroup可以等待全部发送完毕在关闭

func fanOutAsync(ch <-chan interface{}, out []chan interface{}) {
 go func() {
  var wg sync.WaitGroup
  defer func() { // 退出时关闭所有的输出chan
   wg.Wait()
   for i := range out {
    close(out[i])
   }
  }()

  for v := range ch { // 从输入chan中读取数据
   v := v
   for i := range out {
    i := i
    wg.Add(1)
    go func() { // 异步,避免一个out阻塞的时候影响其他out
     out[i] <- v
     wg.Done()
    }()
   }
  }
 }()
}

反射版

构造一票chan send case, 遍历select,发送完成的将其置为nil阻塞,避免再次发送

不得不说,nil chan出镜率很高啊

func fanOutReflect(ch <-chan interface{}, out []chan interface{}) {
 go func() {
  defer func() { // 退出时关闭所有的输出chan
   for i := range out {
    close(out[i])
   }
  }()
  cases := make([]reflect.SelectCase, len(out))
  // 构造SelectCase slice
  for i := range cases {
   cases[i].Dir = reflect.SelectSend
  }
  for v := range ch {
   v := v
   // 先完成send case构造
   for i := range cases {
    cases[i].Chan = reflect.ValueOf(out[i])
    cases[i].Send = reflect.ValueOf(v)
   }
   // 遍历select
   for range cases {
    chosen, _, _ := reflect.Select(cases)
    // 已发送过,用nil阻塞,避免再次发送
    cases[chosen].Chan = reflect.ValueOf(nil)
   }
  }
 }()
}

附上压测数据

性能对比

具体测试代码详见:concurrency[1]

参考资料

[1]

concurrency: https://github.com/NewbMiao/Dig101-Go/tree/master/concurrency/channel/schedule



推荐阅读


福利

我为大家整理了一份从入门到进阶的Go学习资料礼包,包含学习建议:入门看什么,进阶看什么。关注公众号 「polarisxu」,回复 ebook 获取;还可以回复「进群」,和数万 Gopher 交流学习。


浏览 55
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报