scala - 在并行计算中正确使用 future

标签 scala

我正在实现一种可以轻松并行化的算法,但无法弄清楚如何创建适当数量的 future 以及如何提前中止。目前代码的大纲是沿着这些线

def solve: Boolean = {
  var result = false
  while(!result && i < iterations) {
    val futures = (1 to threads) map { _ => solveIter(geInitialValues()) }
    val loopResult = Future.fold(futures)(false)((acc, r) => acc || r )
    result = Await.result(loopResult, Duration.Inf)
    i+=1
  }
}

def solveIter(initialValues: Values): Future[Boolean] = Future {
  /* Takes a lot of time */
}

明显的问题是明确设置的并行级别可能适合也可能不适合当前执行上下文。如果一次创建所有 future 如何制作Future.fold早点流产?

最佳答案

您不能取消 Future,因为 Futures 是只读的。但是您可以使用 Promise,它是“来自 future 的写入部分”。

示例代码:

  • 此代码在 5 秒后超时,因为 future 未完成(solveIter 永远不会完成)
  • 要完成 promise ,请删除在“ promise ”中添加已完成 promise 的评论 - 其他 future 将被取消
  • 删除 'promises.foreach(_.trySuccess(false))' 并再次超时,因为其他 future 没有取消

  • import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._
    import scala.concurrent.{Await, Future, Promise}
    import scala.util.Try
    
    
    // create a bunch of promises
    val promises = ((1 to 10) map { _ =>
      val p = Promise[Boolean]()
      p.completeWith(solveIter())
      p
    }) // :+ Promise().success(true)
    // ^^ REMOVE THIS COMMENT TO ADD A PROMISE WHICH COMPLETES
    
    // get the futures from the promises
    val futures = promises.map(_.future)
    
    // loop over all futures
    futures.foreach(oneFuture =>
      // register callback when future is done
      oneFuture.foreach{
        case true =>
          println("future with 'true' result found")
    
          // stop others
          promises.foreach(_.trySuccess(false))
    
        case _ => // future completes with false
      })
    
    
    
    // wait at most 5 seconds till all futures are done
    Try(Await.ready(Future.sequence(futures), 5.seconds)).recover { case _ =>
      println("TIMEOUT")
    }
    
    
    def solveIter(): Future[Boolean] = Future {
      /* Takes a VERY VERY VERY .... lot of time */
      Try(Await.ready(Promise().future, Duration.Inf))
      false
    }
    

    关于scala - 在并行计算中正确使用 future ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26883283/

    相关文章:

    斯卡拉函数指针

    scala - 追加到列表模式匹配

    scala - 当我尝试在 Spark-Scala 中编译 SBT 时,错误 package org.apache.spark.sql is not a value

    scala - 调用采用隐式方法返回的函数

    Scala 错误 : found and required are same

    java - Play 无标签的框架输入

    scala - 光滑地选择表格的所有行而不进行过滤

    scala - Array[BigInt] 与 Array[Int] 在 Scala 中的初始化

    scala - 有效比较 Scala 集合中两个的所有组合

    scala - 断言值的简洁方法与 ScalaTest 中的给定模式匹配