scala - 如何等待多个 future ?

标签 scala concurrency future

假设我有多个 future,并且需要等到它们中的任何一个失败或全部成功。

例如:假设有 3 个 future:f1f2f3

  • 如果f1成功而f2失败,我不会等待f3(并返回失败给客户)。

  • 如果 f2 失败,而 f1f3 仍在运行,我不会等待它们(并返回失败)

  • 如果 f1 成功,然后 f2 成功,我将继续等待 f3

你会如何实现它?

最佳答案

您可以使用 for 理解,如下所示:

val fut1 = Future{...}
val fut2 = Future{...}
val fut3 = Future{...}

val aggFut = for{
  f1Result <- fut1
  f2Result <- fut2
  f3Result <- fut3
} yield (f1Result, f2Result, f3Result)

在此示例中, future 1、2 和 3 并行启动。然后,在 for 理解中,我们等待结果 1、然后 2、然后 3 可用。如果 1 或 2 失败,我们将不再等待 3。如果 3 个都成功,那么 aggFut val 将保存一个具有 3 个槽的元组,对应于 3 个 future 的结果。

现在,如果您需要在 fut2 首先失败时停止等待的行为,事情会变得有点棘手。在上面的示例中,您必须等待 fut1 完成才能意识到 fut2 失败。为了解决这个问题,你可以尝试这样的事情:

  val fut1 = Future{Thread.sleep(3000);1}
  val fut2 = Promise.failed(new RuntimeException("boo")).future
  val fut3 = Future{Thread.sleep(1000);3}

  def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
    val fut = if (futures.size == 1) futures.head._2
    else Future.firstCompletedOf(futures.values)

    fut onComplete{
      case Success(value) if (futures.size == 1)=> 
        prom.success(value :: values)

      case Success(value) =>
        processFutures(futures - value, value :: values, prom)

      case Failure(ex) => prom.failure(ex)
    }
    prom.future
  }

  val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
  aggFut onComplete{
    case value => println(value)
  }

现在这可以正常工作,但问题在于知道成功完成后要从 Map 中删除哪个 Future。只要您有某种方法可以将结果与产生该结果的 future 正确关联起来,那么这样的事情就可以了。它只是递归地不断从 Map 中删除已完成的 Future,然后对剩余的 Future 调用 Future.firstCompletedOf 直到没有剩余,并一路收集结果。这并不漂亮,但如果您确实需要您正在谈论的行为,那么这个或类似的东西就可以工作。

关于scala - 如何等待多个 future ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16256279/

相关文章:

java - 如何等待 future 列表得到满意的结果?

scala - 无法通过 sbt 正确启动 findbugs

scala - 如何更改 DataFrame 的模式(修复一些嵌套字段的名称)?

java - 使用 Spring 作为 play 2.4.x 的依赖注入(inject)框架?

multithreading - 如何将纤维传递给线程?

c# - 并发文件写入

java - 并行运行多个 HystrixCommand 的最佳方式

scala Spark - 基于可变日期匹配数据帧

php - PHP 与 MYSQL 的并发

java - 将调用方法的值返回给 future 对象