我有以下功能:
func myrun(entries []WhatEverType) {
for i := range entries {
dotreatment(entries[i])
}
}
我想对 dotreatment 进行并行调用,我尝试了以下方法:
func myrunMT(entries []WhatEverType) {
var wg sync.WaitGroup
stopped := false
threads := 5 //number of threads could be argument
com := make(chan WhatEverType, 100) //size of chan could be argument
wg.Add(threads)
for i := 0; i < threads; i++ {
go func() {
for !stopped || len(com) {
select {
case entry := <-com:
dotreatment(entry) //lock if necessary
case time.After(100*time.Millisecond):
}
}
wg.Done()
}()
}
for _, entry := range entries {
com <- entry
}
stopped = true
wg.Wait()
}
有没有更好的方法呢?特别是我想避免通过 chan 发送所有条目,并且只在 go 例程之间使用共享索引。
最佳答案
首先,您的解决方案存在数据竞争。您正在从多个 goroutine 中读取和修改 stopped
变量。
一个简单的解决方案可能是划分传递的 slice 的索引范围,并让多个 goroutine 处理不同的索引范围。这就是它的样子:
func process(ms []My) {
workers := 5
count := len(ms) / workers
if count*workers < len(ms) {
count++
}
wg := &sync.WaitGroup{}
for idx := 0; idx < len(ms); {
wg.Add(1)
idx2 := idx + count
if idx2 > len(ms) {
idx2 = len(ms)
}
ms2 := ms[idx:idx2]
idx = idx2
go func() {
defer wg.Done()
for i := range ms2 {
handle(&ms2[i])
}
}()
}
wg.Wait()
}
func handle(m *My) {}
对于 worker goroutines 的数量你可以使用 runtime.GOMAXPROCS()
,就好像处理条目不涉及 IO 操作(或等待 goroutine 之外的东西)一样,不需要Go 运行时管理的 goroutines 比那些可以主动运行的 goroutines 多:
workers := runtime.GOMAXPROCS(0)
请注意,尽管此解决方案不涉及通过 channel 发送条目,但如果一个(某些)goroutine 较早完成,则 CPU 利用率可能会在最后下降(当较少的 goroutine 有工作要做时)。
生产者-消费者模型的优点是所有的 worker goroutine 将平等地工作直到最后。但是,是的,通信开销可能不可忽略。一个是否比另一个更好取决于每个条目需要完成的工作量。
一个改进的版本可以混合 2:你可以通过 channel 发送更小的 slice ,更小的索引范围,例如每批 100 个条目。与第一种解决方案相比,这可以减少空闲时间,还可以减少通信开销,因为条目是通过 channel 单独发送的,因此发送的值仅为总数的百分之一。
这是这个改进的混合版本的示例实现:
func process(ms []My) {
workers := runtime.GOMAXPROCS(0)
// 100 jobs per worker average:
count := len(ms) / workers / 100
if count < 1 {
count = 1
}
ch := make(chan []My, workers*2) // Buffer size scales with # of workers
wg := &sync.WaitGroup{}
// Start workers
wg.Add(workers)
for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
for ms2 := range ch {
for j := range ms2 {
handle(&ms2[j])
}
}
}()
}
// Send jobs:
for idx := 0; idx < len(ms); {
idx2 := idx + count
if idx2 > len(ms) {
idx2 = len(ms)
}
ch <- ms[idx:idx2]
idx = idx2
}
// Jobs sent, close channel:
close(ch)
// Wait workers to finish processing all jobs:
wg.Wait()
}
请注意,没有 stopping
变量来表示完成。相反,我们在每个 goroutine 的 channel 上使用了 for range
,因为它在 channel 上进行范围直到 channel 关闭,并且它对于并发使用是安全的。一旦 channel 关闭并且 goroutines 处理了 channel 上发送的所有作业,它们就会终止,整个处理算法也会终止(而不是更早 - 这意味着所有作业都将被处理)。
关于go - 如何使并行泛型去处理?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41802835/