scala - 尽管失败,如何进行执行Future序列?

标签 scala concurrency future

traverse对象中的Future方法在第一次失败时停止。我想要此方法的容忍/宽恕版本,该版本在发生错误时会与其余序列一起进行。

当前,我们已经在utils中添加了以下方法:

def traverseFilteringErrors[A, B <: AnyRef]
                           (seq: Seq[A])
                           (f: A => Future[B]): Future[Seq[B]] = {
  val sentinelValue = null.asInstanceOf[B]
  val allResults = Future.traverse(seq) { x =>
    f(x) recover { case _ => sentinelValue }
  }
  val successfulResults = allResults map { result =>
    result.filterNot(_ == sentinelValue)
  }
  successfulResults
}

有一个更好的方法吗?

最佳答案

真正有用的东西(通常来说)将能够将 future 的错误提升为适当的值(value)。换句话说,将Future[T]转换为Future[Try[T]](成功的返回值变为Success[T],而失败情况变为Failure[T])。这是我们可能的实现方式:

// Can also be done more concisely (but less efficiently) as:
// f.map(Success(_)).recover{ case t: Throwable => Failure( t ) }
// NOTE: you might also want to move this into an enrichment class
def mapValue[T]( f: Future[T] ): Future[Try[T]] = {
  val prom = Promise[Try[T]]()
  f onComplete prom.success
  prom.future
}

现在,如果您执行以下操作:
Future.traverse(seq)( f andThen mapValue )

您将获得成功的Future[Seq[Try[A]]],其最终值包含每个成功的将来的Success实例和每个失败的将来的Failure实例。
如果需要,您可以在此seq上使用collect删除Failure实例并仅保留成功的值。

换句话说,您可以按如下方式重写您的辅助方法:
def traverseFilteringErrors[A, B](seq: Seq[A])(f: A => Future[B]): Future[Seq[B]] = {
  Future.traverse( seq )( f andThen mapValue ) map ( _ collect{ case Success( x ) => x } )
}

关于scala - 尽管失败,如何进行执行Future序列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15775824/

相关文章:

java - 为什么 Android 的 AsyncTask 没有实现 Future?

斯卡拉这是什么意思? "{/* compiled code */}"

scala - 如何在 Spark Scala 中使用 mapPartitions?

mysql - 定时Mysql查询akka http

scala - Spark:RDD到列表

ruby - .join 在 Ruby 线程中有什么意义?

c# - 是什么导致了这种并行化失败?

c++ - 使用 promise 数组的段错误

java - ConcurrentModificationException 在 Java 中递归使用 Maps

java - 等待多个 future 的回调