asynchronous - 在golang中,如何编写一个为下一阶段引入延迟的流水线阶段?

标签 asynchronous go concurrency channel goroutine

我正在关注 https://blog.golang.org/pipelines文章实现了几个阶段。

我需要其中一个阶段在事件传递到管道的下一阶段之前引入几秒钟的延迟。

我对下面的代码的担忧是,它会在传递事件之前产生无限数量的 time.Sleep() 例程。有没有更好的方法来做到这一点?

谢谢!

func fooStage(inChan <- chan *Bar) (<- chan *Bar) {
    out := make(chan *Bar, 10000)
    go func() {
        defer close(out)
        wg := sync.WaitGroup{}
        for {
            select {
            case event, ok := <-inChan:
                if !ok {
                    // inChan closed
                    break
                }
                wg.Add(1)
                go func() {
                    time.Sleep(5 * time.Second)
                    out <- event
                    wg.Done()
                }()
            }
        }
        wg.Wait()
    }()
    return out
}

最佳答案

您可以使用另一个 channel 来限制您的循环能够创建的事件 goroutine 的数量。

const numRoutines = 10

func fooStage(inChan <-chan *Bar) <-chan *Bar {
    out := make(chan *Bar, 10000)
    routines := make(chan struct{}, numRoutines)
    go func() {
        defer close(out)
        wg := sync.WaitGroup{}
        for {
            select {
            case event, ok := <-inChan:
                if !ok {
                    // inChan closed
                    break
                }
                wg.Add(1)
                routines <- struct{}{}
                go func() {
                    time.Sleep(5 * time.Second)
                    out <- event
                    wg.Done()
                    <-routines
                }()
            }
        }
        wg.Wait()
    }()
    return out
}

关于asynchronous - 在golang中,如何编写一个为下一阶段引入延迟的流水线阶段?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46854891/

相关文章:

javascript - Node.js Promise 未按顺序执行

c# - Android - 异步任务问题

Go - 实例化结构并获取指向它的指针的 "&MyStruct{1, 2}"语法何时有用?

c# - 调用 EF 异步方法时异步任务卡住

json - 无法将 json 数据反序列化为结构

sql - 在 go 中结合 row.Scan 和 rows.Scan 接口(interface)?

iOS 如何限制 ASIHTTPRequest 并发连接数?

c++ - 隔离一个类的并发/非并发访问数据成员

jquery - Jquery的append()行为是异步的吗?

asynchronous - 使用 async.parallel 对事务进行 Sequelize