go - 如何使用 channel 对 golang 管道阶段中的项目进行批处理?

标签 go concurrency pipeline channel

我正在在线阅读管道教程并尝试构建一个像这样运行的阶段 --

  1. 在将传入事件发送到输出 channel 之前,以每批 10 个为一组对传入事件进行批处理
  2. 如果我们在 5 秒内没有看到 10 个事件,则合并我们收到的所有事件并发送它们,关闭输出 channel 并返回。

但是,我不知道第一个 select case 会是什么样子。尝试了很多东西但无法通过这个。 非常感谢任何指点!

func BatchEvents(inChan <- chan *Event) <- chan *Event {
    batchSize := 10
    comboEvent := Event{}
    go func() {
        defer close(out)
        i = 0
        for event := range inChan {
            select {
            case -WHAT GOES HERE?-:
                if i < batchSize {
                    comboEvent.data = append(comboEvent.data, event.data)
                    i++;
                } else {
                    out <- &comboEvent
                    // reset for next batch
                    comboEvent = Event{}
                    i=0;
                }
            case <-time.After(5 * time.Second):
                // process whatever we have seen so far if the batch size isn't filled in 5 secs
                out <- &comboEvent
                // stop after
                return
            }
        }
    }()
    return out
}

最佳答案

您的第一个选择案例应该来自该 channel ,而不是在 channel 上做一个范围,整个事情都在一个无限循环中。

func BatchEvents(inChan <-chan *Event) <-chan *Event {
    batchSize := 10
    comboEvent := Event{}
    go func() {
        defer close(out)
        i = 0
        for {
            select {
            case event, ok := <-inChan:
                if !ok {
                    return
                }
                comboEvent.data = append(comboEvent.data, event.data)
                i++
                if i == batchSize {
                    out <- &comboEvent
                    // reset for next batch
                    comboEvent = Event{}
                    i = 0
                }
            case <-time.After(5 * time.Second):
                // process whatever we have seen so far if the batch size isn't filled in 5 secs
                if i > 0 {
                    out <- &comboEvent
                }
                // stop after
                return
            }
        }
    }()
    return out
}

关于go - 如何使用 channel 对 golang 管道阶段中的项目进行批处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45872441/

相关文章:

generics - 管道运算符(operator)拒绝工作

go - 迭代结构集合( slice )的通用方法

go - 如何在 go 中模拟函数?

java - 为什么由 ScheduledExecutorService.schedule() 启动的线程永远不会终止?

java - Go 的并发 Java 示例

list - 是否可以选择管道输出插入 Elixir 函数 args 的位置?

loops - 如何创建枚举并对其进行迭代

javascript - 使用 go 处理文件上传

ios - 当我调用 glFinish() 时,这是否意味着 OpenGL ES 会忽略进一步的调用,直到我调用 -setCurrentContext :?

shell - 仅使用重定向和管道在 Unix shell 中是否可以实现无限循环?