scala - 过滤 Scala 的并行集合,并在找到所需数量的结果时提前中止

标签 scala parallel-processing parallel-collections

给定一个非常大的 collection.parallel.mutable.ParHashMap 实例(或任何其他并行集合),一旦给定的匹配数量(例如 50 个),如何中止过滤并行扫描被发现了?

尝试在线程安全的“外部”数据结构中累积中间匹配或保留具有结果计数的外部 AtomicInteger 在 4 核上似乎比使用常规的 collection.mutable.HashMap< 慢 2 到 3 倍/strong> 并将单个核心固定为 100%。

我知道 Par* 集合上的 findexists 确实会“在内部”中止。有没有一种方法可以概括这一点以找到多个结果?

下面的代码在具有 ~ 79,000 个条目的 ParHashMap 上似乎仍然慢 2 到 3 倍,并且还存在将超过 ma​​xResults 结果填充到结果中的问题。结果CHM(这可能是由于线程在incrementAndGet之后但在break之前被抢占,这允许其他线程添加更多元素)。更新:看来速度减慢是由于工作线程在 counter.incrementAndGet() 上竞争,这当然违背了整个并行扫描的目的:-(

def find(filter: Node => Boolean, maxResults: Int): Iterable[Node] =
{
  val counter = new AtomicInteger(0)
  val results = new ConcurrentHashMap[Key,  Node](maxResults)

  import util.control.Breaks._

  breakable
  {
    for ((key, node) <- parHashMap if filter(node))
    {
      results.put(key, node)
      val total = counter.incrementAndGet()
      if (total > maxResults) break
    }
  }

  results.values.toArray(new Array[Node](results.size))
}

最佳答案

我首先会进行并行扫描,其中变量 maxResults 将是线程局部的。这将找到最多 (maxResults * numberOfThreads) 个结果。

然后我会进行单线程扫描以将其减少到 maxResults。

关于scala - 过滤 Scala 的并行集合,并在找到所需数量的结果时提前中止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8073061/

相关文章:

java - Spark 的额外记录器位于特定任务的单独文件中

c - 如何用c语言准确描述进程状态(如初始化、运行、等待等)?

scala - 使用 Scala actors 进行阶乘计算

scala - 在并行集合上调用 .seq 会确保所有线程都加入了吗?

scala - 如何使用 scalaz 中的序列将 T[G[A]] 转换为 G[T[A]]

function - 将 scala 代码概括为函数

scala - 使用 Spark Scala 计算平均值

c# - 最多有 N 个线程按 FIFO 顺序执行的代码部分

multithreading - 定义C++ AMP中的函数

scala - Scala 中并行集合的效率/可扩展性(图表)