go - Event-driven pattern in golang

Tags: go channel goroutine

I am using golang to implement a simple event-driven worker. It looks like this:

go func() {
    for {
        select {
        case data := <-ch:
            // simulate some work (note: a bare 1 is one nanosecond)
            time.Sleep(1)
            someGlobalMap[data.key] = data.value
        }
    }
}()

The main function creates several goroutines, and each of them does something like this:

ch <- data
fmt.Println(someGlobalMap[data.key])

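As you can see, because my worker needs some time to do its work, the main function gets a nil result. How can I control this workflow properly?

Accepted answer

EDIT: I may have misread your question. I see that you mention that main will start many producer goroutines; I thought it was many consumer goroutines and a single producer. I am leaving the answer here in case it is useful for others looking for that pattern, and the bullet points still apply to your case.

So, if I understand your use case correctly, you cannot expect to send on a channel and read the result immediately afterwards. You do not know when the worker will process that send. You need to communicate between the goroutines, and that is done with channels. Assuming that simply calling a function with a return value does not work in your scenario, and you really need to send to a worker and then block until you have the result, you can send a channel as part of the data structure and do a blocking receive on it after the send, i.e.: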

resCh := make(chan Result)
ch <- Data{key, value, resCh}
res := <-resCh

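But you should probably try to break the work down into a pipeline of independent steps instead; see the blog post that I linked to in the original answer.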
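
To make the three-line snippet above concrete, here is a minimal runnable sketch of the request/reply idea. The `Result` type, the `worker` and `request` helpers, and the worker-private map are assumptions made for illustration; the original snippet does not define them.

```go
package main

import "fmt"

// Result is a hypothetical reply type; the original snippet does not define it.
type Result struct {
	key   string
	value int
}

// Data carries the payload plus the channel on which the worker replies.
type Data struct {
	key   string
	value int
	resCh chan Result
}

// worker stores each item, then confirms on the request's own reply channel.
// Only this goroutine touches the map, so no lock is needed.
func worker(ch <-chan Data) {
	store := make(map[string]int)
	for d := range ch {
		store[d.key] = d.value
		d.resCh <- Result{d.key, store[d.key]}
	}
}

// request sends one item and blocks until the worker has replied.
func request(ch chan<- Data, key string, value int) Result {
	resCh := make(chan Result)
	ch <- Data{key, value, resCh}
	return <-resCh
}

func main() {
	ch := make(chan Data)
	go worker(ch)

	res := request(ch, "a", 42)
	fmt.Println(res.key, res.value)

	// no more requests: closing ch lets the worker's range loop end
	close(ch)
}
```

Because each request carries its own reply channel, several producer goroutines can call `request` concurrently and each one receives only its own result.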


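Original answer, where I thought it was a single producer, multiple consumers/workers pattern:

This is a common pattern, and Go's goroutine and channel semantics fit it very well. There are a few things you need to keep in mind:

  • The main function will not automatically wait for goroutines to finish. If there is nothing else to do in main, the program exits and you get no results.

  • The global map that you use is not thread-safe. You need to synchronize access with a mutex, but there is a better way: use an output channel for results, which is already synchronized.

  • You can use for..range over a channel, and you can safely share a channel between multiple goroutines. As we will see, that makes this pattern quite elegant to write.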
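
For readers who do want to keep a shared map, the first two points can be sketched as follows. This is only an illustration under assumptions: the `safeMap` type and the `fill` helper are hypothetical names, not part of the original question or answer.

```go
package main

import (
	"fmt"
	"sync"
)

// safeMap is a hypothetical wrapper: a plain map guarded by a mutex.
type safeMap struct {
	mu sync.Mutex
	m  map[string]int
}

// set writes one entry while holding the lock.
func (s *safeMap) set(key string, value int) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.m[key] = value
}

// get reads one entry while holding the lock.
func (s *safeMap) get(key string) (int, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	v, ok := s.m[key]
	return v, ok
}

// fill starts n goroutines that each write one entry, and waits for all of them.
func fill(s *safeMap, n int) {
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func(i int) {
			defer wg.Done()
			s.set(fmt.Sprintf("key-%d", i), i)
		}(i)
	}
	// without this Wait, the caller could read before the writes have happened
	wg.Wait()
}

func main() {
	s := &safeMap{m: make(map[string]int)}
	fill(s, 10)
	v, ok := s.get("key-3")
	fmt.Println(v, ok)
}
```

The mutex removes the data race and the WaitGroup removes the "read too early" problem, but the results channel used in the answer's own code addresses both at once.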

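Playground: https://play.golang.org/p/WqyZfwldqp

For more information on Go pipelines and concurrency patterns, including error handling, early cancellation, etc.: https://blog.golang.org/pipelines

Commented code for the use case you mentioned: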
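
As a rough idea of what "a pipeline of independent steps" can look like, here is a minimal two-stage sketch. The stage names `gen` and `square` are illustrative assumptions; each stage owns its output channel and closes it when its input is exhausted.

```go
package main

import "fmt"

// gen emits the given values on a channel and closes it when done.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square reads from in, squares each value, and closes its output
// once in has been closed and drained.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func main() {
	// the consumer just ranges over the last stage's output
	for v := range square(gen(1, 2, 3)) {
		fmt.Println(v)
	}
}
```

Since every stage closes its own output, the final `range` loop ends on its own and no separate WaitGroup is needed in this small case.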

package main

import (
    "fmt"
    "strconv"
    "sync"
    "time"
)

// could be a command-line flag, a config, etc.
const numGoros = 10

// Data is a similar data structure to the one mentioned in the question.
type Data struct {
    key   string
    value int
}

func main() {
    var wg sync.WaitGroup

    // create the input channel that sends work to the goroutines
    inch := make(chan Data)
    // create the output channel that sends results back to the main function
    outch := make(chan Data)

    // the WaitGroup keeps track of pending goroutines, you can add numGoros
    // right away if you know how many will be started, otherwise do .Add(1)
    // each time before starting a worker goroutine.
    wg.Add(numGoros)
    for i := 0; i < numGoros; i++ {
        // because it uses a closure, it could've used inch and outch automatically,
        // but if the func gets bigger you may want to extract it to a named function,
        // and I wanted to show the directed channel types: within that function, you
        // can only receive from inch, and only send (and close) to outch.
        //
        // It also receives the index i, just for fun so it can set the goroutines'
        // index as key in the results, to show that it was processed by different
        // goroutines. Also, big gotcha: do not capture a for-loop iteration variable
        // in a closure, pass it as argument, otherwise it very likely won't do what
        // you expect.
        go func(i int, inch <-chan Data, outch chan<- Data) {
            // make sure WaitGroup.Done is called on exit, so Wait unblocks
            // eventually.
            defer wg.Done()

            // range over a channel gets the next value to process, safe to share
            // concurrently between all goroutines. It exits the for loop once
            // the channel is closed and drained, so wg.Done will be called once
            // inch is closed.
            for data := range inch {
                // process the data...
                time.Sleep(10 * time.Millisecond)
                outch <- Data{strconv.Itoa(i), data.value}
            }
        }(i, inch, outch)
    }

    // start the goroutine that prints the results, use a separate WaitGroup to track
    // it (could also have used a "done" channel but the for-loop would be more complex, with a select).
    var wgResults sync.WaitGroup
    wgResults.Add(1)
    go func(ch <-chan Data) {
        defer wgResults.Done()

        // to prove it processed everything, keep a counter and print it on exit
        var n int
        for data := range ch {
            fmt.Println(data.key, data.value)
            n++
        }

        // for fun, try commenting out the wgResults.Wait() call at the end, the output
        // will likely miss this line.
        fmt.Println(">>> Processed: ", n)
    }(outch)

    // send work, wherever that comes from...
    for i := 0; i < 1000; i++ {
        inch <- Data{"main", i}
    }

    // when there's no more work to send, close the inch, so the goroutines will begin
    // draining it and exit once all values have been processed.
    close(inch)

    // wait for all goroutines to exit
    wg.Wait()

    // at this point, no more results will be written to outch, close it to signal
    // to the results goroutine that it can terminate.
    close(outch)

    // and wait for the results goroutine to actually exit, otherwise the program would
    // possibly terminate without printing the last few values.
    wgResults.Wait()
}

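In real-life scenarios, where the amount of work is not known ahead of time, the closing of the in-channel could be triggered by, for example, a SIGINT signal. Just make sure that no code path can send work after the channel has been closed, because that would panic.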

Source: a similar question about "go - Event-driven pattern in golang" on Stack Overflow: https://stackoverflow.com/questions/37643931/
