java - 并行运算符如何改变后续流元素的流程?

标签 java java-stream

执行以下代码

List.of(1, 2, 3, 4).stream()
    .map(
        integer -> {
          System.out.println(
              "Before parallel operator : " + Thread.currentThread().getName() + " : " + integer);
          return integer * 2;
        })
    .parallel()
    .map(
        integer -> {
          System.out.println(
              " After parallel operator : " + Thread.currentThread().getName() + " : " + integer);
          return integer * 2;
        })
    .forEach(
            integer -> {
          System.out.println(" For Each : " + Thread.currentThread().getName() + " : " + integer);
        });

输出:

    Before parallel operator : main : 3
Before parallel operator : ForkJoinPool.commonPool-worker-19 : 2
Before parallel operator : ForkJoinPool.commonPool-worker-23 : 1
Before parallel operator : ForkJoinPool.commonPool-worker-5 : 4
 After parallel operator : main : 6
 After parallel operator : ForkJoinPool.commonPool-worker-23 : 2
 After parallel operator : ForkJoinPool.commonPool-worker-19 : 4
 After parallel operator : ForkJoinPool.commonPool-worker-5 : 8
 For Each : ForkJoinPool.commonPool-worker-19 : 8
 For Each : main : 12
 For Each : ForkJoinPool.commonPool-worker-23 : 4
 For Each : ForkJoinPool.commonPool-worker-5 : 16

除了元素 3 所有其他还是并行运行? 想了解并行运算符在后续调用中的行为?

并行运算符从哪里开始,并行性如何继续?

最佳答案

在调用终端操作(例如 forEachcollect)之前不会处理流,稍后会详细介绍。因此,回答您的问题“并行运算符从哪里开始以及并行性如何继续?”。

文档怎么说?

documentation对这件事很清楚:

the stream's mode can be modified with the BaseStream.sequential() and BaseStream.parallel() operations. The most recent sequential or parallel mode setting applies to the execution of the entire stream pipeline

一个小演示

现在考虑以下代码(请原谅我的 System.out,它用于演示目的)。如果我们在 parallelsequential 之间切换,整个管道都会发生变化,而不仅仅是后续运算符。

System.out.println("=== Creating stream s1 as 1,2,3,4");
var s1 = List.of(1, 2, 3, 4).stream();
System.out.println("s1 is parallel? " + s1.isParallel());

System.out.println("=== s2 results of applying map to s1");
var s2 = s1.map(integer -> integer * 2);
System.out.println("s1 is parallel? " + s1.isParallel());
System.out.println("s2 is parallel? " + s2.isParallel());

System.out.println("=== s3 results of applying parallel to s2");
var s3 = s2.parallel();
System.out.println("s1 is parallel? " + s1.isParallel());
System.out.println("s2 is parallel? " + s2.isParallel());
System.out.println("s3 is parallel? " + s3.isParallel());

System.out.println("=== s4 results of applying map to s3");
var s4 = s3.map(integer -> integer * 2);
System.out.println("s1 is parallel? " + s1.isParallel());
System.out.println("s2 is parallel? " + s2.isParallel());
System.out.println("s3 is parallel? " + s3.isParallel());
System.out.println("s4 is parallel? " + s4.isParallel());

System.out.println("=== s5 results of applying sequential to s4");
var s5 = s4.sequential();
System.out.println("s1 is parallel? " + s1.isParallel());
System.out.println("s2 is parallel? " + s2.isParallel());
System.out.println("s3 is parallel? " + s3.isParallel());
System.out.println("s4 is parallel? " + s4.isParallel());
System.out.println("s5 is parallel? " + s5.isParallel());

这将输出以下内容:

=== Creating stream s1 as 1,2,3,4
s1 is parallel? false
=== s2 results of applying map to s1
s1 is parallel? false
s2 is parallel? false
=== s3 results of applying parallel to s2
s1 is parallel? true
s2 is parallel? true
s3 is parallel? true
=== s4 results of applying map to s3
s1 is parallel? true
s2 is parallel? true
s3 is parallel? true
s4 is parallel? true
=== s5 results of applying sequential to s4
s1 is parallel? false
s2 is parallel? false
s3 is parallel? false
s4 is parallel? false
s5 is parallel? false

现在,当您调用像 forEachcollect 这样的终端运算符时,它在处理过程中只会考虑顺序流,即使在中间调用了并行。正如文档所述,最近应用的模式用于整个管道。

这有什么用?

你可能会问。可以通过终端运算符(operator)“破坏”管道来更改管道中间的行为。例如,以您的示例为例,如果我们在第一个 map 之后立即应用 collect,则第一个 map 将按顺序执行,然后并行执行仅适用于后续运算符,但实际上,这现在是一个不同的管道,因为所有内容都在中间收集到列表中。

List.of(1, 2, 3, 4).stream()
    .map(integer -> {
        System.out.println("Before stream : " + Thread.currentThread().getName() + " : " + integer);
        return integer * 2;
    })
    .collect(Collectors.toList())
    .stream()
    .parallel()
    .map(integer -> {
        System.out.println("After parallel stream : " + Thread.currentThread().getName() + " : " + integer);
        return integer * 2;
    })
    .forEach(integer -> System.out.println("For Each : " + Thread.currentThread().getName() + " : " + integer));

这将输出如下内容:

Before stream : main : 1
Before stream : main : 2
Before stream : main : 3
Before stream : main : 4
After parallel stream : main : 6
After parallel stream : ForkJoinPool.commonPool-worker-23 : 2
After parallel stream : ForkJoinPool.commonPool-worker-5 : 4
After parallel stream : ForkJoinPool.commonPool-worker-19 : 8
For Each : ForkJoinPool.commonPool-worker-19 : 16
For Each : ForkJoinPool.commonPool-worker-5 : 8
For Each : ForkJoinPool.commonPool-worker-23 : 4
For Each : main : 12

注意第一个 map 是如何顺序执行的,而其余的运算符是并行执行的。

Observable 流实现,例如 RxJava observeOn 运算符对此有不同的理解,但它们也是一种完全不同的做事方式。

关于java - 并行运算符如何改变后续流元素的流程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62140719/

相关文章:

java - 来自 Java 的 HTTP POST 存在 JSON 问题

Java Date 获取实际偏移量

Java 8 按类分离列表元素

java - 从 for 循环到 Java 8 Stream 的例子

java-8 - 如何使用 Collectors.toMap 从具有列表的对象集中收集 map

java lambda - 如何遍历可选列表/可选流

java - MQ异步处理、聚合和发布数据

java - 获取 Android 通知以显示为横幅

java - 如何将 Apache Ignite 缓存条目写入 Apache Avro 文件?

java - 如果列表不为空,如何使用我的服务使用 Java 8 lambda/streams 删除列表中的每个元素