scala.concurrent.Future.onSuccess 在不同的 ExecutorService 上的执行时间

标签 scala future executioncontext

我想控制 ExecutionContext 中的线程数。所以我创建了一个 ThreadPoolExecutor 实例,然后从中创建了 ExecutionContext。

然后我创建了一些 Futures 并在其上附加了 onSuccess 回调。我希望在每个 Future 工作完成时调用每个 onSuccess 回调。但是我发现所有的 onSuccess 回调都是同时执行的。

import java.util.concurrent.{ Executors, ForkJoinPool }

import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration.Duration

object Main extends App {
  implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
  // implicit val ec = ExecutionContext.fromExecutorService(new ForkJoinPool(2))

  val start = System.currentTimeMillis()

  val futures = for {
    i <- 1 to 10
  } yield Future[Int] {
    Thread.sleep(i * 1000)
    i
  }

  futures.foreach { f =>
    f.onSuccess { case i =>
      println(s"${i} Success. ${System.currentTimeMillis() - start}ms elapsed.")
    }
  }

  Await.ready(Future.sequence(futures.toList), Duration.Inf)
  ec.shutdown()
}

// ThreadPoolExecutor Result
// 1 Success. 25060ms elapsed.
// 2 Success. 25064ms elapsed.
// 3 Success. 25064ms elapsed.
// 4 Success. 25064ms elapsed.
// 5 Success. 25064ms elapsed.
// 6 Success. 25064ms elapsed.
// 7 Success. 25065ms elapsed.
// 8 Success. 25065ms elapsed.
// 9 Success. 25065ms elapsed.
// 10 Success. 30063ms elapsed.

// ForkJoinPool Result
// 1 Success. 1039ms elapsed.
// 2 Success. 2036ms elapsed.
// 3 Success. 4047ms elapsed.
// 4 Success. 6041ms elapsed.
// 5 Success. 12042ms elapsed.
// 6 Success. 12043ms elapsed.
// 7 Success. 25060ms elapsed.
// 8 Success. 25060ms elapsed.
// 9 Success. 25060ms elapsed.
// 10 Success. 30050ms elapsed.

上面的结果是同时打印的,不是分别打印的。但是当我使用 ForkJoinPool 而不是 ThreadPoolExecutor 时,这个问题就得到了缓解。我是否滥用了 ExecutionContext 和 Future?

已编辑:我发现当线程数小于 future 数时会出现问题。所以我编辑了上面的代码以重现问题并打印执行时间。

我觉得future callback即使线程数少也应该按时调用...

最佳答案

我最终知道 Future 回调(onComplete 或 onSuccess)是在提供的 ExecutionContext 的线程上执行的。所以如果池中没有空闲线程,则无法执行回调。 See scala.concurrent.Future

但我仍然不了解 ForkJoinPool 的行为。我需要研究一下。

关于scala.concurrent.Future.onSuccess 在不同的 ExecutorService 上的执行时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41482601/

相关文章:

scala - scala中是否有快速并发语法糖的实现?例如。 map 减少

Flutter - Dart - setState 在与异步函数的 Future<> 一起使用时不会重新加载状态

c# - 在Azure Function App中全局获取ExecutionContext

javascript - D365 CE Online - 全局执行上下文

scala - 计算 Hadoop 上偶数/奇数对的总和?

scala - 函数集中的特征函数

scala - 如何使用Scala在Spark中创建SQLContext?

java - 如何监控 future 任务完成的后台任务

dart - 错误 : The argument type '(File) → Future<dynamic>' can't be assigned to the parameter type '(dynamic) → FutureOr<dynamic>'