我正在在线阅读管道教程并尝试构建一个像这样运行的阶段 --
- 在将传入事件发送到输出 channel 之前,以每批 10 个为一组对传入事件进行批处理
- 如果我们在 5 秒内没有看到 10 个事件,则合并我们收到的所有事件并发送它们,关闭输出 channel 并返回。
但是,我不知道第一个 select case 会是什么样子。尝试了很多东西但无法通过这个。 非常感谢任何指点!
func BatchEvents(inChan <- chan *Event) <- chan *Event {
batchSize := 10
comboEvent := Event{}
go func() {
defer close(out)
i = 0
for event := range inChan {
select {
case -WHAT GOES HERE?-:
if i < batchSize {
comboEvent.data = append(comboEvent.data, event.data)
i++;
} else {
out <- &comboEvent
// reset for next batch
comboEvent = Event{}
i=0;
}
case <-time.After(5 * time.Second):
// process whatever we have seen so far if the batch size isn't filled in 5 secs
out <- &comboEvent
// stop after
return
}
}
}()
return out
}
最佳答案
您的第一个选择案例应该来自该 channel ,而不是在 channel 上做一个范围,整个事情都在一个无限循环中。
func BatchEvents(inChan <-chan *Event) <-chan *Event {
batchSize := 10
comboEvent := Event{}
go func() {
defer close(out)
i = 0
for {
select {
case event, ok := <-inChan:
if !ok {
return
}
comboEvent.data = append(comboEvent.data, event.data)
i++
if i == batchSize {
out <- &comboEvent
// reset for next batch
comboEvent = Event{}
i = 0
}
case <-time.After(5 * time.Second):
// process whatever we have seen so far if the batch size isn't filled in 5 secs
if i > 0 {
out <- &comboEvent
}
// stop after
return
}
}
}()
return out
}
关于go - 如何使用 channel 对 golang 管道阶段中的项目进行批处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45872441/