我正在使用Scala使用Play Framework 2.3.x构建微服务(我都是这两者的初学者),但我想不出一种方法来流式处理我的请求正文。
这是问题所在:
我需要一个端点/transform
,可以在其中接收一个巨大的TSV文件,该文件将以另一种格式进行解析和呈现:简单转换。问题是 Controller 中的每个命令都“太迟了”。启动代码之前,它等待接收完整的文件。
例子:
def transform = Action.async {
Future {
Logger.info("Too late")
Ok("A response")
}
}
我希望能够在上载过程中逐行读取请求正文并已经处理该请求,而不必等待文件被完全接收。
任何提示都将受到欢迎。
最佳答案
此答案适用于Play 2.5.x及更高版本,因为它使用Akka流API取代了该版本中基于Play基于Iteratee的流。
基本上,您可以创建一个正文分析器,该分析器返回可以传递给Source[T]
的Ok.chunked(...)
。一种方法是在主体解析器中使用Accumulator.source[T]
。例如,一个刚刚返回原样发送给它的数据的 Action 可能看起来像这样:
def verbatimBodyParser: BodyParser[Source[ByteString, _]] = BodyParser { _ =>
// Return the source directly. We need to return
// an Accumulator[Either[Result, T]], so if we were
// handling any errors we could map to something like
// a Left(BadRequest("error")). Since we're not
// we just wrap the source in a Right(...)
Accumulator.source[ByteString]
.map(Right.apply)
}
def stream = Action(verbatimBodyParser) { implicit request =>
Ok.chunked(request.body)
}
如果您想要执行类似转换TSV文件的操作,则可以使用
Flow
转换源,例如:val tsvToCsv: BodyParser[Source[ByteString, _]] = BodyParser { req =>
val transformFlow: Flow[ByteString, ByteString, NotUsed] = Flow[ByteString]
// Chunk incoming bytes by newlines, truncating them if the lines
// are longer than 1000 bytes...
.via(Framing.delimiter(ByteString("\n"), 1000, allowTruncation = true))
// Replace tabs by commas. This is just a silly example and
// you could obviously do something more clever here...
.map(s => ByteString(s.utf8String.split('\t').mkString(",") + "\n"))
Accumulator.source[ByteString]
.map(_.via(transformFlow))
.map(Right.apply)
}
def convert = Action(tsvToCsv) { implicit request =>
Ok.chunked(request.body).as("text/csv")
}
Play文档的Directing the Body Elsewhere部分可能会提供更多启发。
关于scala - Play Framework Scala : How to Stream Request Body,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38269853/