scala - 将输入流式传输到 Scala 中的外部进程

标签 scala streaming

我有一个 Iterable[String],我想将其流式传输到外部进程并为输出返回一个 Iterable[String]。

我觉得这在编译时应该可以工作

import scala.sys.process._

object PipeUtils {
  implicit class IteratorStream(s: TraversableOnce[String]) {
    def pipe(cmd: String) = s.toStream.#>(cmd).lines
    def run(cmd: String) = s.toStream.#>(cmd).!
  }
}

但是,Scala 尝试执行 s 的内容而不是将它们传递给标准 in。谁能告诉我我做错了什么?

更新:

我认为我最初的问题是 s.toStream 被隐式转换为 ProcessBuilder 然后执行。这是不正确的,因为它是流程的输入。

我想出了以下解决方案。这感觉很老套而且是错误的,但它现在似乎有效。我不是把它写成一个答案,因为我觉得答案应该是一行而不是这个巨大的东西。

object PipeUtils {

  /**
   * This class feels wrong.  I think that for the pipe command it actually loads all of the output
   * into memory.  This could blow up the machine if used wrong, however, I cannot figure out how to get it to
   * work properly.  Hopefully http://stackoverflow.com/questions/28095469/stream-input-to-external-process-in-scala
   * will get some good responses.
   * @param s
   */
  implicit class IteratorStream(s: TraversableOnce[String]) {

    val in = (in: OutputStream) => {
      s.foreach(x => in.write((x + "\n").getBytes))
      in.close
    }

    def pipe(cmd: String) = {
      val output = ListBuffer[String]()
      val io = new ProcessIO(in,
      out => {Source.fromInputStream(out).getLines.foreach(output += _)},
      err => {Source.fromInputStream(err).getLines.foreach(println)})

      cmd.run(io).exitValue
      output.toIterable
    }

    def run(cmd: String) = {
      cmd.run(BasicIO.standard(in)).exitValue
    }
  }
}

编辑

这样做的动机来自于在 RDD 上使用 Spark 的 .pipe 函数。我想在我的本地代码中使用完全相同的功能。

最佳答案

假设 scala 2.11+,您应该按照@edi 的建议使用 lineStream。原因是当它可用时您会得到一个流式响应,而不是批处理响应。假设我有一个 shell 脚本 echo-sleep.sh:

#/usr/bin/env bash
# echo-sleep.sh
while read line; do echo $line; sleep 1; done

我们想使用如下代码从 scala 中调用它:

import scala.sys.process._
import scala.language.postfixOps
import java.io.ByteArrayInputStream

implicit class X(in: TraversableOnce[String]) {
  // Don't do the BAOS construction in real code.  Just for illustration.
  def pipe(cmd: String) = 
    cmd #< new ByteArrayInputStream(in.mkString("\n").getBytes) lineStream
}

然后如果我们像这样进行最后的调用:

1 to 10 map (_.toString) pipe "echo-sleep.sh" foreach println

序列中的数字每 1 秒出现在 STDOUT 上。如果像示例中那样缓冲并转换为 Iterable,您将失去这种响应能力。

关于scala - 将输入流式传输到 Scala 中的外部进程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28095469/

相关文章:

python - 如何在scala中实现Python的norm.expect

java - 用于匹配不同浮点格式的正则表达式

scala - 所有这些“要么”的问题是怎么回事?

scala - 案例类 .copy() 和大对象

scala - 在 Scala 中,如何解决 TraversableLike.toIterator 效率低下的中间流

reactjs - Shaka 播放器无法在谷歌浏览器上加载 HLS

WMA 音频文件的 iPhone 广播流

audio - 组播如何工作?

ffmpeg IP 摄像机无法录制

networking - 音乐流使用哪种协议(protocol)?