multithreading - Go 是否可以在不为每个外部进程启动一个 OS 线程的情况下生成并与外部进程通信?

标签 multithreading unix go concurrency elixir

短版:

Golang 是否可以并行生成多个外部进程(shell 命令),这样它就不会为每个外部进程启动一个操作系统线程。 .. 完成后仍然能够接收到它的输出吗?

加长版:

在 Elixir 中,如果您使用端口,则可以生成数千个外部进程,而不会真正增加 Erlang 虚拟机中的线程数。

例如以下代码片段启动了 2500 个外部 sleep 进程,仅由 Erlang VM 下的 20 个 操作系统线程 管理:

defmodule Exmultiproc do
  for _ <- 1..2500 do
    cmd = "sleep 3600"
    IO.puts "Starting another process ..."
    Port.open({:spawn, cmd}, [:exit_status, :stderr_to_stdout])
  end
  System.cmd("sleep", ["3600"])
end

(前提是你将 ulimit -n 设置为较大的数字,例如 10000)

另一方面,Go 中的以下代码应该做同样的事情 - 启动 2500 个外部 sleep 进程 - 也启动 2500 个 操作系统线程 .因此,它显然会为每个(阻塞?)系统调用启动一个操作系统线程(以免阻塞整个 CPU 或类似的,如果我理解正确的话):

package main

import (
    "fmt"
    "os/exec"
    "sync"
)

func main() {
    wg := new(sync.WaitGroup)
    for i := 0; i < 2500; i++ {
        wg.Add(1)
        go func(i int) {
            fmt.Println("Starting sleep ", i, "...")
            cmd := exec.Command("sleep", "3600")
            _, err := cmd.Output()
            if err != nil {
                panic(err)
            }
            fmt.Println("Finishing sleep ", i, "...")
            wg.Done()
        }(i)
    }
    fmt.Println("Waiting for WaitGroup ...")
    wg.Wait()
    fmt.Println("WaitGroup finished!")
}

因此,我想知道是否有一种方法可以编写 Go 代码,使其执行与 Elixir 代码类似的事情,而不是为每个外部进程打开一个操作系统线程

我基本上是在寻找一种方法来管理至少几千个外部长期运行(最多 10 天)的进程,以尽可能减少操作系统中任何虚拟或物理限制的问题.

(对于代码中的任何错误,我深表歉意,因为我是 Elixir 和 Go 的新手。我很想知道我正在做的任何错误。)

编辑:澄清了并行运行长时间运行的进程的要求。

最佳答案

我发现如果我们不wait进程,Go运行时不会启动2500个操作系统线程。所以请使用 cmd.Start() 而不是 cmd.Output()。

但是,如果不使用 golang os 包消耗操作系统线程,似乎不可能读取进程的 stdout。我认为是因为 os 包没有使用非阻塞 io 来读取管道。

下面的程序在我的 Linux 上运行良好,虽然它像 @JimB 在评论中所说的那样阻塞了进程的标准输出,可能是因为我们的输出很小并且它适合系统缓冲区。

func main() {
    concurrentProcessCount := 50
    wtChan := make(chan *result, concurrentProcessCount)
    for i := 0; i < concurrentProcessCount; i++ {
        go func(i int) {
            fmt.Println("Starting process ", i, "...")
            cmd := exec.Command("bash", "-c", "for i in 1 2 3 4 5; do echo to sleep $i seconds;sleep $i;echo done;done;")
            outPipe,_ := cmd.StdoutPipe()
            err := cmd.Start()
            if err != nil {
                panic(err)
            }
            <-time.Tick(time.Second)
            fmt.Println("Finishing process ", i, "...")
            wtChan <- &result{cmd.Process, outPipe}
        }(i)
    }

    fmt.Println("root:",os.Getpid());

    waitDone := 0
    forLoop:
    for{
        select{
        case r:=<-wtChan:
            r.p.Wait()
            waitDone++
            output := &bytes.Buffer{}
            io.Copy(output, r.b)
            fmt.Println(waitDone, output.String())
            if waitDone == concurrentProcessCount{
                break forLoop
            }
        }
    }
}

关于multithreading - Go 是否可以在不为每个外部进程启动一个 OS 线程的情况下生成并与外部进程通信?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33948726/

相关文章:

java - ConcurrentLinkedDeque 可以有固定大小并覆盖旧元素吗?

java - 创建一个发生在与 AtomicBoolean 的关系之前

c - 等待 execlp

xml - 将 innerxml 解码为字符串指针

for-loop - GO - for 循环中的子例程行为

multithreading - JavaFX任务一旦取消就不会重新运行,也不会一次完成

ios - 如何检查dispatch_async是否被提前调用以进行相同的操作

unix - crontab 设置 18 :00 to 09:00

C/UNIX 每 x 毫秒执行一次函数

loops - 遍历 map 的所有键