java - Akka 是否有一个 ExecutorCompletionService 等效项,其中 Futures 按其完成时间排队?

标签 java scala akka actor

使用 java,我可以创建一个带有执行器和一堆任务的 ExecutorCompletionService。此类安排提交的任务在完成后放置在可使用 take 访问的队列中。 https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorCompletionService.html Akka 有类似的东西来管理 Actor 返回的 Future 吗?

最佳答案

此答案仅适用于 Scala。在 scala 中有 sequence/firstCompletedOfcompose futures ,它会在基础 futures 的 all/one isCompleted 之后返回新的 future 完成(相当于 CompletionServiceapi docs 中的示例) )。这种解决方案比 ecs.take().get() 更安全,因为如果使用 onComplete 监听器,则不会阻塞;但是,如果您仍然想要一些阻塞服务员 - 请使用 Await.result。因此,不需要 CompletionService 因为 future 列表足够灵活且安全得多。第一个示例的等效项:

  val solvers: List[() => Int] = ...
  val futures = solvers.map(s => Future {s()}) //run execution
  (Future sequence futures) onComplete { results: Seq[Int] =>
      results.map(use)
  }

另一个例子是cancelling the task :

  val solvers: List[Future => Int] = ... //some list of functions(tasks), Future is used to check if task was interrupted
  val (futures, cancels): solvers.map(cancellableFuture) //see https://stackoverflow.com/questions/16020964/cancellation-with-future-and-promise-in-scala

  (Future firstCompletedOf futures) onComplete { result: Int =>
        cancels.foreach(_())
        use(result)
  }

说到Java,Akka对scala的futures进行了适配:http://doc.akka.io/docs/akka/snapshot/java/futures.html

如果您只想在完成后按顺序处理结果,您可以使用 actor:

  val futures: List[Future]
  futures.map(_ pipeTo actor) //actor's mailbox is used as queue

对完成队列的行为进行建模(不推荐):

  import scala.concurrent._
  import duration._
  import scala.concurrent.ExecutionContext.Implicits.global //some execution context

  class Queue[T](solvers: Seq[() => T]) extends Iterator[T]{
     case class Result(f: Future[Result], r: T)
     var futures: Set[Future[Result]] = solvers map {s => 
        lazy val f: Future[Result]  = Future{Result(f, s())}
        f
     } toSet

     def hasNext() = futures.nonEmpty        

     def next() = {         
        val result = Await.result((Future firstCompletedOf futures.toSeq), Duration.Inf)
        futures -= result.f
        result.r
     }
  }

scala> val q = new Queue(List(() => 1, () => 2, () => 3, () => 4))
q: Queue[Int] = non-empty iterator

scala> q.next
res14: Int = 2

scala> q.next
res15: Int = 1

scala> q.foreach(println)
4
3

关于java - Akka 是否有一个 ExecutorCompletionService 等效项,其中 Futures 按其完成时间排队?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29454612/

相关文章:

scala - Akka流:用于自定义SourceShape的KillSwitch,可从视频文件中发射帧

routes - Akka 消息传递机制示例

java - JSP 应用程序 - 使用 Context.xml 环境标记的全局变量列表

java从带有缓冲区的网络输入流中读取(需要排除空符号)

scala - 在另一个数据帧的转换中创建/访问数据帧

java - 凭经验估计大时间效率

scala - 如何使用映射将数据框保存到Elasticsearch

java - Akka 框架的最佳用例是什么

java - Java 中的正则表达式 : Capture last {n} words

java - Mediaplayer 播放几次后停止播放