我想了解 Java 中嵌套流之间的排序约束。
示例 1:
public static void main(String[] args) {
IntStream.range(0, 10).forEach(i -> {
System.out.println(i);
IntStream.range(0, 10).forEach(j -> {
System.out.println(" " + i + " " + j);
});
});
}
此代码确定性地执行,因此内部循环运行 forEach
每个 j
在外循环运行自己的 forEach
之前上下i
:
0
0 0
0 1
0 2
0 3
0 4
0 5
0 6
0 7
0 8
0 9
1
1 0
1 1
1 2
1 3
1 4
1 5
1 6
1 7
1 8
1 9
2
2 0
2 1
2 2
2 3
...
示例 2:
public static void main(String[] args) {
IntStream.range(0, 10).parallel().forEach(i -> {
System.out.println(i);
IntStream.range(0, 10).parallel().forEach(j -> {
System.out.println(" " + i + " " + j);
});
});
}
如果制作了流parallel()
就像在第二个示例中一样,我可以想象内部工作线程在等待外部工作队列中的线程变得可用时会阻塞,因为外部工作队列线程必须在内部流完成时阻塞,并且仅默认线程池线程数量有限。然而,死锁似乎并没有发生:
6
5
8
8 6
0
1
6 2
7
1 6
8 5
7 6
8 8
2
0 6
0 2
0 8
5 2
5 4
5 6
0 5
2 6
7 2
7 5
7 8
6 4
8 9
1 5
...
两个流共享相同的默认线程池,但它们生成不同的工作单元。每个外部工作单元只能在该外部工作单元的所有内部单元完成后才能完成,因为每个并行流的末尾都有一个完成屏障。
如何在工作线程共享池中管理这些内部流和外部流之间的协调,而不出现任何死锁?
最佳答案
并行流背后的线程池是公共(public)池,可以通过ForkJoinPool.commonPool()
获得。它通常使用 NumberOfProcessors - 1 个工作线程。为了解决您所描述的依赖关系,如果(某些)当前工作线程被阻止并且可能出现死锁,它可以动态创建额外的工作线程。
但是,这不是您案例的答案。
ForkJoinPool 中的任务有两个重要的功能:
- 他们可以创建子任务并将当前任务拆分为更小的部分( fork )。
- 他们可以等待子任务(加入)。
当线程执行这样的任务A并加入子任务B时,它不仅会阻塞等待子任务完成执行,还会执行另一个任务< em>C 同时。当C完成时,线程返回A并检查B是否完成。请注意,B 和 C 可以(并且很可能是)执行相同的任务。如果B完成,则A已成功等待/加入它(非阻塞!)。查看this如果前面的解释不清楚,请指导。
现在,当您使用并行流时,流的范围会递归地拆分为任务,直到任务变得很小,以便可以更有效地按顺序执行。这些任务被放入公共(public)池中的工作队列中(每个工作人员都有一个)。因此, IntStream.range(0, 100).parallel().forEach
所做的就是递归地分割范围,直到它不再值得为止。每个最终任务,或者更确切地说是一堆迭代,都可以使用 forEach
中提供的代码顺序执行。此时,公共(public)池中的工作人员可以执行这些任务,直到所有任务完成并且流可以返回。请注意,调用线程通过加入子任务来帮助执行!
现在,在您的情况下,每个任务本身都使用并行流。程序相同;将其拆分为更小的任务,并将这些任务放入公共(public)池中的工作队列中。从 ForkJoinPool 的角度来看,这些只是现有任务之上的附加任务。工作人员只是继续执行/加入任务,直到所有任务完成并且外部流可以返回。
这就是您在输出中看到的内容:没有确定性行为,没有固定顺序。此外,不会发生死锁,因为在给定的用例中不会出现阻塞线程。
您可以通过以下代码查看说明:
public static void main(String[] args) {
IntStream.range(0, 10).parallel().forEach(i -> {
IntStream.range(0, 10).parallel().forEach(j -> {
for (int x = 0; x < 1e6; x++) { Math.sqrt(Math.log(x)); }
System.out.printf("%d %d %s\n", i, j, Thread.currentThread().getName());
for (int x = 0; x < 1e6; x++) { Math.sqrt(Math.log(x)); }
});
});
}
您应该注意到主线程参与了内部迭代的执行,因此它不会(!)被阻塞。公共(public)池工作人员只是一个接一个地挑选任务,直到全部完成。
关于java - Java中的嵌套并行流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62670334/