go - 序列化 goroutines(并行化但保证顺序)

标签 go parallel-processing

假设我们想并行处理一些计算,但我们必须保证结果的顺序与计算的顺序相同:

这可以通过例如:

https://play.golang.org/p/jQbo0EVLzvX

package main

import (
    "fmt"
    "time"
)

func main() {
    orderPutChans := make([]chan bool, 8)
    orderGetChans := make([]chan bool, 8)
    doneChans := make([]chan bool, 8)

    for i := 0; i < 8; i++ {
        orderPutChans[i] = make(chan bool, 1)
        orderGetChans[i] = make(chan bool)
        doneChans[i] = make(chan bool)
    }

    srcCh := make(chan int)
    dstCh := make(chan int)

    for i := 0; i < 8; i++ {
        go func(j int) {
            myGetCh := orderGetChans[j]
            nextGetCh := orderGetChans[(j+1) % 8]
            myPutCh := orderPutChans[j]
            nextPutCh := orderPutChans[(j+1) % 8]

            for {
                _ = <- myGetCh

                v, ok := <- srcCh

                if !ok {
                    k := (j + 1) % 8
                    if orderGetChans[k] != nil {
                            orderGetChans[k] <- true
                    }
                    orderGetChans[j] = nil

                    break
                }

                nextGetCh <- true

                time.Sleep(1000)

                v *= v

                _ = <- myPutCh

                dstCh <- v

                nextPutCh <- true
            }

            doneChans[j] <- true
        }(i)
    }

    go func() {
        for i := 0; i < 8; i++ {
            _ = <- doneChans[i]
        }
        close(dstCh)
    }()

    orderGetChans[0] <- true
    orderPutChans[0] <- true

    go func() {
        for i := 0; i < 100; i++ {
            srcCh <- i
        }
        close(srcCh)
    }()

    for vv := range dstCh {
        fmt.Println(vv)
    }
}

可以使用 channel 传递 channel 的读/写权限。代码比较乱,看起来不是很整洁。 Go 中是否有更简洁的方法来实现这一目标?

编辑: 我不是在要求“简单”的替换,例如使用 chan struct{}或使用 closedoneChans赞成doneChans[i] <- true .

编辑2:

一个更简单的方法(至少就代码而言)是使用 results数组,消费者将数据与索引(这将是 worker 的模数)一起发送,goroutines 将结果写入 results[j]然后让 WaitGroup 等到所有操作都完成(使用一批中的一批),然后遍历结果并将它们发送到目标 channel 。 (可能是因为虚假分享,效果不是很好?)

最佳答案

如果我理解正确,这是您使用“管道”样式的代码版本。管道中有多个步骤:

  1. 发送 src 值
  2. workers 在收到的 src 值中工作,发送到他们自己的结果 channel
  3. 将来自 workers 的结果 channel slice 合并为一个无序 channel
  4. 对无序合并 channel 中的无序值进行排序

这是代码,它使用您在对原始问题的编辑中提到的索引对样式。

type idxPair struct {
    idx, val int
}

func main() {
    // add a done channel, an ability to stop the world by closing this.
    done := make(chan struct{})
    defer close(done)

    // create srcChan, this will be where the values go into the pipeline
    srcCh := make(chan idxPair)

    // create a slice of result channels, one for each of the go workers
    const numWorkers = 8
    resChans := make([]<-chan idxPair, numWorkers)

    // waitgroup to wait for all the workers to stop
    var wg sync.WaitGroup
    wg.Add(numWorkers)

    // start the workers, passing them each the src channel,
    // collecting the result channels they return
    for i := 0; i < numWorkers; i++ {
        resChans[i]  = worker(done, &wg, srcCh)
    }

    // start a single goroutine to send values into the pipeline
    // all values are sent with an index, to be pieces back into order at the end.
    go func() {
        defer close(srcCh)
        for i := 1; i < 100; i++ {
            srcCh <- idxPair{idx: i, val: i}
        }
    }()

    // merge all the results channels into a single results channel
    // this channel is unordered.
    mergedCh := merge(done, resChans...)

    // order the values coming from the mergedCh according the the idxPair.idx field.
    orderedResults := order(100, mergedCh)

    // iterate over each of the ordered results
    for _, v := range orderedResults {
        fmt.Println(v)
    }
}

func order(len int, res <-chan idxPair) []int {
    results := make([]int, len)

    // collect all the values to order them
    for r := range res {
        results[r.idx] = r.val
    }

    return results
}

func worker(done <- chan struct{}, wg *sync.WaitGroup, src <-chan idxPair) <-chan idxPair {
    res := make(chan idxPair)

    go func() {
        defer wg.Done()
        defer close(res)
        sendValue := func(pair idxPair) {
            v := pair.val
            v *= v
            ip := idxPair{idx: pair.idx, val: v}
            select {
            case res <- ip:
            case <-done:
            }
        }

        for v := range src{
             sendValue(v)
        }
    }()

    return res
}


// example and explanation here: https://blog.golang.org/pipelines
func merge(done <-chan struct{}, cs ...<-chan idxPair) <-chan idxPair {
    var wg sync.WaitGroup
    out := make(chan idxPair)

    output := func(c <-chan idxPair) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

我认为这稍微干净一些,而不仅仅是“为了它而不同”的原因是:

  1. 您可以独立建模和实现每个阶段。 order 阶段可以轻松优化,以便在收到值时通过 channel 发送值等。
  2. 它的组合性更强;您可以对元素执行异步工作,并将排序留给其他人负责,而不是对存储在数组中的多个 channel 进行操作的大型方法。这促进了重用。

关于go - 序列化 goroutines(并行化但保证顺序),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51157337/

相关文章:

go:哪个 channel 端点在双向时必须关闭 channel ?

go - 在不同包中使用路径的便捷方式

string - 如何在 Golang 中解决(字符串和字节类型不匹配)

docker - 无法使用docker镜像在gitlab-ci中编译golang项目

Go去除重复代码的解决方案(defer, net/http)

asynchronous - fortran openmp 同步

python - Python 中的多处理 : execute two functions at the exact same time

linux - Shell脚本,并行和多线程?

python - 故意在python中制作一个孤儿进程

javascript - 了解 Node.JS async.parallel