scala - 关于 Future.firstCompletedOf 和 Garbage Collect 机制

标签 scala concurrency garbage-collection jvm future

我在实际项目中遇到过这个问题,并通过我的测试代码和分析器证明了这一点。我没有粘贴“tl;dr”代码,而是向您展示一张图片,然后对其进行描述。
enter image description here

简单地说,我正在使用 Future.firstCompletedOf从 2 Future 得到结果s,两者没有共同的东西,互不关心。尽管如此,这是我想要解决的问题,垃圾收集器无法回收第一个 Result对象直到 Future s 完成 .

所以我真的很好奇这背后的机制。有人可以从较低的层次解释它,或者提供一些提示供我研究。

谢谢!

PS:是不是因为他们共享同一个ExecutionContext ?

** 根据要求更新 ** 粘贴测试代码

object Main extends App{
  println("Test start")

  val timeout = 30000

  trait Result {
    val id: Int
    val str = "I'm short"
  }
  class BigObject(val id: Int) extends Result{
    override val str = "really big str"
  }

  def guardian = Future({
    Thread.sleep(timeout)
    new Result { val id = 99999 }
  })

  def worker(i: Int) = Future({
    Thread.sleep(100)
    new BigObject(i)
  })

  for (i <- Range(1, 1000)){
    println("round " + i)
    Thread.sleep(20)
    Future.firstCompletedOf(Seq(
      guardian,
      worker(i)
    )).map( r => println("result" + r.id))
  }

  while (true){
    Thread.sleep(2000)
  }
}

最佳答案

让我们看看如何firstCompletedOf实现:

def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
  val p = Promise[T]()
  val completeFirst: Try[T] => Unit = p tryComplete _
  futures foreach { _ onComplete completeFirst }
  p.future
}

做的时候{ futures foreach { _ onComplete completeFirst } ,函数completeFirst保存在某处
通过 ExecutionContext.execute .这个函数究竟保存在哪里无关紧要,我们只知道它必须保存在某个地方
以便稍后可以在线程可用时选择并在线程池上执行它。只有当 future 完成时才引用completeFirst不再需要了。

因为 completeFirst结束 p ,只要还有一个 future (来自 futures )等待完成,就有一个对 p 的引用。这会阻止它被垃圾收集(即使到那时 firstCompletedOf 已经返回,从堆栈中移除 p)。

当第一个 future 完成时,它将结果保存到 promise 中(通过调用 p.tryComplete )。
因为 promise p保持结果,结果至少在 p 内是可到达的是可达的,正如我们看到的 p只要来自 futures 的至少一个 future 是可达的还没有完成。
这就是为什么在所有 future 完成之前无法收集结果的原因。

更新 :
现在的问题是:它可以修复吗?我认为可以。我们所要做的就是确保第一个完成的 future 以线程安全的方式“清除”对 p 的引用,这可以通过使用 AtomicReference 的示例来完成。像这样的东西:
def firstCompletedOf[T](futures: TraversableOnce[Future[T]])(implicit executor: ExecutionContext): Future[T] = {
  val p = Promise[T]()
  val pref = new java.util.concurrent.atomic.AtomicReference(p)
  val completeFirst: Try[T] => Unit = { result: Try[T] =>
    val promise = pref.getAndSet(null)
    if (promise != null) {
      promise.tryComplete(result)
    }
  }
  futures foreach { _ onComplete completeFirst }
  p.future
}

我已经对其进行了测试,并且正如预期的那样,它确实允许在第一个 future 完成后立即对结果进行垃圾收集。它应该在所有其他方面表现相同。

关于scala - 关于 Future.firstCompletedOf 和 Garbage Collect 机制,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36420697/

相关文章:

c++ - 批评我的并发队列

go - 避免重复情况下嵌套选择的模式

C# - 释放对象使用的所有资源

jquery - 使用 JQuery 进行 JSONP Ajax 调用会导致内存泄漏

scala - 在 Scala 中使用类调用方法

scala - 与列表一样, map 为 Nil

java - (Java 中的线程池)增加线程数会导致简单 for 循环变慢。为什么?

c# - 我可以强制 .NET XmlDocument 实例尽快释放它占用的内存吗?

java - 打开现有的嵌入式 Neo4j 数据库

scala - 将 map 方法应用于 Scala 中的 BitSet