我已经使用多个线程很长时间了,但无法解释这种简单的情况。
import java.util.concurrent.Executors
import scala.concurrent._
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1))
def addOne(x: Int) = Future(x + 1)
def addTwo(x: Int) = Future {addOne(x + 1)}
addTwo(1)
// res5: Future[Future[Int]] = Future(Success(Future(Success(3))))
令我惊讶的是,它有效。而且我不知道为什么。
问题:
为什么给定一个线程可以同时执行两个Future?
我的期望:
第一个
Future
(addTwo
)占据了一个唯一的线程(newFixedThreadPool(1)
),然后调用了另一个Future
(addOne
),后者又需要另一个线程。因此,该程序最终将因线程不足而陷入困境。
最佳答案
您的代码起作用的原因是,两个 future 都将由同一线程执行。您创建的ExecutionContext
不会直接为每个Thread
使用Future
,而是安排要执行的任务(Runnable
实例)。如果池中没有更多线程可用,则将这些任务放入BlockingQueue
中等待执行。 (有关详细信息,请参见ThreadPoolExecutor API)
如果查看Executors.newFixedThreadPool(1)
的实现,您将看到创建带有无限队列的Executor:
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue[Runnable])
为了获得所需的线程匮乏的效果,您可以自己创建一个具有有限队列的执行程序:
implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](1)))
由于
ArrayBlockingQueue
的最小容量为1,您将需要三个Future才能达到限制,并且还需要添加一些代码以对Future的结果执行,以防止它们完成(在下面的示例中,我这样做通过添加.map(identity)
)下面的例子
import scala.concurrent._
implicit val ec = ExecutionContext.fromExecutor(new ThreadPoolExecutor(1, 1, 0L,
TimeUnit.MILLISECONDS, new ArrayBlockingQueue[Runnable](1)))
def addOne(x: Int) = Future {
x + 1
}
def addTwo(x: Int) = Future {
addOne(x + 1) .map(identity)
}
def addThree(x: Int) = Future {
addTwo(x + 1).map(identity)
}
println(addThree(1))
失败于
java.util.concurrent.RejectedExecutionException: Task scala.concurrent.impl.CallbackRunnable@65a264b6 rejected from java.util.concurrent.ThreadPoolExecutor@10d078f4[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 1]
关于multithreading - 为什么此Scala代码在一个线程中执行两个Future?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56829439/