使用 java,我可以创建一个带有执行器和一堆任务的 ExecutorCompletionService。此类安排提交的任务在完成后放置在可使用 take 访问的队列中。 https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html Akka 有类似的东西来管理 Actor 返回的 Future 吗?
最佳答案
此答案仅适用于 Scala。在 scala 中有 sequence
/firstCompletedOf
到 compose futures ,它会在基础 futures 的 all
/one
isCompleted 之后返回新的 future 完成(相当于 CompletionService
的 api docs 中的示例) )。这种解决方案比 ecs.take().get() 更安全,因为如果使用 onComplete 监听器,则不会阻塞;但是,如果您仍然想要一些阻塞服务员 - 请使用 Await.result
。因此,不需要 CompletionService
因为 future 列表足够灵活且安全得多。第一个示例的等效项:
val solvers: List[() => Int] = ...
val futures = solvers.map(s => Future {s()}) //run execution
(Future sequence futures) onComplete { results: Seq[Int] =>
results.map(use)
}
另一个例子是cancelling the task :
val solvers: List[Future => Int] = ... //some list of functions(tasks), Future is used to check if task was interrupted
val (futures, cancels): solvers.map(cancellableFuture) //see https://stackoverflow.com/questions/16020964/cancellation-with-future-and-promise-in-scala
(Future firstCompletedOf futures) onComplete { result: Int =>
cancels.foreach(_())
use(result)
}
说到Java,Akka对scala的futures进行了适配:http://doc.akka.io/docs/akka/snapshot/java/futures.html
如果您只想在完成后按顺序处理结果,您可以使用 actor:
val futures: List[Future]
futures.map(_ pipeTo actor) //actor's mailbox is used as queue
对完成队列的行为进行建模(不推荐):
import scala.concurrent._
import duration._
import scala.concurrent.ExecutionContext.Implicits.global //some execution context
class Queue[T](solvers: Seq[() => T]) extends Iterator[T]{
case class Result(f: Future[Result], r: T)
var futures: Set[Future[Result]] = solvers map {s =>
lazy val f: Future[Result] = Future{Result(f, s())}
f
} toSet
def hasNext() = futures.nonEmpty
def next() = {
val result = Await.result((Future firstCompletedOf futures.toSeq), Duration.Inf)
futures -= result.f
result.r
}
}
scala> val q = new Queue(List(() => 1, () => 2, () => 3, () => 4))
q: Queue[Int] = non-empty iterator
scala> q.next
res14: Int = 2
scala> q.next
res15: Int = 1
scala> q.foreach(println)
4
3
关于java - Akka 是否有一个 ExecutorCompletionService 等效项,其中 Futures 按其完成时间排队?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29454612/