multithreading - Play 框架 future 没有被默认调度程序并行化

标签 multithreading scala playframework akka executioncontext

这个问题在这里已经有了答案:





Why are Futures within Futures running sequentially when started on Akka Dispatcher

(3 个回答)


1年前关闭。




我正在尝试测试 ExecutionContext在 Play 应用程序中的行为,发现当我通过调用 as.dispatcher 使用默认调度程序时,我无法实现任何程度的并行性。 , as.dispatchers.lookup("akka.actor.default-dispatcher")或者将默认执行上下文作为参数传递给我的 Controller 类:

class HomeController @Inject()(cc: ControllerComponents)(implicit ec: ExecutionContext)

我正在构建 here 中可用的游戏示例.并添加/更改以下配置:

路线
GET    /futures    controllers.HomeController.testFutures(dispatcherId: String)

配置文件
akka {
  my-dispatcher {
    executor = "fork-join-executor"
    fork-join-executor {
      # vm-cores = 4
      parallelism-min = 4

      parallelism-factor = 2.0

      # 2x vm-cores
      parallelism-max = 8
    }
  }

  actor.default-dispatcher {
    executor = "fork-join-executor"
    fork-join-executor {
      # vm-cores = 4
      parallelism-min = 4

      parallelism-factor = 2.0

      # 2x vm-cores
      parallelism-max = 8
    }
  }
}

家庭 Controller
@Singleton
class HomeController @Inject()(cc: ControllerComponents, as: ActorSystem) extends AbstractController(cc) {
  import HomeController._

  def testFutures(dispatcherId: String) = Action.async { implicit request =>
    implicit val dispatcher = as.dispatchers.lookup(dispatcherId)
    Future.sequence((0 to 10).map(i => Future {
      val time = 1000 + Random.nextInt(200)
      log.info(s"Sleeping #$i for $time ms")
      Thread.sleep(time)
      log.info(s"Awakening #$i")
    })).map(_ => Ok("ok"))
  }
}

出于某种原因,请调用 http://localhost:9000/futures?dispatcherId=akka.actor.default-dispatcher ( 默认调度程序 )不并行化并产生以下输出:
[info] c.HomeController - Sleeping #0 for 1044 ms
[info] c.HomeController - Awakening #0
[info] c.HomeController - Sleeping #1 for 1034 ms
[info] c.HomeController - Awakening #1
[info] c.HomeController - Sleeping #2 for 1031 ms
[info] c.HomeController - Awakening #2
[info] c.HomeController - Sleeping #3 for 1065 ms
[info] c.HomeController - Awakening #3
[info] c.HomeController - Sleeping #4 for 1082 ms
[info] c.HomeController - Awakening #4
[info] c.HomeController - Sleeping #5 for 1057 ms
[info] c.HomeController - Awakening #5
[info] c.HomeController - Sleeping #6 for 1090 ms
[info] c.HomeController - Awakening #6
[info] c.HomeController - Sleeping #7 for 1165 ms
[info] c.HomeController - Awakening #7
[info] c.HomeController - Sleeping #8 for 1173 ms
[info] c.HomeController - Awakening #8
[info] c.HomeController - Sleeping #9 for 1034 ms
[info] c.HomeController - Awakening #9
[info] c.HomeController - Sleeping #10 for 1056 ms
[info] c.HomeController - Awakening #10

但是打电话到这个http://localhost:9000/futures?dispatcherId=akka.my-dispatcher (使用 另一个调度程序 )并行化正确性并产生以下输出。
[info] c.HomeController - Sleeping #1 for 1191 ms
[info] c.HomeController - Sleeping #0 for 1055 ms
[info] c.HomeController - Sleeping #7 for 1196 ms
[info] c.HomeController - Sleeping #4 for 1121 ms
[info] c.HomeController - Sleeping #6 for 1040 ms
[info] c.HomeController - Sleeping #2 for 1016 ms
[info] c.HomeController - Sleeping #5 for 1107 ms
[info] c.HomeController - Sleeping #3 for 1165 ms
[info] c.HomeController - Awakening #2
[info] c.HomeController - Sleeping #8 for 1002 ms
[info] c.HomeController - Awakening #6
[info] c.HomeController - Sleeping #9 for 1127 ms
[info] c.HomeController - Awakening #0
[info] c.HomeController - Sleeping #10 for 1016 ms
[info] c.HomeController - Awakening #5
[info] c.HomeController - Awakening #4
[info] c.HomeController - Awakening #3
[info] c.HomeController - Awakening #1
[info] c.HomeController - Awakening #7
[info] c.HomeController - Awakening #8
[info] c.HomeController - Awakening #10
[info] c.HomeController - Awakening #9

任何想法为什么会发生这种情况?

最佳答案

我认为行为是由 akka.actor.default-dispatcher 给出的这是类型 BatchingExecutor这将在诸如 map/flatmap 的操作的情况下尝试优化通过在同一个线程中执行它们来避免不必要的调度。在我们要阻止的情况下,我们可以用提示表示它为 scala.concurrent.blocking (Thread.sleep (time))并以这种方式将标记存储在 ThreadLocal[BlockContext] 中这表明意图阻塞并且不应用优化而是在另一个线程中抛出操作。

如果您更改此行 Thread.sleep(time)为此 scala.concurrent.blocking(Thread.sleep(time))你会得到想要的行为

@Singleton
class HomeController @Inject()(cc: ControllerComponents, as: ActorSystem) extends AbstractController(cc) {
  import HomeController._

  def testFutures(dispatcherId: String) = Action.async { implicit request =>
    implicit val dispatcher = as.dispatchers.lookup(dispatcherId)
    Future.sequence((0 to 10).map(i => Future {
      val time = 1000 + Random.nextInt(200)
      log.info(s"Sleeping #$i for $time ms")
      scala.concurrent.blocking(Thread.sleep(time))
      log.info(s"Awakening #$i")
    })).map(_ => Ok("ok"))
  }
}
[info] play.api.Play - Application started (Dev) (no global state)
Sleeping #0 for 1062 ms
Sleeping #1 for 1128 ms
Sleeping #2 for 1189 ms
Sleeping #3 for 1105 ms
Sleeping #4 for 1169 ms
Sleeping #5 for 1178 ms
Sleeping #6 for 1057 ms
Sleeping #7 for 1003 ms
Sleeping #8 for 1164 ms
Sleeping #9 for 1029 ms
Sleeping #10 for 1005 ms
Awakening #7
Awakening #10
Awakening #9
Awakening #6
Awakening #0
Awakening #3
Awakening #1
Awakening #8
Awakening #4
Awakening #5
Awakening #2

关于multithreading - Play 框架 future 没有被默认调度程序并行化,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58306612/

相关文章:

Scala:Play2 中的 def 与 val

java - 在 Heroku(或类似的云服务)中存储 Blob

java - 在使用 Swing 的 Java 中,我如何知道使用 invokeLater 启动的所有线程何时完成?

scala - 由类的方法返回类型参数化的方法

c# - 线程问题

斯卡拉 Spark : Convert Double Column to Date Time Column in dataframe

jpa - playframework JPA 错误和数据库设计问题

java - 如何在play框架中使用多个实体管理器-使用spring data JPA?

java - 如何查看JVM中每个线程使用了多少资源(cpu,内存,io)

c++ - 通过引用传递数组到线程