Goroutines channel 和 "stopping short"

标签 go concurrency goroutine

我正在阅读/研究 Go Concurrency Patterns: Pipelines and cancellation ,但我无法理解Stopping short 部分。我们有以下功能:

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}
func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int, 1) // enough space for the unread inputs

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the first value from output.
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
    // Apparently if we had not set the merge out buffer size to 1
    // then we would have a hanging go routine. 
}

现在,如果您注意到 merge 中的 2 行,它表示我们使用 buffer 输出 chan大小 1,因为这对于未读输入来说已经足够了。但是,我几乎肯定我们应该分配一个 buffer 大小为 2 的 chan。根据此代码示例:

c := make(chan int, 2) // buffer size 2
c <- 1  // succeeds immediately
c <- 2  // succeeds immediately
c <- 3  // blocks until another goroutine does <-c and receives 1 

因为这部分暗示 buffer 大小为 3 的 chan 不会阻塞。谁能澄清/帮助我的理解?

最佳答案

程序向 channel out 发送两个值,并从 channel out 读取一个值。未收到其中一个值。

如果 channel 是无缓冲的(容量为 0),那么其中一个发送 goroutines 将阻塞直到程序退出。这是泄漏。

如果创建的 channel 容量为 1,则两个 goroutine 都可以发送到 channel 并退出。发送到 channel 的第一个值由 main 接收。第二个值保留在 channel 中。

如果 main 函数没有从 out channel 接收到值,则需要一个容量为 2 的 channel 来防止 goroutine 无限期阻塞。

关于Goroutines channel 和 "stopping short",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34583722/

相关文章:

golang十进制到十六进制转换错误

python - python 中限制线程的规范/标准方法

go - 在多个goroutine之间共享的Golang结构中,非共享成员是否需要互斥保护?

Go:再次将类型为 uuid.UUID (satori) 的 reflect.Value 转换回 uuid.UUID

http - 多次get参数请求只返回第一个参数

mongodb - 执行 mgo.Pipe 没有结果,包括 $out

java - 我如何在这里整合执行者?或者我该如何改进这个线程池?

c# - 带有 Attach() 的 LINQ To SQL 异常 : Cannot add an entity with a key that is already in use

Golang 程序未完成执行就挂起

go - 如何避免这个 golang 程序中的死锁?