我正在通过Udemy类(class)学习基础Go。在goroutines部分中,有一个节流示例,使我对 WaitGroup 的工作方式有所了解。
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func main() {
c1 := make(chan int)
c2 := make(chan int)
go populate(c1)
go fanOutIn(c1, c2)
for v := range c2 {
fmt.Println(v)
}
fmt.Println("about to exit")
}
func populate(c chan int) {
for i := 0; i < 100; i++ {
c <- i
}
close(c)
}
func fanOutIn(c1, c2 chan int) {
var wg sync.WaitGroup
const goroutines = 10
wg.Add(goroutines)
for i := 0; i < goroutines; i++ {
go func() {
for v := range c1 {
func(v2 int) {
c2 <- timeConsumingWork(v2)
}(v)
}
wg.Done()
}()
}
wg.Wait()
close(c2)
}
func timeConsumingWork(n int) int {
time.Sleep(time.Microsecond * time.Duration(rand.Intn(500)))
return n + rand.Intn(1000)
}
与我的理解不符的部分是在函数
fanOutIn
中,我们在其中设置了WaitGroup
和Add(10)
。为什么我要打印出100个值?只能将一个值(
i := 0
)放在c1
上,并且永远不会从 channel 中显式删除该值。然后,该代码命中wg.Done()
,并且 WaitGroup 队列减少到9,依此类推。根据我目前的理解,我希望看到
0 + rand.Intn(1000)
的10个值。
最佳答案
分离出来的函数内容如下(包括前面的go
和调用它的括号):
go func() {
for v := range c1 {
func(v2 int) {
c2 <- timeConsumingWork(v2)
}(v)
}
wg.Done()
}()
这段代码有点怪异和怪异。让我们进一步缩小它,丢弃
wg.Done
并仅保留for
循环本身:for v := range c1 {
func(v2 int) {
c2 <- timeConsumingWork(v2)
}(v)
}
有一个内部的未命名函数,在这里几乎没有用。我们可以在不更改程序行为的情况下将其丢弃,以获得:
for v := range c1 {
c2 <- timeConsumingWork(v)
}
最后是一个简单的循环。现在,一个关键问题是:您希望此循环进行多少次迭代? 注意:不一定是任何常数。也许用更好的方式来表达问题:该循环何时结束?
for
循环读取一个 channel 。当从 channel 读取表明没有更多数据,即 channel 已关闭并且其队列为空时,这种循环结束。 (请参阅the Go specification section on for
loops。)因此,此最里面的循环
for v := range c1
不会终止,直到关闭 channel c1
且队列中没有更多数据为止。该 channel 是通过以下 channel 创建的:c1 := make(chan int)
因此它没有队列,因此我们甚至无需考虑:它在
close(c1)
关闭后终止。 您现在应该查找关闭close
的c1
。 我们的亲戚在哪里?
这是关闭
c1
的地方:func populate(c chan int) {
for i := 0; i < 100; i++ {
c <- i
}
close(c)
}
我们以
c1
作为参数来调用它,因此其最终close(c)
关闭c1
。现在,您可以问:我们什么时候可以打到close
电话? 答案很明显:在循环中的i >= 100
之后,即在我们将100个值(分别从零到99)发送到c1
channel 之后。fanOutIn
要做的是剥离10个goroutine。 10个goroutine中的每一个都运行我在上面引用的第一个匿名函数。该匿名函数有一个循环,循环运行的次数不确定,直到循环c1
关闭为止。循环中的每次行程都会获取 channel 的值,因此,最初,如果十个goroutine在所有可用值之前都设法启动,那么所有十个goroutine将等待值。当生产者函数将一个值放入 channel 时,十个等待的goroutine中的一个将获取它并开始使用它。如果该goroutine需要很长时间才能返回自己的
for
循环的顶部,则另一个goroutine将采用下一个产生的值。因此,这里发生的事情是,多达十个产生的值通过 channel 传播到多达十个goroutine。1每个(多达十个)goroutine都使用其值花费一些平凡的时间,然后将final-product-value发送到 channel c2
并返回其自己的不确定for
循环的顶部。只有当生产者关闭了其 channel
c
(此处是我们的c1
)时,这十个goroutine才会看到一个封闭 channel 空队列,从而允许他们退出其for
循环。当他们确实退出其for
循环时,它们每个都会调用wg.Done()
(每个一次)并终止。因此,一旦
close(c1)
发生了(通过close(c)
中的populate
),最终所有这十个匿名goroutine都将调用wg.Done()
。届时,wg.Wait()
中的fanOutIn
将返回。这将调用close(c2)
并从fanOutIn
返回,并终止该goroutine。同时,在
main
中,我们使用for v := range c2
从c2
channel 读取。当十个goroutine中的任何一个将值写入for
时,此c2
循环将运行。仅当c2
本身关闭时,它才会退出(其队列也必须为空,但是c2
的队列长度为零)。因此,在关闭main
之前,for
不会继续经过c2
循环,这要等到wg.Wait()
返回后才能发生,直到发生十个wg.Done()
调用才发生,直到关闭c1
channel 才发生。这意味着,在
main
调用for
之前,populate
不能超过其自己的close(c)
循环,并且仅在生成正好100个值之后才会发生。1正如in comments below所讨论的,此处的短语可能很重要:我们并不真正知道有多少个goroutine会真正消耗值。很大程度上取决于每个goroutine要做多少工作,做什么工作以及Go运行时可用的CPU数量。
关于go - Goroutines节气门示例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61719731/