给定一个非常大的 collection.parallel.mutable.ParHashMap 实例(或任何其他并行集合),一旦给定的匹配数量(例如 50 个),如何中止过滤并行扫描被发现了?
尝试在线程安全的“外部”数据结构中累积中间匹配或保留具有结果计数的外部 AtomicInteger 在 4 核上似乎比使用常规的 collection.mutable.HashMap< 慢 2 到 3 倍/strong> 并将单个核心固定为 100%。
我知道 Par* 集合上的 find 或 exists 确实会“在内部”中止。有没有一种方法可以概括这一点以找到多个结果?
下面的代码在具有 ~ 79,000 个条目的 ParHashMap 上似乎仍然慢 2 到 3 倍,并且还存在将超过 maxResults 结果填充到结果中的问题。结果CHM(这可能是由于线程在incrementAndGet之后但在break之前被抢占,这允许其他线程添加更多元素)。更新:看来速度减慢是由于工作线程在 counter.incrementAndGet() 上竞争,这当然违背了整个并行扫描的目的:-(
def find(filter: Node => Boolean, maxResults: Int): Iterable[Node] =
{
val counter = new AtomicInteger(0)
val results = new ConcurrentHashMap[Key, Node](maxResults)
import util.control.Breaks._
breakable
{
for ((key, node) <- parHashMap if filter(node))
{
results.put(key, node)
val total = counter.incrementAndGet()
if (total > maxResults) break
}
}
results.values.toArray(new Array[Node](results.size))
}
最佳答案
我首先会进行并行扫描,其中变量 maxResults 将是线程局部的。这将找到最多 (maxResults * numberOfThreads) 个结果。
然后我会进行单线程扫描以将其减少到 maxResults。
关于scala - 过滤 Scala 的并行集合,并在找到所需数量的结果时提前中止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8073061/