我正在阅读/研究 Go Concurrency Patterns: Pipelines and cancellation ,但我无法理解Stopping short 部分。我们有以下功能:
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int, 1) // enough space for the unread inputs
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
in := gen(2, 3)
// Distribute the sq work across two goroutines that both read from in.
c1 := sq(in)
c2 := sq(in)
// Consume the first value from output.
out := merge(c1, c2)
fmt.Println(<-out) // 4 or 9
return
// Apparently if we had not set the merge out buffer size to 1
// then we would have a hanging go routine.
}
现在,如果您注意到 merge
中的 2
行,它表示我们使用 buffer
输出 chan
大小 1,因为这对于未读输入来说已经足够了。但是,我几乎肯定我们应该分配一个 buffer
大小为 2 的 chan
。根据此代码示例:
c := make(chan int, 2) // buffer size 2
c <- 1 // succeeds immediately
c <- 2 // succeeds immediately
c <- 3 // blocks until another goroutine does <-c and receives 1
因为这部分暗示 buffer
大小为 3 的 chan
不会阻塞。谁能澄清/帮助我的理解?
最佳答案
程序向 channel out
发送两个值,并从 channel out
读取一个值。未收到其中一个值。
如果 channel 是无缓冲的(容量为 0),那么其中一个发送 goroutines 将阻塞直到程序退出。这是泄漏。
如果创建的 channel 容量为 1,则两个 goroutine 都可以发送到 channel 并退出。发送到 channel 的第一个值由 main
接收。第二个值保留在 channel 中。
如果 main 函数没有从 out
channel 接收到值,则需要一个容量为 2 的 channel 来防止 goroutine 无限期阻塞。
关于Goroutines channel 和 "stopping short",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34583722/