go - 如何停止被进程启动的外部 I/O 阻塞的 goroutine?

标签 go pipe exec goroutine

我遇到了一个问题,我无法安全退出 goroutine。

我正在使用 exec.Command 创建一个外部进程(存储进程的 cmd、stdin 管道和 stdout 管道):

exec.Command(args[0], args[1]...) // args[0] is a base command

每当需要启动该流程时,我都会调用:

cmd.Start()

然后在启动并附加时我正在运行 2 个 goroutine:

shutdown := make(chan struct{})
// Run the routine which will read from process and send the data to CmdIn channel
go pr.cmdInRoutine()
// Run the routine which will read from CmdOut and write to process
go pr.cmdOutRoutine()

cmdIn例程:

func (pr *ExternalProcess) cmdInRoutine() {
    app.At(te, "cmdInRoutine")

    for {
        println("CMDINROUTINE")
        select {
        case <-pr.shutdown:
            println("!!! Shutting cmdInRoutine down !!!")
            return
        default:
            println("Inside the for loop of the CmdInRoutine")
            if pr.stdOutPipe == nil {
                println("!!! Standard output pipe is nil. Sending Exit Request !!!")
                pr.ProcessExit <- true
                close(pr.shutdown)
                return
            }

            buf := make([]byte, 2048)

            size, err := pr.stdOutPipe.Read(buf)
            if err != nil {
                println("!!! Sending exit request from cmdInRoutine !!!")
                pr.ProcessExit <- true
                close(pr.shutdown)
                return
            }

            println("--- Received data for sending to CmdIn:", string(buf[:size]))
            pr.CmdIn <- buf[:size]
        }

    }
}

cmdOut例程:

func (pr *ExternalProcess) cmdOutRoutine() {
    app.At(te, "cmdOutRoutine")

    for {
        select {
        case <-pr.shutdown:
            println("!!! Shutting cmdOutRoutine down !!!")
            return
        case data := <-pr.CmdOut:
            println("Received data for sending to Process: ", data)
            if pr.stdInPipe == nil {
                println("!!! Standard input pipe is nil. Sending Exit Request !!!")
                pr.ProcessExit <- true
                return
            }

            println("--- Received input to write to external process:", string(data))
            _, err := pr.stdInPipe.Write(append(data, '\n'))
            if err != nil {
                println("!!! Couldn't Write To the std in pipe of the process !!!")
                pr.ProcessExit <- true
                return
            }
        }
    }
}

这里有趣的案例:

1) 当进程发送 EOF 时(不要介意 pr.ProcessExit <- true 我正在使用 channel 通知父处理程序来停止并退出进程)在 cmdInRoutine 中 我也关闭了关闭 channel ,这让 cmdOutRoutine 退出,因为在 select 语句内没有默认情况,因此它会阻塞并等待退出或写入数据使用存储的stdInPipe运行进程。

2)当我想停止 goroutine 但让进程保持运行时,即暂停读写,我将关闭关闭 channel ,希望这 2 个 goroutine 能够结束。
- cmdOutRoutine 打印 !!!关闭 cmdOutRoutine!!! 因为 select 没有默认情况并且关闭关闭 channel 会导致几乎立即返回
- cmdOutRoutine 不打印任何内容,我有一种奇怪的感觉,它甚至没有返回,我认为是因为在从 stdInPipe 读取时在默认情况下被阻止.

我正在考虑在 for 循环之前在 cmdOutRoutine 内运行另一个 goroutine,并将进程的 stdIn 数据转换为 channel ,然后我就能够消除默认值cmdInRoutine 中的情况,但这会产生另一个问题,新的 goroutine 也必须停止,它仍然会被正在运行的进程的 stdIn 的读取所阻止。

有什么想法可以解决这个问题(修改逻辑)以满足随时关闭和启动 goroutine (进程 I/O)而不是运行进程本身的需求吗?或者有没有一种方法可以完全避免阻止读取和写入的调用,而我还不知道?

非常感谢。

最佳答案

它可能在 pr.stdOutPipe.Read(buf) 处被阻止。您可以尝试关闭 pr.stdOutPipe ,这应该会中断读取。

您还可以关闭 pr.stdInPipe ,以确保写入不会阻塞。

编辑:这不会允许您重新连接,但没有其他方法可以中断该读取。最好保持这两个 goroutine 在整个进程中运行,并在堆栈中的其他位置暂停(例如,如果您不想在暂停状态下接收命令的输出,请不要将 buf 写入 pr.CmdIn - 但可以小心避免竞争条件)。

在当前版本的 go 中,Close 可能存在错误:issue 6817

编辑结束

另外,请小心 pr.CmdIn 。如果关闭 stdOutPipe 不会导致 Read 返回错误,则 cmdInRoutine 将尝试写入 channel 。如果没有从中读取任何内容,cmdInRoutine 将永远阻塞。我会将 pr.stdOutPipe.Read(buf) 从选择中移出,然后将 pr.CmdIn <- buf[:size] 作为另一种情况添加到选择中:

func (pr *ExternalProcess) cmdInRoutine() {
    app.At(te, "cmdInRoutine")

    // this check should probably only happen once.
    // if stdOutPipe can change during the loop,
    // then that's a race condition.
    if pr.stdOutPipe == nil {
        println("!!! Standard output pipe is nil. Sending Exit Request !!!")
        pr.ProcessExit <- true
        close(pr.shutdown)
        return
    }

    for {
        println("CMDINROUTINE")
        // we need to allocate a new buffer in each iteration,
        // because when we pass it through the channel,
        // we can no longer safely overwrite the data in it,
        // since the other goroutine might still be using it.
        buf := make([]byte, 2048)
        size, err := pr.stdOutPipe.Read(buf)
        if err != nil {
            println("!!! Sending exit request from cmdInRoutine !!!")
            // Be careful with this, if you also closed pr.shutdown when you closed stdOutPipe, then this is going to panic (closing a closed channel).
            pr.ProcessExit <- true
            close(pr.shutdown)
            return
        }

        // now that we have some data, try to send it,
        // unless we're done.
        select {
        case <-pr.shutdown:
            println("!!! Shutting cmdInRoutine down !!!")
            return
        case pr.CmdIn <- buf[:size]:
            println("--- Received data for sending to CmdIn:", string(buf[:size]))
        }
    }
}

关于go - 如何停止被进程启动的外部 I/O 阻塞的 goroutine?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43436751/

相关文章:

java - 是什么导致 Runtime.exec 出现 "system cannot find the file specified"? (关联/ftype)

c - pipeline() 将结构从子级发送到父级。 read() 中的象形文字;

go - 模拟外部库以进行单元测试

go - kubernetes Controller 的单元测试

C# 控制台通过管道接收输入

带管道的 c 程序执行 "ps aux | grep firefox | tee processes.txt"

io - 转到 channel 和 I/O

json - 是否可以有一个包含多个 JSON 标签的结构?

c - fork() 和 pipe() 的小问题

c++ - 使用 exec 在新进程中执行系统命令