scala - 如何为 futures、并行集合和 akka 创建共享线程池/executioncontext

标签 scala akka

我想在我的应用程序中定义 2 个线程池。一个 fork-join 和一个线程池执行器。此外,每个池应该能够由 Akka 参与者、Scala future 和 Scala 并行集合共享。

对于 future ,scala 需要范围内的执行上下文,并且可以通过以下方式之一创建:

implicit val ec = ExecutionContext.global //want to avoid this
import scala.concurrent.ExecutionContext.Implicits.global //want to avoid this
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(64))
implicit val ec = ExecutionContext.fromExecutor(Executors.newForkJoinThreadPool(64))

对于并行收集,你必须像这样修改 TaskSupport

val forkJoinPool = new java.util.concurrent.ForkJoinPool(64)
parArray.tasksupport = new ForkJoinTaskSupport(forkJoinPool)

有了上面的内容,我唯一的选择是在我的应用程序中全局定义 val forkJoinPool = new java.util.concurrent.ForkJoinPool(64) 并将其用于两者。

但是,我不知道如何为 Akka Actor 使用同一个池。 对于 Akka,我至少看到了以下两种自定义池的方法。

val actorSysterm = ActorSystem.create("hello-system", config.getConfig("my-dispatcher”)) 
implicit val executionContext = actorSysterm.dispatcher

implicit val system = ActorSystem()
implicit val executionContext = actorSysterm.dispatchers.lookup("my-dispatcher")

这是基于配置文件。

my-dispatcher {
  type = Dispatcher
  executor = "fork-join-executor"
  fork-join-executor {
    parallelism-min = 8
    parallelism-factor = 2.0
    parallelism-max = 64
  }
  throughput = 100
}
blocking-io-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    fixed-pool-size = 32
  }
  throughput = 1
}

我更喜欢 Akka 创建 ExecutionContext 因为现在我可以在 hocon(json) 文件中配置我的池并且也可以在 play 框架应用程序中使用它。我可以将它与 Scala Futures 一起使用,但现在如何将它与 Scala Parallel 集合一起使用呢?有没有办法从 ExecutionContext 访问底层池,这样我就可以用它来初始化并行集合的 TaskSupport

最佳答案

我之前忽略了并行收集使用的 TaskSupport 的另一种实现。这样我就可以使用与 Future 和 ActorSystem 相同的 executionContext

pc.tasksupport = new ExecutionContextTaskSupport(executionContext)

关于scala - 如何为 futures、并行集合和 akka 创建共享线程池/executioncontext,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55272824/

相关文章:

scala - Akka 流/HTTP : Get original request from response

java - Akka Actor 意外地清理了它的状态

java - 使用 Akka(使用 Java)如何验证我的被测 Actor 正在观看另一个 Actor ?

eclipse - 在 Eclipse 中,我可以为我的项目选择 Scala 版本吗?

scala - 运行一个游戏!使用新的 sbt 和 jdk 9 的项目

algorithm - 扩展动态特征?

scala - 与 Akka 不匹配的消息会发生什么?

scala - 由类的方法返回类型参数化的方法

scala - 为什么在代码块中包装值会将我键入的数字更改为 double 值?

akka - 配置 core-pool-size-factor 和 max-pool-size-factor 设置的一般好做法是什么?