我正在尝试遵循发布在 http://blog.golang.org/pipelines/bounded.go 的有界 goroutine 示例.我遇到的问题是,如果有更多的 worker 启动,那么要做的工作量就会增加,额外的 worker 永远不会被取消。其他一切似乎都有效,计算并记录了值,但是当我关闭 groups
channel 时,工作人员只是卡在 range 语句上。
我想我不明白的(在我的代码和示例代码中)是工作人员如何知道什么时候没有更多的工作要做并且他们应该退出?
更新
工作(即非工作)示例发布在 http://play.golang.org/p/T7zBCYLECp .它显示了 worker 的僵局,因为他们都在 sleep ,没有工作要做。我感到困惑的是,我认为示例代码会有同样的问题。
这是我目前使用的代码:
// Creates a pool of workers to do a bunch of computations
func computeAll() error {
done := make(chan struct{})
defer close(done)
groups, errc := findGroups(done)
// start a fixed number of goroutines to schedule with
const numComputers = 20
c := make(chan result)
var wg sync.WaitGroup
wg.Add(numComputers)
for i := 0; i < numComputers; i++ {
go func() {
compute(done, groups, c)
wg.Done()
}()
}
go func() {
wg.Wait()
close(c)
}()
// log the results of the computation
for r := range c { // log the results }
if err := <-errc; err != nil {
return err
}
return nil
}
这是用数据填充 channel 的代码:
// Retrieves the groups of data the must be computed
func findGroups(done <-chan struct{}) (<-chan model, <-chan error) {
groups := make(chan model)
errc := make(chan error, 1)
go func() {
// close the groups channel after find returns
defer close(groups)
group, err := //... code to get the group ...
if err == nil {
// add the group to the channel
select {
case groups <- group:
}
}
}()
return groups, errc
}
下面是读取 channel 进行计算的代码。
// Computes the results for the groups of data
func compute(done <-chan struct{}, groups <-chan model, c chan<- result) {
for group := range groups {
value := compute(group)
select {
case c <- result{value}:
case <-done:
return
}
}
}
最佳答案
因为您正在尝试从 errc
读取数据除非有错误,否则它是空的。
//编辑
computeAll()
将始终阻塞 <- errc
如果没有错误,另一种方法是使用类似的东西:
func computeAll() (err error) {
.........
select {
case err = <-errc:
default: //don't block
}
return
}
关于parallel-processing - 数据通道关闭时 Goroutines 不退出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25515539/