我正在尝试将基于 akka 流的流集成到我的 Play 2.5 应用程序中。这个想法是,您可以流式传输照片,然后将其作为原始文件、缩略图版本和水印版本写入磁盘。
我设法使用这样的图表来完成这项工作:
val byteAccumulator = Flow[ByteString].fold(new ByteStringBuilder())((builder, b) => {builder ++= b.toArray})
.map(_.result().toArray)
def toByteArray = Flow[ByteString].map(b => b.toArray)
val graph = Flow.fromGraph(GraphDSL.create() {implicit builder =>
import GraphDSL.Implicits._
val streamFan = builder.add(Broadcast[ByteString](3))
val byteArrayFan = builder.add(Broadcast[Array[Byte]](2))
val output = builder.add(Flow[ByteString].map(x => Success(Done)))
val rawFileSink = FileIO.toFile(file)
val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail))
val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked))
streamFan.out(0) ~> rawFileSink
streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in
streamFan.out(2) ~> output.in
byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink
byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink
FlowShape(streamFan.in, output.out)
})
graph
}
然后我使用这样的累加器将它连接到我的播放 Controller :
val sink = Sink.head[Try[Done]]
val photoStorageParser = BodyParser { req =>
Accumulator(sink).through(graph).map(Right.apply)
}
问题是我的两个处理过的文件接收器没有完成,两个处理过的文件的大小都为零,但原始文件的大小没有。我的理论是累加器只等待我的扇出的输出之一,所以当输入流完成并且我的 byteAccumulator 吐出完整的文件时,在处理完成时播放已经从输出中获得了物化值.
所以,我的问题是:
就我的方法而言,我是否在正确的轨道上?
运行这样的图形的预期行为是什么?
我怎样才能把所有的水槽放在一起形成一个最终的水槽?
最佳答案
好的,经过一点帮助(安德烈亚斯走在正确的轨道上),我找到了这个解决方案,它可以解决问题:
val rawFileSink = FileIO.toFile(file)
val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail))
val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked))
val graph = Sink.fromGraph(GraphDSL.create(rawFileSink, thumbnailFileSink, watermarkedFileSink)((_, _, _)) {
implicit builder => (rawSink, thumbSink, waterSink) => {
val streamFan = builder.add(Broadcast[ByteString](2))
val byteArrayFan = builder.add(Broadcast[Array[Byte]](2))
streamFan.out(0) ~> rawSink
streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in
byteArrayFan.out(0) ~> processorFlow(Thumbnail) ~> thumbSink
byteArrayFan.out(1) ~> processorFlow(Watermarked) ~> waterSink
SinkShape(streamFan.in)
}
})
graph.mapMaterializedValue[Future[Try[Done]]](fs => Future.sequence(Seq(fs._1, fs._2, fs._3)).map(f => Success(Done)))
之后很容易从 Play 调用它:
val photoStorageParser = BodyParser { req =>
Accumulator(theSink).map(Right.apply)
}
def createImage(path: String) = Action(photoStorageParser) { req =>
Created
}
关于scala - 如何从多个文件写入组装 Akka Streams 接收器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37132272/