Java: in which thread are sequential streams executed?

Tags: java foreach java-8 java-stream

While reading the documentation on streams, I came across the following sentences:

  • ... attempting to access mutable state from behavioral parameters presents you with a bad choice ... if you do not synchronize access to that state, you have a data race and therefore your code is broken ... [1]

  • If the behavioral parameters do have side-effects ... [there are no] guarantees that different operations on the "same" element within the same stream pipeline are executed in the same thread. [2]

  • For any given element, the action may be performed at whatever time and in whatever thread the library chooses. [3]

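These sentences do not distinguish between sequential and parallel streams. So my questions are:

  1. In which thread is the pipeline of a sequential stream executed? Is it always the calling thread, or is an implementation free to choose any thread?
  2. In which thread is the action parameter of the forEach terminal operation executed if the stream is sequential?
  3. Do I have to use any synchronization when using sequential streams?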

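Best answer

It all comes down to what the specification guarantees, and to the fact that a current implementation may have additional behavior beyond what is guaranteed.

Brian Goetz, the Java Language Architect, made a relevant point about specifications in a related question: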

Specifications exist to describe the minimal guarantees a caller can depend on, not to describe what the implementation does.

[...]

When a specification says "does not preserve property X", it does not mean that the property X may never be observed; it means the implementation is not obligated to preserve it. [...] (HashSet doesn't promise that iterating its elements preserves the order they were inserted, but that doesn't mean this can't accidentally happen -- you just can't count on it.)

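All of this means that even if the current implementation happens to have certain behavioral characteristics, you should neither rely on them nor assume that they will stay the same in new versions of the library.

Sequential stream pipeline thread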

In which thread is the pipeline of a sequential stream executed? Is it always the calling thread or is an implementation free to choose any thread?

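Current stream implementations may or may not use the calling thread, and may use one or more threads. Since the API specifies none of this, the behavior should not be relied on.

forEach execution thread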
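To see what a particular JVM actually does, one can record the thread names observed inside the pipeline. The following is a minimal sketch; the class name `SequentialThreadProbe` and the sample elements are made up for illustration. Whatever it prints is an observation about one implementation, not something the API promises.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;

// Hypothetical probe class, not part of any library.
class SequentialThreadProbe {

    // Collects the names of every thread that ran a lambda of the pipeline.
    // A concurrent set is used so the probe itself stays correct even if
    // an implementation were to use more than one thread.
    static Set<String> observedThreads() {
        Set<String> names = ConcurrentHashMap.newKeySet();
        Stream.of("a", "b", "c")
              .map(s -> {
                  names.add(Thread.currentThread().getName());
                  return s.toUpperCase();
              })
              .forEach(s -> names.add(Thread.currentThread().getName()));
        return names;
    }

    public static void main(String[] args) {
        System.out.println("calling thread:   " + Thread.currentThread().getName());
        System.out.println("observed threads: " + observedThreads());
    }
}
```

If the two lines agree on your JVM, that matches the behavior described for current implementations, but it still does not turn that behavior into a guarantee.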

In which thread is the action parameter of the forEach terminal operation executed if the stream is sequential?

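Although current implementations use the existing thread, this cannot be relied on, because the documentation states that the choice of thread is up to the implementation. In fact, there is no guarantee that different elements will not be processed by different threads, even though the current stream implementation does not do that either.

According to the API: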

For any given element, the action may be performed at whatever time and in whatever thread the library chooses.

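Note that although the API calls out parallel streams specifically when discussing encounter order, Brian Goetz clarified that this was done to explain the motivation for the behavior, not because any of the behavior is specific to parallel streams: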

The intent of calling out the parallel case explicitly here was pedagogical [...]. However, to a reader who is unaware of parallelism, it would be almost impossible to not assume that forEach would preserve encounter order, so this sentence was added to help clarify the motivation.
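When the order of the actions matters, `Stream.forEachOrdered` is the terminal operation that documents it: for a stream with a defined encounter order, the action is applied in that order. A minimal sketch follows, with a hypothetical class name `OrderedForEach`; the side effect is there only to make the order visible, and the list is still synchronized because the thread running the action remains unspecified.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical demo class, not part of any library.
class OrderedForEach {

    // Records the elements in the order in which forEachOrdered visits them.
    static List<String> inEncounterOrder(List<String> input) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        // A List-backed stream has a defined encounter order, and
        // forEachOrdered is specified to respect it.
        input.stream().forEachOrdered(seen::add);
        return new ArrayList<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(inEncounterOrder(Arrays.asList("x", "y", "z")));
    }
}
```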

Synchronization with sequential streams

Do I have to use any synchronization when using sequential streams?

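Unsynchronized code will probably work on current implementations, because they use a single thread for the forEach method of a sequential stream. However, the stream specification does not guarantee this, so it should not be relied on. Synchronization should therefore be used as though the methods could be called by multiple threads.

That said, the stream documentation specifically advises against side-effects that would require synchronization, and suggests using reduction operations instead of mutable accumulators: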
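As a sketch of what "synchronize as though several threads were involved" can look like, the example below accumulates into an `AtomicLong` from inside forEach. The class name `SynchronizedSideEffect` is hypothetical, and `AtomicLong` is just one possible thread-safe accumulator chosen for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.IntStream;

// Hypothetical demo class, not part of any library.
class SynchronizedSideEffect {

    // Sums 1..n through a side effect on a thread-safe accumulator, so the
    // result does not depend on which thread(s) run the action.
    static long sumWithAtomic(int n) {
        AtomicLong total = new AtomicLong();
        IntStream.rangeClosed(1, n).forEach(i -> total.addAndGet(i));
        return total.get();
    }

    public static void main(String[] args) {
        System.out.println(sumWithAtomic(100)); // prints 5050
    }
}
```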

Many computations where one might be tempted to use side effects can be more safely and efficiently expressed without side-effects, such as using reduction instead of mutable accumulators. [...] A small number of stream operations, such as forEach() and peek(), can operate only via side-effects; these should be used with care.

As an example of how to transform a stream pipeline that inappropriately uses side-effects to one that does not, the following code searches a stream of strings for those matching a given regular expression, and puts the matches in a list.

     ArrayList<String> results = new ArrayList<>();
     stream.filter(s -> pattern.matcher(s).matches())
           .forEach(s -> results.add(s));  // Unnecessary use of side-effects!

This code unnecessarily uses side-effects. If executed in parallel, the non-thread-safety of ArrayList would cause incorrect results, and adding needed synchronization would cause contention, undermining the benefit of parallelism. Furthermore, using side-effects here is completely unnecessary; the forEach() can simply be replaced with a reduction operation that is safer, more efficient, and more amenable to parallelization:

     List<String> results =
         stream.filter(s -> pattern.matcher(s).matches())
               .collect(Collectors.toList());  // No side-effects!
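The quoted snippets leave `stream` and `pattern` undefined. A self-contained version of the side-effect-free variant might look like the sketch below; the class name `RegexMatches`, the digit pattern, and the sample strings are assumptions made only to have something runnable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical demo class, not part of any library.
class RegexMatches {

    // Returns the strings that fully match the pattern, without touching
    // any shared mutable state from inside the pipeline.
    static List<String> matching(List<String> input, Pattern pattern) {
        return input.stream()
                    .filter(s -> pattern.matcher(s).matches())
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Pattern digits = Pattern.compile("\\d+");
        List<String> input = Arrays.asList("12", "ab", "345", "6x");
        System.out.println(matching(input, digits)); // prints [12, 345]
    }
}
```

Because the result is assembled by the collector, the question of which thread runs the filter lambda no longer affects correctness.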

For the question "Java: in which thread are sequential streams executed?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/45871618/
