假设我有多个 future,并且需要等到它们中的任何一个失败或全部成功。
例如:假设有 3 个 future:f1
、f2
、f3
。
如果
f1
成功而f2
失败,我不会等待f3
(并返回失败给客户)。如果
f2
失败,而f1
和f3
仍在运行,我不会等待它们(并返回失败)如果
f1
成功,然后f2
成功,我将继续等待f3
。
你会如何实现它?
最佳答案
您可以使用 for 理解,如下所示:
val fut1 = Future{...}
val fut2 = Future{...}
val fut3 = Future{...}
val aggFut = for{
f1Result <- fut1
f2Result <- fut2
f3Result <- fut3
} yield (f1Result, f2Result, f3Result)
在此示例中, future 1、2 和 3 并行启动。然后,在 for 理解中,我们等待结果 1、然后 2、然后 3 可用。如果 1 或 2 失败,我们将不再等待 3。如果 3 个都成功,那么 aggFut val 将保存一个具有 3 个槽的元组,对应于 3 个 future 的结果。
现在,如果您需要在 fut2 首先失败时停止等待的行为,事情会变得有点棘手。在上面的示例中,您必须等待 fut1 完成才能意识到 fut2 失败。为了解决这个问题,你可以尝试这样的事情:
val fut1 = Future{Thread.sleep(3000);1}
val fut2 = Promise.failed(new RuntimeException("boo")).future
val fut3 = Future{Thread.sleep(1000);3}
def processFutures(futures:Map[Int,Future[Int]], values:List[Any], prom:Promise[List[Any]]):Future[List[Any]] = {
val fut = if (futures.size == 1) futures.head._2
else Future.firstCompletedOf(futures.values)
fut onComplete{
case Success(value) if (futures.size == 1)=>
prom.success(value :: values)
case Success(value) =>
processFutures(futures - value, value :: values, prom)
case Failure(ex) => prom.failure(ex)
}
prom.future
}
val aggFut = processFutures(Map(1 -> fut1, 2 -> fut2, 3 -> fut3), List(), Promise[List[Any]]())
aggFut onComplete{
case value => println(value)
}
现在这可以正常工作,但问题在于知道成功完成后要从 Map
中删除哪个 Future
。只要您有某种方法可以将结果与产生该结果的 future 正确关联起来,那么这样的事情就可以了。它只是递归地不断从 Map 中删除已完成的 Future,然后对剩余的 Future 调用 Future.firstCompletedOf 直到没有剩余,并一路收集结果。这并不漂亮,但如果您确实需要您正在谈论的行为,那么这个或类似的东西就可以工作。
关于scala - 如何等待多个 future ?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16256279/