scala - Play Framework Scala : How to Stream Request Body

标签 scala playframework streaming

我正在使用Scala使用Play Framework 2.3.x构建微服务(我都是这两者的初学者),但我想不出一种方法来流式处理我的请求正文。

这是问题所在:

我需要一个端点/transform,可以在其中接收一个巨大的TSV文件,该文件将以另一种格式进行解析和呈现:简单转换。问题是 Controller 中的每个命令都“太迟了”。启动代码之前,它等待接收完整的文件。

例子:

  def transform = Action.async {
    Future {
      Logger.info("Too late")
      Ok("A response")
    }
  }

我希望能够在上载过程中逐行读取请求正文并已经处理该请求,而不必等待文件被完全接收。

任何提示都将受到欢迎。

最佳答案

此答案适用于Play 2.5.x及更高版本,因为它使用Akka流API取代了该版本中基于Play基于Iteratee的流。

基本上,您可以创建一个正文分析器,该分析器返回可以传递给Source[T]Ok.chunked(...)。一种方法是在主体解析器中使用Accumulator.source[T]。例如,一个刚刚返回原样发送给它的数据的 Action 可能看起来像这样:

def verbatimBodyParser: BodyParser[Source[ByteString, _]] = BodyParser { _ =>
  // Return the source directly. We need to return
  // an Accumulator[Either[Result, T]], so if we were
  // handling any errors we could map to something like
  // a Left(BadRequest("error")). Since we're not
  // we just wrap the source in a Right(...)
  Accumulator.source[ByteString]
    .map(Right.apply)
}

def stream = Action(verbatimBodyParser) { implicit request =>
  Ok.chunked(request.body)
}

如果您想要执行类似转换TSV文件的操作,则可以使用Flow转换源,例如:
val tsvToCsv: BodyParser[Source[ByteString, _]] = BodyParser { req =>

  val transformFlow: Flow[ByteString, ByteString, NotUsed] = Flow[ByteString]
    // Chunk incoming bytes by newlines, truncating them if the lines
    // are longer than 1000 bytes...
    .via(Framing.delimiter(ByteString("\n"), 1000, allowTruncation = true))
    // Replace tabs by commas. This is just a silly example and
    // you could obviously do something more clever here...
    .map(s => ByteString(s.utf8String.split('\t').mkString(",") + "\n"))

  Accumulator.source[ByteString]
    .map(_.via(transformFlow))
    .map(Right.apply)
}

def convert = Action(tsvToCsv) { implicit request =>
  Ok.chunked(request.body).as("text/csv")
}

Play文档的Directing the Body Elsewhere部分可能会提供更多启发。

关于scala - Play Framework Scala : How to Stream Request Body,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38269853/

相关文章:

c# - 如何在 C#/.NET 中捕获二进制进程输出

scala - 我如何知道 Scala 中代码的运行时间?

Scala Stackable Trait 和 Self Type 不兼容类型

module - 如何安装 Play !框架模块?

playframework - 从 SBT 二进制分发版中排除 Elm 源文件

playframework - 从 Play 2.1 提供视频文件

python - 从任务中调用 Java/Scala 函数

scala - !不是 String 的成员

scala - 玩!框架数据库脚本应用程序错误

ffmpeg - 如何在不消耗所有 CPU 的情况下从多台摄像机录制