multithreading - 你会如何定义一个 goroutines 池来一次执行?

标签 multithreading go goroutine

TL;DR:请转到最后一部分并告诉我您将如何解决此问题。
今天早上我开始使用来自 Python 的 Go。我想从 Go 调用一个闭源可执行文件多次,有一点并发,有不同的命令行参数。我生成的代码运行良好,但我想得到您的意见以改进它。由于我处于早期学习阶段,我还将解释我的工作流程。
为了简单起见,这里假设这个“外部闭源程序”是zenity ,一个 Linux 命令行工具,可以从命令行显示图形消息框。
从 Go 调用可执行文件
所以,在 Go 中,我会这样:

package main
import "os/exec"
func main() {
    cmd := exec.Command("zenity", "--info", "--text='Hello World'")
    cmd.Run()
}
这应该工作得恰到好处。请注意 .Run()是功能等价于 .Start()其次是 .Wait() .这很好,但是如果我只想执行一次这个程序,那么整个编程的东西就不值得了。所以让我们多次这样做。
多次调用可执行文件
现在我有了这个工作,我想多次调用我的程序,使用自定义命令行参数(为了简单起见,这里只是 i)。
package main    
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 // Number of times the external program is called
    for i:=0; i<NumEl; i++ {
        cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
        cmd.Run()
    }
}
好的,我们做到了!但是还是看不出Go over Python的优势……这段代码其实是串行执行的。我有一个多核 CPU,我想利用它。所以让我们用 goroutines 添加一些并发性。
Goroutines,或一种使我的程序并行的方法
a) 第一次尝试:只需在任何地方添加“go”
让我们重写我们的代码,使事情更容易调用和重用,并添加著名的 go关键词:
package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8 
    for i:=0; i<NumEl; i++ {
        go callProg(i)  // <--- There!
    }
}

func callProg(i int) {
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}
没有!问题是什么?所有的 goroutine 都被一次性执行。我真的不知道为什么不执行 zenity 但 AFAIK,Go 程序在 zenity 外部程序甚至可以初始化之前就退出了。使用 time.Sleep 证实了这一点。 :WAITING几秒钟就足以让 zenity 的 8 实例自行启动。我不知道这是否可以被视为一个错误。
更糟糕的是,我真正想要调用的真正程序需要一段时间才能执行。如果我在我的 4 核 CPU 上并行执行这个程序的 8 个实例,它会浪费一些时间做很多上下文切换……我不知道普通 Go 协程的行为,但 exec.Command 在 8 个不同的线程中启动 zenity 8 次。更糟糕的是,我想执行这个程序超过 100,000 次。在 goroutine 中一次完成所有这些工作根本没有效率。不过,我想利用我的 4 核 CPU!
b) 第二次尝试:使用 goroutines 池
网上资源倾向于推荐使用sync.WaitGroup对于这种工作。这种方法的问题在于,您基本上是在处理成批的 goroutine:如果我创建了 4 个成员的 WaitGroup,Go 程序将等待所有 4 个外部程序完成,然后再调用新的 4 个程序批次。这效率不高:CPU 又一次被浪费了。
其他一些资源建议使用缓冲 channel 来完成这项工作:
package main
import (
    "os/exec"
    "strconv"
)

func main() {
    NumEl := 8               // Number of times the external program is called
    NumCore := 4             // Number of available cores
    c := make(chan bool, NumCore - 1) 
    for i:=0; i<NumEl; i++ {
        go callProg(i, c)
        c <- true            // At the NumCoreth iteration, c is blocking   
    }
}

func callProg(i int, c chan bool) {
    defer func () {<- c}()
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}
这看起来很丑。 channel 并非用于此目的:我正在利用副作用。我喜欢 defer 的概念但我讨厌必须声明一个函数(甚至是一个 lambda)来从我创建的虚拟 channel 中弹出一个值。哦,当然,使用虚拟 channel 本身就是丑陋的。
c) 第三次尝试:当所有 child 都死了就死
现在我们快完成了。我只需要考虑另一个副作用:Go 程序在所有 zenity 弹出窗口关闭之前关闭。这是因为当循环结束时(在第 8 次迭代时),没有什么可以阻止程序完成。这一次,sync.WaitGroup会很有用。
package main
import (
    "os/exec"
    "strconv"
    "sync"
)

func main() {
    NumEl := 8               // Number of times the external program is called
    NumCore := 4             // Number of available cores
    c := make(chan bool, NumCore - 1) 
    wg := new(sync.WaitGroup)
    wg.Add(NumEl)            // Set the number of goroutines to (0 + NumEl)
    for i:=0; i<NumEl; i++ {
        go callProg(i, c, wg)
        c <- true            // At the NumCoreth iteration, c is blocking   
    }
    wg.Wait() // Wait for all the children to die
    close(c)
}

func callProg(i int, c chan bool, wg *sync.WaitGroup) {
    defer func () {
        <- c
        wg.Done() // Decrease the number of alive goroutines
    }()
    cmd := exec.Command("zenity", "--info", "--text='Hello from iteration n." + strconv.Itoa(i) + "'")
    cmd.Run()
}
完毕。
我的问题
  • 你知道任何其他适当的方法来限制一次执行的 goroutines 的数量吗?

  • 我不是指线程; Go 如何在内部管理 goroutines 并不相关。我的意思是限制一次启动的 goroutine 的数量:exec.Command每次调用都会创建一个新线程,所以我应该控制它被调用的次数。
  • 你觉得这段代码好吗?
  • 您知道在这种情况下如何避免使用虚拟 channel 吗?

  • 我无法说服自己这样的虚拟 channel 是要走的路。

    最佳答案

    我会生成 4 个从公共(public) channel 读取任务的工作程序 goroutine。比其他 Goroutine 更快(因为它们的调度方式不同或碰巧获得简单任务)将从该 channel 接收到的任务比其他 Go 多。除此之外,我会使用 sync.WaitGroup等待所有 worker 完成。剩下的部分只是任务的创建。您可以在此处查看该方法的示例实现:

    package main
    
    import (
        "os/exec"
        "strconv"
        "sync"
    )
    
    func main() {
        tasks := make(chan *exec.Cmd, 64)
    
        // spawn four worker goroutines
        var wg sync.WaitGroup
        for i := 0; i < 4; i++ {
            wg.Add(1)
            go func() {
                for cmd := range tasks {
                    cmd.Run()
                }
                wg.Done()
            }()
        }
    
        // generate some tasks
        for i := 0; i < 10; i++ {
            tasks <- exec.Command("zenity", "--info", "--text='Hello from iteration n."+strconv.Itoa(i)+"'")
        }
        close(tasks)
    
        // wait for the workers to finish
        wg.Wait()
    }
    

    可能还有其他可能的方法,但我认为这是一个非常干净的解决方案,很容易理解。

    关于multithreading - 你会如何定义一个 goroutines 池来一次执行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/18405023/

    相关文章:

    go - 带有sync.waitGroup的Goroutine每次输出不同的值

    c# - 如何从另一个线程更新主线程中的文本框?

    c# - 等待一个线程在循环中存活是好习惯吗?

    c - 线程池加入函数

    performance - 使用 Docker 对简单的 Go 服务器造成巨大的性能影响

    go - 通过代理发出请求时为 "EOF"

    asynchronous - 戈朗 : Why does increasing the size of a buffered channel eliminate output from my goroutines?

    java - Reader Writers prob using Semaphores in Java-错误的线程名称,无法并发运行并显示并发

    go - 循环指针

    go - 菊花链输入,输出 channel 一起在golang中