go - Goroutines节气门示例

标签 go wait goroutine

我正在通过Udemy类(class)学习基础Go。在goroutines部分中,有一个节流示例,使我对 WaitGroup 的工作方式有所了解。

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

func main() {
    c1 := make(chan int)
    c2 := make(chan int)

    go populate(c1)

    go fanOutIn(c1, c2)

    for v := range c2 {
        fmt.Println(v)
    }

    fmt.Println("about to exit")
}

func populate(c chan int) {
    for i := 0; i < 100; i++ {
        c <- i
    }
    close(c)
}

func fanOutIn(c1, c2 chan int) {
    var wg sync.WaitGroup
    const goroutines = 10
    wg.Add(goroutines) 
    for i := 0; i < goroutines; i++ {
        go func() {
            for v := range c1 {
                func(v2 int) {
                    c2 <- timeConsumingWork(v2)
                }(v)
            }
            wg.Done()
        }()
    }
    wg.Wait()
    close(c2)
}

func timeConsumingWork(n int) int {
    time.Sleep(time.Microsecond * time.Duration(rand.Intn(500)))
    return n + rand.Intn(1000)
}

与我的理解不符的部分是在函数fanOutIn中,我们在其中设置了WaitGroupAdd(10)

为什么我要打印出100个值?只能将一个值(i := 0)放在c1上,并且永远不会从 channel 中显式删除该值。然后,该代码命中wg.Done(),并且 WaitGroup 队列减少到9,依此类推。

根据我目前的理解,我希望看到0 + rand.Intn(1000)的10个值。

最佳答案

分离出来的函数内容如下(包括前面的go和调用它的括号):

go func() {
    for v := range c1 {
        func(v2 int) {
            c2 <- timeConsumingWork(v2)
        }(v)
    }
    wg.Done()
}()

这段代码有点怪异和怪异。让我们进一步缩小它,丢弃wg.Done并仅保留for循环本身:

for v := range c1 {
    func(v2 int) {
        c2 <- timeConsumingWork(v2)
    }(v)
}

有一个内部的未命名函数,在这里几乎没有用。我们可以在不更改程序行为的情况下将其丢弃,以获得:

for v := range c1 {
    c2 <- timeConsumingWork(v)
}

最后是一个简单的循环。现在,一个关键问题是:您希望此循环进行多少次迭代? 注意:不一定是任何常数。也许用更好的方式来表达问题:该循环何时结束?
for循环读取一个 channel 。当从 channel 读取表明没有更多数据,即 channel 已关闭并且其队列为空时,这种循环结束。 (请参阅the Go specification section on for loops。)

因此,此最里面的循环for v := range c1不会终止,直到关闭 channel c1且队列中没有更多数据为止。该 channel 是通过以下 channel 创建的:
c1 := make(chan int)

因此它没有队列,因此我们甚至无需考虑:它在close(c1)关闭后终止。 您现在应该查找关闭closec1

我们的亲戚在哪里?

这是关闭c1的地方:

func populate(c chan int) {
    for i := 0; i < 100; i++ {
        c <- i
    }
    close(c)
}

我们以c1作为参数来调用它,因此其最终close(c)关闭c1。现在,您可以问:我们什么时候可以打到close电话? 答案很明显:在循环中的i >= 100之后,即在我们将100个值(分别从零到99)发送到c1 channel 之后。
fanOutIn要做的是剥离10个goroutine。 10个goroutine中的每一个都运行我在上面引用的第一个匿名函数。该匿名函数有一个循环,循环运行的次数不确定,直到循环c1关闭为止。循环中的每次行程都会获取 channel 的值,因此,最初,如果十个goroutine在所有可用值之前都设法启动,那么所有十个goroutine将等待值。

当生产者函数将一个值放入 channel 时,十个等待的goroutine中的一个将获取它并开始使用它。如果该goroutine需要很长时间才能返回自己的for循环的顶部,则另一个goroutine将采用下一个产生的值。因此,这里发生的事情是,多达十个产生的值通过 channel 传播到多达十个goroutine。1每个(多达十个)goroutine都使用其值花费一些平凡的时间,然后将final-product-value发送到 channel c2并返回其自己的不确定for循环的顶部。

只有当生产者关闭了其 channel c(此处是我们的c1)时,这十个goroutine才会看到一个封闭 channel 空队列,从而允许他们退出其for循环。当他们确实退出其for循环时,它们每个都会调用wg.Done()(每个一次)并终止。

因此,一旦close(c1)发生了(通过close(c)中的populate),最终所有这十个匿名goroutine都将调用wg.Done()。届时,wg.Wait()中的fanOutIn将返回。这将调用close(c2)并从fanOutIn返回,并终止该goroutine。

同时,在main中,我们使用for v := range c2c2 channel 读取。当十个goroutine中的任何一个将值写入for时,此c2循环将运行。仅当c2本身关闭时,它才会退出(其队列也必须为空,但是c2的队列长度为零)。因此,在关闭main之前,for不会继续经过c2循环,这要等到wg.Wait()返回后才能发生,直到发生十个wg.Done()调用才发生,直到关闭c1 channel 才发生。

这意味着,在main调用for之前,populate不能超过其自己的close(c)循环,并且仅在生成正好100个值之后才会发生。

1正如in comments below所讨论的,此处的短语可能很重要:我们并不真正知道有多少个goroutine会真正消耗值。很大程度上取决于每个goroutine要做多少工作,做什么工作以及Go运行时可用的CPU数量。

关于go - Goroutines节气门示例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61719731/

相关文章:

go - 同时来自列表的多个随机元素

go - 在Go应用程序中使用默认凭据时出现GCP权限问题

xml - 在 golang 中使用 XMLNS 声明编码/解码 XML 根 token

go - 是否可以延迟 goroutine?

asynchronous - 循环结果时 Golang Chan 挂了

dictionary - Go:从 map 弹出

java - notifyAll() 不唤醒进程

等待

java - 启动Chrome,然后等待其关闭

go - 函数退出后,例程如何从调用函数访问局部变量?