scala - 如何从多个文件写入组装 Akka Streams 接收器?

标签 scala akka akka-stream playframework-2.5

我正在尝试将基于 akka 流的流集成到我的 Play 2.5 应用程序中。这个想法是,您可以流式传输照片,然后将其作为原始文件、缩略图版本和水印版本写入磁盘。

我设法使用这样的图表来完成这项工作:

val byteAccumulator = Flow[ByteString].fold(new ByteStringBuilder())((builder, b) => {builder ++= b.toArray})
                                    .map(_.result().toArray)

def toByteArray = Flow[ByteString].map(b => b.toArray)

val graph = Flow.fromGraph(GraphDSL.create() {implicit builder =>
  import GraphDSL.Implicits._
  val streamFan = builder.add(Broadcast[ByteString](3))
  val byteArrayFan = builder.add(Broadcast[Array[Byte]](2))
  val output = builder.add(Flow[ByteString].map(x => Success(Done)))

  val rawFileSink = FileIO.toFile(file)
  val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail))
  val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked))

  streamFan.out(0) ~> rawFileSink
  streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in
  streamFan.out(2) ~> output.in

  byteArrayFan.out(0) ~> slowThumbnailProcessing ~> thumbnailFileSink
  byteArrayFan.out(1) ~> slowWatermarkProcessing ~> watermarkedFileSink

  FlowShape(streamFan.in, output.out)
})

graph

}

然后我使用这样的累加器将它连接到我的播放 Controller :
val sink = Sink.head[Try[Done]]

val photoStorageParser = BodyParser { req =>
     Accumulator(sink).through(graph).map(Right.apply)
}

问题是我的两个处理过的文件接收器没有完成,两个处理过的文件的大小都为零,但原始文件的大小没有。我的理论是累加器只等待我的扇出的输出之一,所以当输入流完成并且我的 byteAccumulator 吐出完整的文件时,在处理完成时播放已经从输出中获得了物化值.

所以,我的问题是:
就我的方法而言,我是否在正确的轨道上?
运行这样的图形的预期行为是什么?
我怎样才能把所有的水槽放在一起形成一个最终的水槽?

最佳答案

好的,经过一点帮助(安德烈亚斯走在正确的轨道上),我找到了这个解决方案,它可以解决问题:

val rawFileSink = FileIO.toFile(file)
val thumbnailFileSink = FileIO.toFile(getFile(path, Thumbnail))
val watermarkedFileSink = FileIO.toFile(getFile(path, Watermarked))

val graph = Sink.fromGraph(GraphDSL.create(rawFileSink, thumbnailFileSink, watermarkedFileSink)((_, _, _)) {
  implicit builder => (rawSink, thumbSink, waterSink) => {
    val streamFan = builder.add(Broadcast[ByteString](2))
    val byteArrayFan = builder.add(Broadcast[Array[Byte]](2))

    streamFan.out(0) ~> rawSink
    streamFan.out(1) ~> byteAccumulator ~> byteArrayFan.in

    byteArrayFan.out(0) ~> processorFlow(Thumbnail) ~> thumbSink
    byteArrayFan.out(1) ~> processorFlow(Watermarked) ~> waterSink

    SinkShape(streamFan.in)
  }
})

graph.mapMaterializedValue[Future[Try[Done]]](fs => Future.sequence(Seq(fs._1, fs._2, fs._3)).map(f => Success(Done)))

之后很容易从 Play 调用它:
val photoStorageParser = BodyParser { req =>
  Accumulator(theSink).map(Right.apply)
}

def createImage(path: String) = Action(photoStorageParser) { req =>
  Created
}

关于scala - 如何从多个文件写入组装 Akka Streams 接收器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37132272/

相关文章:

scala - Akka Siblings 递归 ActorRef 要求

java - akka camel 将案例类作为消息发送

Akka Streams `groupBy` SubFlow 完成时的容量变化?

Java/Scala 双向 MD5

scala - 如何在 Spark 数据框中(均等)分区数组数据

android - Akka on Android 多重引用.conf

scala - 如何在 Akka actor 中等待文件上传流完成

scala - mapMaterializedValue 对 Source.actorRef 不执行任何操作

arrays - Scala 中的二维数组乘法

eclipse(设置scala环境): object apache is not a member of package org