我认为使用 future 可以很容易地让我触发一次代码块,但似乎我一次只能有 4 个 future 。
这个限制是从哪里来的,或者我是在滥用 Futures 这样使用它吗?
import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import java.util.Calendar
object Main extends App{
val rand = scala.util.Random
for (x <- 1 to 100) {
val f = Future {
//val sleepTime = rand.nextInt(1000)
val sleepTime = 2000
Thread.sleep(sleepTime)
val today = Calendar.getInstance().getTime()
println("Future: " + x + " - sleep was: " + sleepTime + " - " + today)
1;
}
}
Thread.sleep(10000)
}
输出:
Future: 3 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 2 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 4 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 1 - sleep was: 2000 - Mon Aug 31 10:02:44 CEST 2015
Future: 7 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 5 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 6 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 8 - sleep was: 2000 - Mon Aug 31 10:02:46 CEST 2015
Future: 9 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 11 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 10 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 12 - sleep was: 2000 - Mon Aug 31 10:02:48 CEST 2015
Future: 16 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
Future: 13 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
Future: 15 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
Future: 14 - sleep was: 2000 - Mon Aug 31 10:02:50 CEST 2015
我希望他们都同时出现。
为了给出一些上下文,我想我可以使用这个结构并通过一个主循环来扩展它,在这个主循环中,它根据从指数分布中提取的值休眠每个循环,以模拟用户到达/执行查询。每次 sleep 后,我想通过将查询发送到程序的驱动程序(在本例中为 Spark,并且驱动程序允许多个线程使用它)来执行查询。还有比使用 Futures 更明显的方法吗?
最佳答案
当您使用 import ExecutionContext.Implicits.global
时,
它创建具有与 CPU 数量相同大小的线程池。
从 ExecutionContext.scala 的来源
The default
ExecutionContext
implementation is backed by a work-stealing thread pool. By default, the thread pool uses a target number of worker threads equal to the number of [[https://docs.oracle.com/javase/8/docs/api/java/lang/Runtime.html#availableProcessors-- available processors]].
还有一个很好的 StackOverflow 问题:What is the behavior of scala.concurrent.ExecutionContext.Implicits.global?
由于线程池的默认大小取决于 CPU 的数量,如果要使用更大的线程池,则必须编写类似
import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors
implicit val ec = ExecutionContext.fromExecutorService(Executors.newWorkStealingPool(8))
在执行
Future
之前。(在您的代码中,您必须将它放在
for
循环之前。)请注意,工作窃取池是在 java 8 中添加的,scala 有自己的
ForkJoinPool
来完成工作窃取:scala.concurrent.forkjoin.ForkJoinPool vs java.util.concurrent.ForkJoinPool此外,如果您希望每个
Future
一个线程,您可以编写类似implicit val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor)
因此,以下代码并行执行 100 个线程
import scala.concurrent._
import java.util.concurrent.Executors
object Main extends App{
for (x <- 1 to 100) {
implicit val ec = ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor)
val f = Future {
val sleepTime = 2000
Thread.sleep(sleepTime)
val today = Calendar.getInstance().getTime()
println("Future: " + x + " - sleep was: " + sleepTime + " - " + today)
1;
}
}
Thread.sleep(10000)
}
除了工作窃取线程池和单线程执行器,还有一些其他的执行器:http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Executors.html
详细阅读文档:
http://docs.scala-lang.org/overviews/core/futures.html
关于scala - 在 Scala 中最多只能做 4 个并发 future ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32306671/