go - 从管道读取的非阻塞方式

标签 go

我想创建一个简单的应用程序,它将连续读取一个应用程序的输出,对其进行处理并将处理后的输出写入标准输出。这个应用程序可以在一秒钟内产生大量数据,接下来会静默几分钟。

问题是我的数据处理算法很慢,所以主循环被阻塞了。当循环被阻塞时,我正在丢失此时到来的数据。

    cmd := exec.Command("someapp")
    stdoutPipe, _ := cmd.StdoutPipe()
    stdoutReader := bufio.NewReader(stdoutPipe)

    go func() {
        bufioReader := bufio.NewReader(stdoutReader)
        for {
            output, _, err := bufioReader.ReadLine()
            if err != nil || err == io.EOF {
                break
            }

            processedOutput := dataProcessor(output);

            fmt.Print(processedOutput)
        }
    }()

可能解决这个问题的最好方法是缓冲所有输出并在另一个 Goroutine 中处理它,但我不确定如何在 Golang 中实现它。解决这个问题最惯用的方法是什么?

最佳答案

您可以有两个 goroutine,一个是 vendor ,另一个是消费者。 vendor 执行命令并通过 channel 将数据传递给消费者。

    cmd := exec.Command("someapp")
    stdoutPipe, _ := cmd.StdoutPipe()
    stdoutReader := bufio.NewReader(stdoutPipe)
    Datas := make(chan Data, 100)
    go dataProcessor(Datas)
        bufioReader := bufio.NewReader(stdoutReader)
        for {
            output, _, err := bufioReader.ReadLine()
            var tempData Data
            tempData.Out = output
            if err != nil || err == io.EOF {
                break
            }
            Datas <- tempData
            }
    }

然后您将在 dataProcessor 函数中处理数据:

func dataProcessor(Datas <-chan Data)  {
    for  {
        select {
        case data := <-Datas:
        fmt.Println(data)
        default:
            continue
        }
    }
}

显然这是一个非常简单的示例,您应该对其进行自定义并使其变得更好。搜索 chanle 和 goroutins。读书this教程可能会有所帮助。

关于go - 从管道读取的非阻塞方式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56507564/

相关文章:

go - 如何在 Go 中自动生成 Avro 模式?

go - 打印机接收器程序中的 "all goroutines are asleep - deadlock! Exit status 2"错误

go - 如何在 Windows 上编译 Go 包?

go - 是否有可能知道在 HandleFunc 中使用了什么路由

mysql - 如何使用mysql通过选择查询获取行数

Mongodb不会使用游标检索具有200万条记录的集合中的所有文档

string - 从阅读器读取直到到达一个字符串

go - golang channel 中的函数调用

mysql - beego:如何创建一个上传表单来存储文件到MySQL数据库?

go - 如何在 Goji (Golang) 中使用不同的中间件创建单独的路由组?