scala - 在Akka流中,如何从 future 集合中创建无序来源

标签 scala future akka-stream reactive-streams

我需要从akka.stream.scaladsl.Source[T, Unit]的集合中创建一个Future[T]

例如,拥有一组返回整数的 future ,

val f1: Future[Int] = ???
val f2: Future[Int] = ???
val fN: Future[Int] = ???
val futures = List(f1, f2, fN)

如何创建一个
val source: Source[Int, Unit] = ???

从中。

我不能使用Future.sequence组合器,因为那样我将等待每个将来完成,然后再从源代码中获取任何东西。我希望在将来完成时能以任何顺序获得结果。

我知道Source是纯粹的功能性API,在实现它之前,不应运行任何东西。因此,我的想法是使用Iterator(这是惰性的)来创建源:
Source { () =>
  new Iterator[Future[Int]] {
    override def hasNext: Boolean = ???
    override def next(): Future[Int] = ???
  }
}

但这将是 future 的来源,而不是实际值(value)。我也可以使用next阻止Await.result(future),但是我不确定哪个踏板池的线程将被阻止。同样,这将顺序调用Future,而我需要并行执行。

更新2 :事实证明,有一种更简单的方法(感谢Viktor Klang):
Source(futures).mapAsync(1)(identity)

更新:这是基于@sschaef答案的内容:
def futuresToSource[T](futures: Iterable[Future[T]])(implicit ec: ExecutionContext): Source[T, Unit] = {
  def run(actor: ActorRef): Unit = {
    futures.foreach { future =>
      future.onComplete {
        case Success(value) =>
          actor ! value
        case Failure(NonFatal(t)) =>
          actor ! Status.Failure(t) // to signal error
      }
    }

    Future.sequence(futures).onSuccess { case _ =>
      actor ! Status.Success(()) // to signal stream's end
    }
  }

  Source.actorRef[T](futures.size, OverflowStrategy.fail).mapMaterializedValue(run)
}

// ScalaTest tests follow

import scala.concurrent.ExecutionContext.Implicits.global

implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

"futuresToSource" should "convert futures collection to akka-stream source" in {
  val f1 = Future(1)
  val f2 = Future(2)
  val f3 = Future(3)

  whenReady {
    futuresToSource(List(f1, f2, f3)).runFold(Seq.empty[Int])(_ :+ _)
  } { results =>
    results should contain theSameElementsAs Seq(1, 2, 3)
  }
}

it should "fail on future failure" in {
  val f1 = Future(1)
  val f2 = Future(2)
  val f3 = Future.failed(new RuntimeException("future failed"))

  whenReady {
    futuresToSource(List(f1, f2, f3)).runWith(Sink.ignore).failed
  } { t =>
    t shouldBe a [RuntimeException]
    t should have message "future failed"
  }
}

最佳答案

创建 future 的来源,然后通过mapAsync将其“展平”:

scala> Source(List(f1,f2,fN)).mapAsync(1)(identity)
res0: akka.stream.scaladsl.Source[Int,Unit] = akka.stream.scaladsl.Source@3e10d804

关于scala - 在Akka流中,如何从 future 集合中创建无序来源,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32424172/

相关文章:

scala - 如何设置 Play! 的 POST header 2.0 web服务查询?

scala - 我怎样才能有一个可选的 Sbt 设置?

scala - 为什么我的简单 Scala 对象在包含 future 时会挂起一分钟左右

scala - Akka Streams 对一组键进行过滤和分组

scala - Akka-http 流使用 Slick 3.0 Databasepublisher

java - 绕过类型删除 : problem with trait type!

scala - 删除 GraphX 中没有出边的顶点

java - 在没有多线程的情况下使用 Future 有什么意义?

Java ExecutorService - 监控任务完成/状态栏

scala - 如何从 Source[A] 创建 Akka Stream Source[Seq[A]]