scala - 在 Scala 中最多只能做 4 个并发 future

标签 scala concurrency

我认为使用 future 可以很容易地让我触发一次代码块,但似乎我一次只能有 4 个 future 。

这个限制是从哪里来的,或者我是在滥用 Futures 这样使用它吗?

import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import java.util.Calendar

object Main extends App{

  val rand = scala.util.Random

  for (x <- 1 to 100) {
    val f = Future {
      //val sleepTime =  rand.nextInt(1000)
      val sleepTime =  2000
      Thread.sleep(sleepTime)

      val today = Calendar.getInstance().getTime()
      println("Future: " + x + " - sleep was: " + sleepTime + " - " + today)
      1;
    }
  }

  Thread.sleep(10000)
}

输出:
Future: 3 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 2 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 4 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 1 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 7 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 5 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 6 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 8 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 9 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 11 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 10 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 12 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 16 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
Future: 13 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
Future: 15 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
Future: 14 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015

我希望他们都同时出现。

为了给出一些上下文,我想我可以使用这个结构并通过一个主循环来扩展它,在这个主循环中,它根据从指数分布中提取的值休眠每个循环,以模拟用户到达/执行查询。每次 sleep 后,我想通过将查询发送到程序的驱动程序(在本例中为 Spark,并且驱动程序允许多个线程使用它)来执行查询。还有比使用 Futures 更明显的方法吗?

最佳答案

当您使用 import ExecutionContext.Implicits.global 时,
它创建具有与 CPU 数量相同大小的线程池。

ExecutionContext.scala 的来源

The default ExecutionContext implementation is backed by a work-stealing thread pool. By default, the thread pool uses a target number of worker threads equal to the number of [[https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#availableProcessors-- available processors]].



还有一个很好的 StackOverflow 问题:What is the behavior of scala.concurrent.ExecutionContext.Implicits.global?

由于线程池的默认大小取决于 CPU 的数量,如果要使用更大的线程池,则必须编写类似
import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors
implicit val ec = ExecutionContext.fromExecutorService(Executors.newWorkStealingPool(8))

在执行 Future 之前。

(在您的代码中,您必须将它放在 for 循环之前。)

请注意,工作窃取池是在 java 8 中添加的,scala 有自己的 ForkJoinPool 来完成工作窃取:scala.concurrent.forkjoin.ForkJoinPool vs java.util.concurrent.ForkJoinPool

此外,如果您希望每个 Future 一个线程,您可以编写类似
implicit val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor)

因此,以下代码并行执行 100 个线程
import scala.concurrent._
import java.util.concurrent.Executors

object Main extends App{
  for (x <- 1 to 100) {
    implicit val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor)
    val f = Future {
      val sleepTime =  2000
      Thread.sleep(sleepTime)

      val today = Calendar.getInstance().getTime()
      println("Future: " + x + " - sleep was: " + sleepTime + " - " + today)
      1;
    }
  }

  Thread.sleep(10000)
}

除了工作窃取线程池和单线程执行器,还有一些其他的执行器:http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html

详细阅读文档:
http://docs.scala-lang.org/overviews/core/futures.html

关于scala - 在 Scala 中最多只能做 4 个并发 future ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32306671/

相关文章:

java - 在spark中使用stanford nlp,错误 "Class java.util.function.Function not found - continuing with a stub."

scala - 是否可以创建带有命名参数的部分应用函数?

Linux ext3 readdir 和并发更新

mysql - 当另一个事务拥有排他锁时,MySQL 如何允许获取共享锁?

java - Java的事前关系?

java - 您能否为使用 Lift 框架和 Scala 制作的 Web 应用推荐一个好的共享托管服务提供商?

jquery - 在 scala.js 中实现 jquery-ui

scala - 可变协变类中Scala字段的下限类型?

java - 等待 Completable future 线程完成的推荐方法是什么

java - 了解 JVM 对不可变对象(immutable对象)的保证