go - 如何使并行泛型去处理?

标签 go goroutine

我有以下功能:

func myrun(entries []WhatEverType) {
    for i := range entries {
        dotreatment(entries[i])
    } 
}

我想对 dotreatment 进行并行调用,我尝试了以下方法:

    func myrunMT(entries []WhatEverType) {
        var wg sync.WaitGroup
        stopped := false
        threads := 5 //number of threads could be argument
        com := make(chan WhatEverType, 100) //size of chan could be argument
        wg.Add(threads)
        for i := 0; i < threads; i++ {
            go func() {
                for !stopped || len(com) {
                    select {
                        case entry := <-com:
                            dotreatment(entry) //lock if necessary
                        case time.After(100*time.Millisecond):
                    }
                }
                wg.Done()
            }()
        }
        for _, entry := range entries {
            com <- entry
        }
        stopped = true
        wg.Wait()
    }

有没有更好的方法呢?特别是我想避免通过 chan 发送所有条目,并且只在 go 例程之间使用共享索引。

最佳答案

首先,您的解决方案存在数据竞争。您正在从多个 goroutine 中读取和修改 stopped 变量。

一个简单的解决方案可能是划分传递的 slice 的索引范围,并让多个 goroutine 处理不同的索引范围。这就是它的样子:

func process(ms []My) {
    workers := 5
    count := len(ms) / workers
    if count*workers < len(ms) {
        count++
    }

    wg := &sync.WaitGroup{}
    for idx := 0; idx < len(ms); {
        wg.Add(1)
        idx2 := idx + count
        if idx2 > len(ms) {
            idx2 = len(ms)
        }
        ms2 := ms[idx:idx2]
        idx = idx2
        go func() {
            defer wg.Done()
            for i := range ms2 {
                handle(&ms2[i])
            }
        }()
    }
    wg.Wait()
}

func handle(m *My) {}

对于 worker goroutines 的数量你可以使用 runtime.GOMAXPROCS(),就好像处理条目不涉及 IO 操作(或等待 goroutine 之外的东西)一样,不需要Go 运行时管理的 goroutines 比那些可以主动运行的 goroutines 多:

workers := runtime.GOMAXPROCS(0)

请注意,尽管此解决方案不涉及通过 channel 发送条目,但如果一个(某些)goroutine 较早完成,则 CPU 利用率可能会在最后下降(当较少的 goroutine 有工作要做时)。

生产者-消费者模型的优点是所有的 worker goroutine 将平等地工作直到最后。但是,是的,通信开销可能不可忽略。一个是否比另一个更好取决于每个条目需要完成的工作量。

一个改进的版本可以混合 2:你可以通过 channel 发送更小的 slice ,更小的索引范围,例如每批 100 个条目。与第一种解决方案相比,这可以减少空闲时间,还可以减少通信开销,因为条目是通过 channel 单独发送的,因此发送的值仅为总数的百分之一。

这是这个改进的混合版本的示例实现:

func process(ms []My) {
    workers := runtime.GOMAXPROCS(0)
    // 100 jobs per worker average:
    count := len(ms) / workers / 100
    if count < 1 {
        count = 1
    }

    ch := make(chan []My, workers*2) // Buffer size scales with # of workers

    wg := &sync.WaitGroup{}

    // Start workers
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer wg.Done()
            for ms2 := range ch {
                for j := range ms2 {
                    handle(&ms2[j])
                }
            }
        }()
    }

    // Send jobs:
    for idx := 0; idx < len(ms); {
        idx2 := idx + count
        if idx2 > len(ms) {
            idx2 = len(ms)
        }
        ch <- ms[idx:idx2]
        idx = idx2
    }

    // Jobs sent, close channel:
    close(ch)

    // Wait workers to finish processing all jobs:
    wg.Wait()
}

请注意,没有 stopping 变量来表示完成。相反,我们在每个 goroutine 的 channel 上使用了 for range,因为它在 channel 上进行范围直到 channel 关闭,并且它对于并发使用是安全的。一旦 channel 关闭并且 goroutines 处理了 channel 上发送的所有作业,它们就会终止,整个处理算法也会终止(而不是更早 - 这意味着所有作业都将被处理)。

关于go - 如何使并行泛型去处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41802835/

相关文章:

go - 在 golang 中,我的 go 例程使用了所有内核,但每个内核只使用了 50% 到 75%

mongodb - 无法将类型 bson.Raw 转换为 BSON 文档 : length read exceeds number of bytes available

go - len 方法未定义

go - 如何使用 Golang 开发 Google Analytics Reporting API v4

multithreading - 带有 goroutines 的 Golang 守护进程不会停止执行

go - 为什么我的代码会导致停顿或竞争条件?

go - Tcp连接监听器中延迟的目的是什么

go - 为什么 make([]int, 14) 中有 runtime.morestack 而 make([]int, 13) 中没有?

Goroutine 在 Windows 和 Linux 上的行为不同

http - panic : runtime error: invalid memory address or nil pointer dereference with bigger data