我正在实现一种可以轻松并行化的算法,但无法弄清楚如何创建适当数量的 future 以及如何提前中止。目前代码的大纲是沿着这些线
def solve: Boolean = {
var result = false
while(!result && i < iterations) {
val futures = (1 to threads) map { _ => solveIter(geInitialValues()) }
val loopResult = Future.fold(futures)(false)((acc, r) => acc || r )
result = Await.result(loopResult, Duration.Inf)
i+=1
}
}
def solveIter(initialValues: Values): Future[Boolean] = Future {
/* Takes a lot of time */
}
明显的问题是明确设置的并行级别可能适合也可能不适合当前执行上下文。如果一次创建所有 future 如何制作
Future.fold
早点流产?
最佳答案
您不能取消 Future,因为 Futures 是只读的。但是您可以使用 Promise,它是“来自 future 的写入部分”。
示例代码:
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}
import scala.util.Try
// create a bunch of promises
val promises = ((1 to 10) map { _ =>
val p = Promise[Boolean]()
p.completeWith(solveIter())
p
}) // :+ Promise().success(true)
// ^^ REMOVE THIS COMMENT TO ADD A PROMISE WHICH COMPLETES
// get the futures from the promises
val futures = promises.map(_.future)
// loop over all futures
futures.foreach(oneFuture =>
// register callback when future is done
oneFuture.foreach{
case true =>
println("future with 'true' result found")
// stop others
promises.foreach(_.trySuccess(false))
case _ => // future completes with false
})
// wait at most 5 seconds till all futures are done
Try(Await.ready(Future.sequence(futures), 5.seconds)).recover { case _ =>
println("TIMEOUT")
}
def solveIter(): Future[Boolean] = Future {
/* Takes a VERY VERY VERY .... lot of time */
Try(Await.ready(Promise().future, Duration.Inf))
false
}
关于scala - 在并行计算中正确使用 future ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26883283/