go - 缓冲区为空后关闭 "worker"go routine

标签 go concurrency shutdown channel goroutine

我希望我的 go routine worker(下面代码中的 ProcessToDo())在关闭之前等到所有“排队”的工作都处理完。

worker 例程有一个“待办事项” channel (缓冲),工作通过该 channel 发送给它。它有一个“完成” channel 来告诉它开始关机。该文档说,如果满足多个选择, channel 上的选择将选择一个“伪随机值”......这意味着在所有缓冲工作完成之前触发关闭(返回)。

在下面的代码示例中,我希望打印所有 20 条消息...

package main

import (
    "time"
    "fmt"
)


func ProcessToDo(done chan struct{}, todo chan string) {
    for {
        select {
        case work, ok := <-todo:
            if !ok {
                fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
                return
            }
            fmt.Printf("todo: %q\n", work)
            time.Sleep(100 * time.Millisecond)
        case _, ok := <-done:
            if ok {
                fmt.Printf("Shutting down ProcessToDo - done message received!\n")
            } else {
                fmt.Printf("Shutting down ProcessToDo - done channel closed!\n")
            }
            close(todo)
            return
        }
    }
}

func main() {

    done := make(chan struct{})
    todo := make(chan string, 100)

    go ProcessToDo(done, todo)

    for i := 0; i < 20; i++ {
        todo <- fmt.Sprintf("Message %02d", i)
    }

    fmt.Println("*** all messages queued ***")
    time.Sleep(1 * time.Second)
    close(done)
    time.Sleep(4 * time.Second)
}

最佳答案

done您的情况下的 channel 是完全不必要的,因为您可以通过关闭 todo 来表示关闭 channel 本身。

并使用 for range在 channel 上迭代,直到 channel 关闭且其缓冲区为空。

你应该有一个 done channel,但这只是为了让 goroutine 本身可以发出它完成工作的信号,这样主 goroutine 可以继续或退出。

此变体等同于您的变体,更简单并且不需要 time.Sleep()调用等待其他 goroutines(无论如何,这将是错误和不确定的)。在 Go Playground 上试用:

func ProcessToDo(done chan struct{}, todo chan string) {
    for work := range todo {
        fmt.Printf("todo: %q\n", work)
        time.Sleep(100 * time.Millisecond)
    }
    fmt.Printf("Shutting down ProcessToDo - todo channel closed!\n")
    done <- struct{}{} // Signal that we processed all jobs
}

func main() {
    done := make(chan struct{})
    todo := make(chan string, 100)

    go ProcessToDo(done, todo)

    for i := 0; i < 20; i++ {
        todo <- fmt.Sprintf("Message %02d", i)
    }

    fmt.Println("*** all messages queued ***")
    close(todo)
    <-done // Wait until the other goroutine finishes all jobs
}

另请注意,worker goroutine 应使用 defer 发出完成信号因此,如果 main goroutine 以某种意想不到的方式返回或出现 panic ,则不会卡在等待 worker 中。所以它应该像这样开始:

defer func() {
    done <- struct{}{} // Signal that we processed all jobs
}()

您还可以使用 sync.WaitGroup 将主 goroutine 同步到 worker(等待它)。事实上,如果你计划使用多个 worker goroutines,那比从 done 中读取多个值更干净。 channel 。使用 WaitGroup 来表示完成也更简单因为它有一个 Done() 方法(这是一个函数调用)所以你不需要匿名函数:

defer wg.Done()

参见 JimB's anwser有关 WaitGroup 的完整示例.

使用 for range如果你想使用多个 worker goroutines,这也是惯用的: channel 是同步的,所以你不需要任何额外的代码来同步对 todo 的访问。 channel 或从中收到的工作。如果你关闭 todo channel 在main() ,这将正确地向所有 worker goroutine 发出信号。但是当然,所有排队的作业都只会被接收和处理一次。

现在采用使用 WaitGroup 的变体让主 goroutine 等待 worker(JimB 的回答):如果你想要超过 1 个 worker goroutine 怎么办;并发(很可能是并行)处理您的作业?

您唯一需要在代码中添加/更改的是:真正启动多个代码:

for i := 0; i < 10; i++ {
    wg.Add(1)
    go ProcessToDo(todo)
}

在不更改任何其他内容的情况下,您现在拥有一个正确的并发应用程序,它使用 10 个并发 goroutine 接收和处理您的作业。而且我们没有使用任何“丑陋”的time.Sleep() (我们使用了一个但只是为了模拟慢速处理,而不是等待其他 goroutines),你不需要任何额外的同步。

关于go - 缓冲区为空后关闭 "worker"go routine,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32383063/

相关文章:

Go 为外部包导出常量报告 "undefined"

戈兰;难以理解作为接收者的函数

linux - 为什么关机报错?

objective-c - 执行某些系统事件,Mac OS X

go - AWS API Gateway WebSockets [POST] @connections返回404未找到

从 Go 语言调用的 C# DLL(类库)函数

c# - 如何跨 AppDomain 边界传递 CancellationToken?

haskell - 什么时候可以保证 MVar 操作不间断?

java - 增加锁数量的最佳方法是什么?

windows-7 - 检测 Windows 关闭事件