我有多个 channel c1、c2、c3、c4 ...,如何将这些 channel 中的所有数据收集到一个 channel 中? 我的代码:
package main
import (
"fmt"
"sync"
)
func putToChannel(c chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 1; i < 6; i++ {
c <- i
}
}
func main() {
c := make(chan int, 15)
c1 := make(chan int, 5)
c2 := make(chan int, 5)
c3 := make(chan int, 5)
go func(){c <- <-c1}()
go func(){c <- <-c2}()
go func(){c <- <-c3}()
wg := new(sync.WaitGroup)
wg.Add(3)
go putToChannel(c1, wg)
go putToChannel(c2, wg)
go putToChannel(c3, wg)
wg.Wait()
close(c)
for i := range c {
fmt.Println("Receive:", i)
}
fmt.Println("Finish")
}
我想将 c1、c2 ... 到 c 的所有数据组合在一起,但它不起作用
最佳答案
This article有一篇关于如何进行 channel “扇入”(包括短暂停止)的精彩文章。
这些行有问题:
go func(){c <- <-c1}()
go func(){c <- <-c2}()
go func(){c <- <-c3}()
其中每一个都将从 cx
接收一个值 channel 并将该值发送到 c
.
您需要一个如下所示的方法;
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
此方法依赖于以下事实: channel cs...
正在传递给 merge
当没有更多值要发送时关闭。
这意味着您需要更新您的 putToChannel
方法还
func putToChannel(c chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
defer close(c)
for i := 1; i < 6; i++ {
c <- i
}
}
最后一件值得注意的事情是,一般来说;尝试将创建并发送到 channel 的函数和关闭 channel 的函数封装为同一个函数。这意味着您永远不会尝试在封闭的 channel 上发送。
而不是:
c1 := make(chan int, 5)
go putToChannel(c1, wg)
你可以做到;
func generator() (<-chan int) {
c := make(chan int, 5)
go func() {
for i := 1; i < 6; i++ {
c <- i
}
close(c)
}()
return c
}
您的主要方法将类似于:
func main() {
var cs []<-chan int
cs = append(cs, generator())
cs = append(cs, generator())
cs = append(cs, generator())
c := merge(cs...)
for v := range c {
fmt.Println(v)
}
}
关于go - 扇入 channel 至单 channel ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51238972/