我希望我的 go routine worker(下面代码中的 ProcessToDo()
)在关闭之前等到所有“排队”的工作都处理完。
worker 例程有一个“待办事项” channel (缓冲),工作通过该 channel 发送给它。它有一个“完成” channel 来告诉它开始关机。该文档说,如果满足多个选择, channel 上的选择将选择一个“伪随机值”......这意味着在所有缓冲工作完成之前触发关闭(返回)。
在下面的代码示例中,我希望打印所有 20 条消息...
package main
import (
"time"
"fmt"
)
func ProcessToDo(done chan struct{}, todo chan string) {
for {
select {
case work, ok := <-todo:
if !ok {
fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
return
}
fmt.Printf("todo: %q\n", work)
time.Sleep(100 * time.Millisecond)
case _, ok := <-done:
if ok {
fmt.Printf("Shutting down ProcessToDo - done message received!\n")
} else {
fmt.Printf("Shutting down ProcessToDo - done channel closed!\n")
}
close(todo)
return
}
}
}
func main() {
done := make(chan struct{})
todo := make(chan string, 100)
go ProcessToDo(done, todo)
for i := 0; i < 20; i++ {
todo <- fmt.Sprintf("Message %02d", i)
}
fmt.Println("*** all messages queued ***")
time.Sleep(1 * time.Second)
close(done)
time.Sleep(4 * time.Second)
}
最佳答案
done
您的情况下的 channel 是完全不必要的,因为您可以通过关闭 todo
来表示关闭 channel 本身。
并使用 for range
在 channel 上迭代,直到 channel 关闭且其缓冲区为空。
你应该有一个 done
channel,但这只是为了让 goroutine 本身可以发出它完成工作的信号,这样主 goroutine 可以继续或退出。
此变体等同于您的变体,更简单并且不需要 time.Sleep()
调用等待其他 goroutines(无论如何,这将是错误和不确定的)。在 Go Playground 上试用:
func ProcessToDo(done chan struct{}, todo chan string) {
for work := range todo {
fmt.Printf("todo: %q\n", work)
time.Sleep(100 * time.Millisecond)
}
fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
done <- struct{}{} // Signal that we processed all jobs
}
func main() {
done := make(chan struct{})
todo := make(chan string, 100)
go ProcessToDo(done, todo)
for i := 0; i < 20; i++ {
todo <- fmt.Sprintf("Message %02d", i)
}
fmt.Println("*** all messages queued ***")
close(todo)
<-done // Wait until the other goroutine finishes all jobs
}
另请注意,worker goroutine 应使用 defer
发出完成信号因此,如果 main goroutine 以某种意想不到的方式返回或出现 panic ,则不会卡在等待 worker 中。所以它应该像这样开始:
defer func() {
done <- struct{}{} // Signal that we processed all jobs
}()
您还可以使用 sync.WaitGroup
将主 goroutine 同步到 worker(等待它)。事实上,如果你计划使用多个 worker goroutines,那比从 done
中读取多个值更干净。 channel 。使用 WaitGroup
来表示完成也更简单因为它有一个 Done()
方法(这是一个函数调用)所以你不需要匿名函数:
defer wg.Done()
参见 JimB's anwser有关 WaitGroup
的完整示例.
使用 for range
如果你想使用多个 worker goroutines,这也是惯用的: channel 是同步的,所以你不需要任何额外的代码来同步对 todo
的访问。 channel 或从中收到的工作。如果你关闭 todo
channel 在main()
,这将正确地向所有 worker goroutine 发出信号。但是当然,所有排队的作业都只会被接收和处理一次。
现在采用使用 WaitGroup
的变体让主 goroutine 等待 worker(JimB 的回答):如果你想要超过 1 个 worker goroutine 怎么办;并发(很可能是并行)处理您的作业?
您唯一需要在代码中添加/更改的是:真正启动多个代码:
for i := 0; i < 10; i++ {
wg.Add(1)
go ProcessToDo(todo)
}
在不更改任何其他内容的情况下,您现在拥有一个正确的并发应用程序,它使用 10 个并发 goroutine 接收和处理您的作业。而且我们没有使用任何“丑陋”的time.Sleep()
(我们使用了一个但只是为了模拟慢速处理,而不是等待其他 goroutines),你不需要任何额外的同步。
关于go - 缓冲区为空后关闭 "worker"go routine,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32383063/