parallel-processing - 数据通道关闭时 Goroutines 不退出

标签 parallel-processing go goroutine

我正在尝试遵循发布在 http://blog.golang.org/pipelines/bounded.go 的有界 goroutine 示例.我遇到的问题是,如果有更多的 worker 启动,那么要做的工作量就会增加,额外的 worker 永远不会被取消。其他一切似乎都有效,计算并记录了值,但是当我关闭 groups channel 时,工作人员只是卡在 range 语句上。

我想我不明白的(在我的代码和示例代码中)是工作人员如何知道什么时候没有更多的工作要做并且他们应该退出?

更新

工作(即非工作)示例发布在 http://play.golang.org/p/T7zBCYLECp .它显示了 worker 的僵局,因为他们都在 sleep ,没有工作要做。我感到困惑的是,我认为示例代码会有同样的问题。

这是我目前使用的代码:

// Creates a pool of workers to do a bunch of computations
func computeAll() error {
    done := make(chan struct{})
    defer close(done)

    groups, errc := findGroups(done)

    // start a fixed number of goroutines to schedule with
    const numComputers = 20     
    c := make(chan result)
    var wg sync.WaitGroup
    wg.Add(numComputers)
    for i := 0; i < numComputers; i++ {
        go func() {
            compute(done, groups, c)
            wg.Done()
        }()
    }

    go func() {
        wg.Wait()
        close(c)
    }()

    // log the results of the computation
    for r := range c { // log the results }

    if err := <-errc; err != nil {
        return err
    }

    return nil
}

这是用数据填充 channel 的代码:

// Retrieves the groups of data the must be computed
func findGroups(done <-chan struct{}) (<-chan model, <-chan error) {
    groups := make(chan model)
    errc := make(chan error, 1)
    go func() {
        // close the groups channel after find returns
        defer close(groups)

        group, err := //... code to get the group ...
        if err == nil {
            // add the group to the channel
            select {
                case groups <- group:
            }
        }
    }()

    return groups, errc
}

下面是读取 channel 进行计算的代码。

// Computes the results for the groups of data
func compute(done <-chan struct{}, groups <-chan model, c chan<- result) {
    for group := range groups {
        value := compute(group)

        select {
        case c <- result{value}:
        case <-done:
            return
        }
    }
}

最佳答案

因为您正在尝试从 errc 读取数据除非有错误,否则它是空的。

//编辑

computeAll()将始终阻塞 <- errc如果没有错误,另一种方法是使用类似的东西:

func computeAll() (err error) {
    .........
    select {
    case err = <-errc:
    default: //don't block
    }
    return
}

关于parallel-processing - 数据通道关闭时 Goroutines 不退出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25515539/

相关文章:

c++ - MPI:进程 0 执行其代码两次

go - 用户 : Current not implemented on linux/amd64 with golang using docker scratch

go - 为什么即使有锁,GO 也会出现 'concurrent map writes' panic ?

go - 如何等待一组goroutines?

Java 8 : How can I convert a for loop to run in parallel?

c++ - parallel_reduce on double 返回不正确的结果

go - 使用嵌入式结构构造结构文字

go - 在 Go 中使用嵌入式类型的基

Goroutine实现疑惑

c# - 调用许多网络服务的最佳方式?