multithreading - `foreach` 过度并行收集永远不会开始

标签 multithreading mongodb scala collections parallel-processing

我有一个 Mongo 数据库,其中包含我想并行处理的作业;我想尝试使用并行集合来透明地为我处理线程(并不是说使用线程池会更难)。我想出了这段代码:

def run(stopSignal: SynchronizedQueue[Any]) = {
  val queue = new Iterator[Job] {
    private var prevId = new ObjectId("000000000000000000000000")

    def hasNext = stopSignal.isEmpty

    @tailrec
    def next = {
      val job = Job
        .where(_.status eqs Pending)
        // this works because the IDs start with a timestamp part
        .where(_._id gt prevId)
        .orderAsc(_.regTime)
        .get()
      job match {
        case Some(job) =>
          prevId = job.id
          println(s"next() => ${job.id}")
          job
        case None if hasNext =>
          Thread.sleep(500) // TODO: use a tailable cursor instead
          next
        case None =>
          throw new InterruptedException
      }
    }
  }

  try {
    queue.toStream.par.foreach { job =>
      println(s"processing ${job.id}...")
      processOne(job)
      println(s"processing complete: ${job.id}")
    }
  } catch { case _: InterruptedException => }
}

这会产生:

next() => 53335f7bef867e6f0805abdb
next() => 53335fc6ef867e6f0805abe2
next() => 53335ffcef867e6f0805abe6
next() => 53336005ef867e6f0805abe7
next() => 53336008ef867e6f0805abe8
next() => 5333600cef867e6f0805abe9

但处理从未开始;即传递给 foreach 的函数永远不会被调用。如果我删除 .par 调用,它工作正常(但当然是连续的)。

这里究竟泄漏了哪个抽象?我该如何解决?还是我应该为此放弃使用并行集合并转向更简单的线程池方法?

最佳答案

par 方法首先将流中的元素排空到一个ParSeq 中。 所以当你调用 queue.toStream.par 时。它将遍历流(调用底层迭代器的 hasNext 和 next 方法,直到迭代器没有 next)。在检索到所有作业后,它开始调用 processJob

例如

scala> (1 to 100).iterator.toStream
res7: scala.collection.immutable.Stream[Int] = Stream(1, ?)

scala> (1 to 100).iterator.toStream.par
res8: scala.collection.parallel.immutable.ParSeq[Int] = ParVector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)

par 方法不偷懒

如果您只想并行执行(实际上它已经并行但不是惰性):

  • 将流分组。调用每个批处理的 par
  • 您可以将 processJob 方法放在 Future 中。
  • 或者你可以考虑使用actor模型来处理这些事情
  • 使用https://github.com/scalaz/scalaz-stream/

关于multithreading - `foreach` 过度并行收集永远不会开始,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22723208/

相关文章:

java - OSGi 线程永远不会停止运行,为线程定义了固定的生命周期

scala - 如何编写绑定(bind)集合类型和元素类型的通用 Scala 增强方法?

c++ - 嵌入式 Python 段错误

c# - 使用 backgroundworker 进行时间控制

mongodb - 如何在MongoDB查询中过滤和映射文档数组?

C# Mongodb 查找集合中最接近的时间

regex - 如何从 RDD 中的键中删除双引号并将 JSON 分成两行?

scala - 寻找 windowAll() 的替代形式,将数据保存在同一节点上以进行聚合

python - PyQt5 线程 GUI 不工作

javascript - “null”传递到 Mongoose 查询 'findOne()'