multithreading - Scala:加入/等待不断增长的 future 队列

标签 multithreading scala future scala.js

我启动了几个异步进程,如果需要的话,这些进程可以启动更多进程(想想遍历目录结构或类似的东西)。每个进程都会返回一些内容,最后我想等待所有进程完成并安排一个函数来对结果集合执行某些操作。

天真的尝试

我的解决方案尝试使用可变的ListBuffer(我不断向其中添加我生成的 futures)和 Future.sequence 来安排一些函数在完成所有操作后运行这些 future 在此缓冲区中列出。

我准备了一个最小的例子来说明这个问题:

object FuturesTest extends App {
  var queue = ListBuffer[Future[Int]]()

  val f1 = Future {
    Thread.sleep(1000)
    val f3 = Future {
      Thread.sleep(2000)
      Console.println(s"f3: 1+2=3 sec; queue = $queue")
      3
    }
    queue += f3
    Console.println(s"f1: 1 sec; queue = $queue")
    1
  }
  val f2 = Future {
    Thread.sleep(2000)
    Console.println(s"f2: 2 sec; queue = $queue")
    2
  }

  queue += f1
  queue += f2
  Console.println(s"starting; queue = $queue")

  Future.sequence(queue).foreach(
    (all) => Console.println(s"Future.sequence finished with $all")
  )

  Thread.sleep(5000) // simulates app being alive later
}

它首先调度 f1f2 future,然后 f3 将以 f1 分辨率 1 秒调度之后。 f3 本身将在 2 秒后解析。因此,我期望得到的是以下内容:

starting; queue = ListBuffer(Future(<not completed>), Future(<not completed>))
f1: 1 sec; queue = ListBuffer(Future(<not completed>), Future(<not completed>), Future(<not completed>))
f2: 2 sec; queue = ListBuffer(Future(Success(1)), Future(<not completed>), Future(<not completed>))
f3: 1+2=3 sec; queue = ListBuffer(Future(Success(1)), Future(Success(2)), Future(<not completed>))
Future.sequence finished with ListBuffer(1, 2, 3)

但是,我实际上得到:

starting; queue = ListBuffer(Future(<not completed>), Future(<not completed>))
f1: 1 sec; queue = ListBuffer(Future(<not completed>), Future(<not completed>), Future(<not completed>))
f2: 2 sec; queue = ListBuffer(Future(Success(1)), Future(<not completed>), Future(<not completed>))
Future.sequence finished with ListBuffer(1, 2)
f3: 1+2=3 sec; queue = ListBuffer(Future(Success(1)), Future(Success(2)), Future(<not completed>))

...这很可能是因为我们等待的 futures 列表在 Future.sequence 的初始调用期间是固定的,并且以后不会改变。

工作,但尝试很丑陋

最终,我用这段代码让它按照我想要的方式运行:

  waitForSequence(queue, (all: ListBuffer[Int]) => Console.println(s"finished with $all"))

  def waitForSequence[T](queue: ListBuffer[Future[T]], act: (ListBuffer[T] => Unit)): Unit = {
    val seq = Future.sequence(queue)
    seq.onComplete {
      case Success(res) =>
        if (res.size < queue.size) {
          Console.println("... still waiting for tasks")
          waitForSequence(queue, act)
        } else {
          act(res)
        }
      case Failure(exc) =>
        throw exc
    }
  }

这按预期工作,最终得到所有 3 个 future:

starting; queue = ListBuffer(Future(<not completed>), Future(<not completed>))
f1: 1 sec; queue = ListBuffer(Future(<not completed>), Future(<not completed>), Future(<not completed>))
f2: 2 sec; queue = ListBuffer(Future(Success(1)), Future(<not completed>), Future(<not completed>))
... still waiting for tasks
f3: 1+2=3 sec; queue = ListBuffer(Future(Success(1)), Future(Success(2)), Future(<not completed>))
finished with ListBuffer(1, 2, 3)

但是还是很丑。如果它在完成时发现队列比结果数长,它只会重新启动 Future.sequence 等待,希望下次完成时情况会更好。当然,这很糟糕,因为它会耗尽堆栈,并且如果此检查在创建 future 和将其附加到队列之间的一个小窗口中触发,则可能容易出错。


是否可以在不使用 Akka 重写所有内容或诉诸使用 Await.result (其中 I can't actually use 因为我的代码是为 Scala.js 编译)的情况下做到这一点。

最佳答案

就像 Justin 提到的,你不能丢失对其他 futures 内部生成的 futures 的引用,你应该使用 map 和 flatMap 来链接它们。

val f1 = Future {
  Thread.sleep(1000)
  val f3 = Future {
    Thread.sleep(2000)
    Console.println(s"f3: 1+2=3 sec")
    3
  }
  f3.map{
    r =>
      Console.println(s"f1: 1 sec;")
      Seq(1, r)
  }
}.flatMap(identity)

val f2 = Future {
  Thread.sleep(2000)
  Console.println(s"f2: 2 sec;")
  Seq(2)
}

val futures = Seq(f1, f2)

Future.sequence(futures).foreach(
  (all) => Console.println(s"Future.sequence finished with ${all.flatten}")
)

Thread.sleep(5000) // simulates app being alive later

这适用于最小的示例,我不确定它是否适用于您的实际用例。结果是:

f2: 2 sec;
f3: 1+2=3 sec
f1: 1 sec;
Future.sequence finished with List(1, 3, 2)

关于multithreading - Scala:加入/等待不断增长的 future 队列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42695985/

相关文章:

c# - 为什么要使用 Mutex 来锁定?

scala - NoClassDefFoundError:SparkSession-即使构建正常

java - 如何使用 CompletionService 取消那些花费太长时间的任务

java - java中的LinkedBlockingQueue和写锁

java - 同步保护不适用于双线程

java - 同步BlockingQueues的ArrayList

scala - 如何在不阻塞的情况下使用 Akka 询问模式

json - 如何使用 scala 中的 json4s 库测试我为解析器创建的案例类是否正确?

scala - 用 scala 替换列表中的元素

java - Future.cancel(boolean) 方法的实用程序