java - 并行流、收集器和线程安全

标签 java concurrency parallel-processing java-8 java-stream

请参阅下面的简单示例,该示例计算列表中每个单词的出现次数:

Stream<String> words = Stream.of("a", "b", "a", "c");
Map<String, Integer> wordsCount = words.collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

最后,wordsCount{a=2, b=1, c=1}

但是我的流非常大,我想并行化这个工作,所以我写:

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toMap(s -> s, s -> 1,
                                                      (i, j) -> i + j));

但是我注意到 wordsCount 是一个简单的 HashMap 所以我想知道是否需要明确要求并发映射以确保线程安全:

Map<String, Integer> wordsCount = words.parallel()
                                       .collect(toConcurrentMap(s -> s, s -> 1,
                                                                (i, j) -> i + j));

非并发收集器可以安全地与并行流一起使用,还是应该在从并行流中收集时只使用并发版本?

最佳答案

Can non-concurrent collectors be safely used with a parallel stream or should I only use the concurrent versions when collecting from a parallel stream?

在并行流的collect操作中使用非并发收集器是安全的。

specification Collector 界面,在有六个要点的部分,是这样的:

For non-concurrent collectors, any result returned from the result supplier, accumulator, or combiner functions must be serially thread-confined. This enables collection to occur in parallel without the Collector needing to implement any additional synchronization. The reduction implementation must manage that the input is properly partitioned, that partitions are processed in isolation, and combining happens only after accumulation is complete.

这意味着 Collectors 类提供的各种实现可以与并行流一起使用,即使其中一些实现可能不是并发收集器。这也适用于您可能实现的任何您自己的非并发收集器。它们可以安全地与并行流一起使用,前提是您的收集器不干扰流源、无副作用、与顺序无关等。

我还建议阅读 Mutable Reduction java.util.stream 包文档的部分。本节中间是一个示例,它被声明为可并行化的,但它将结果收集到一个 ArrayList 中,这不是线程安全的。

其工作方式是,以非并发收集器结束的并行流确保不同的线程始终在中间结果集合的不同实例上运行。这就是为什么收集器有一个 Supplier 函数,用于创建与线程一样多的中间集合,因此每个线程都可以累积到自己的。当要合并中间结果时,它们会在线程之间安全地传递,并且在任何给定时间只有一个线程正在合并任何一对中间结果。

关于java - 并行流、收集器和线程安全,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22350288/

相关文章:

java - 应用程序不会写入 MS DB

java - CyclicBarrier/CountDownLatch 和 Java 中的 join 有什么区别?

java - 是否应该实例化实用程序类?

java - 在 EMF 模型中使用 Java 关键字 volatile

c++ - 将生成的进程的输出捕获到字符串

java - 帮助 eclipse 找到我的枚举值?

java - LayoutInflater wordSpan

c - 链接到 openCL 内核程序中的外部库

java - 获取程序正在运行的 .jar 中文件的 URL

parallel-processing - 多核CPU的: Programming techniques to avoid disappointing scalability