java - 有没有办法部分控制Java并行流的顺序?

标签 java java-stream

我知道尝试让并行流以特定顺序执行每个元素是没有意义的。由于它并行运行数据,因此顺序显然会存在一些不确定性。然而,我想知道是否有可能让它按“某种程度”的顺序执行,或者至少尝试保持顺序与按顺序执行的顺序有些相似。

用例

我需要对几个数组中的每个值组合执行一些代码。我创建了所有可能的索引组合的流,如下所示(变量名称已被混淆,以免泄露专有信息,我保证我通常不会命名我的变量 arr1, arr2 等):

public static void doMyComputation(double[] arr1, double[] arr2, double[] arr3) {
  DoubleStream.of(arr1).mapToObj(Double::valueOf)
    .flatMap(
      i1->DoubleStream.of(arr2).mapToObj(Double::valueOf)
        .flatMap(
          i2->DoubleStream.of(arr3).mapToObj(Double::valueOf)
            .flatMap(
              i3->new Inputs(i1,i2,i3)
             )
        )
    )
    .parallel()
    .forEach(input -> doComputationallyIntensiveThing(input.i1, input.i2, input.i3);

这工作得很好(或者至少真实版本是这样,我简化了我在此处发布的代码片段的一些内容,所以我可能弄乱了代码片段)。我预计,由于并行性,我不会看到按 arr1[0]、arr2[0]、arr3[0] 顺序排列的值,后跟 arr1[0] 、 arr2[0]、arr3[1] 等。但是,我希望我至少能首先看到带有来自 arr1 的前几个值的输入,然后慢慢工作我的方式到了arr1的结尾。我很惊讶地发现它甚至没有接近这一点。

问题在于,在该 doComputationallyIntentialThing 方法中,只有当我们同时看到 arr1 中的许多相同值时,某些缓存才会表现良好。如果值完全随机输入,则缓存弊大于利。

是否有任何方法可以提示流按照将输入按 arr1 中的值分组在一起的顺序执行输入?

如果没有,那么我可能可以为 arr1 中的每个值创建一个新流,并且效果很好,但我想看看是否有办法完成这一切在一个流中。

最佳答案

通常,您不应该假设并行流的特定处理顺序,但假设您的算法是正确的,无论实际处理顺序如何,您都可以推断顺序和性能之间的关系。

Stream 实现已经被设计为允许从本地处理器处理连续元素中受益。因此,当您有一个包含数百个元素的流时,例如 IntStream.range(0, 100)为了简化,并使用四个空闲的 CPU 核心来处理它,实现会将其分为四个范围 0-25、25-50、50-75 和 75-100,在最好的情况下独立处理。因此,每个处理器将在本地处理连续的元素并从低级效果中受益,例如一次将多个数组元素提取到本地缓存中等。

所以你的 doComputationallyIntensiveThing 有问题方法似乎是,缓存(和您的监控)在本地不起作用。因此,继续上面的示例,操作将从并行执行 0 开始。 , 25 , 50 ,和75同时,如果所有这些都在相似的耗时后完成,则随后将并行评估 1 , 26 , 51 ,和76 。如果第一个评估的四个元素中的任何一个“获胜”并确定了缓存数据,则它将仅适用于接下来的四个值中的一个。如果线程的时间发生变化,该比率会变得更糟。

一种解决方案是更改 doComputationallyIntensiveThing使用线程本地缓存,从每个线程中连续元素的处理中受益。然后,您定义 Stream 操作的方式非常适合此操作,该操作受益于重复查看 arr1 的相同元素。 。不过,您可以简化代码并消除大量装箱开销:

Arrays.stream(arr1).parallel().forEach(i1 ->
    Arrays.stream(arr2).forEach(i2 ->
        Arrays.stream(arr3).forEach(i3 ->
            doComputationallyIntensiveThing(i1, i2, i3))));

但是,这会带来随后清理线程本地缓存的挑战,因为并行流使用您无法控制的线程池。

目前该方法有效的一种更简单的解决方法是更改​​嵌套:

Arrays.stream(arr2).parallel().forEach(i2 ->
    Arrays.stream(arr1).forEach(i1 ->
        Arrays.stream(arr3).forEach(i3 ->
            doComputationallyIntensiveThing(i1, i2, i3))));

现在,arr2按照上述方式进行拆分。然后,每个工作线程都会在 arr1 上执行相同的迭代。 ,处理其中每个元素的次数与 arr3 中的元素相同。 。这允许利用线程间缓存行为,但存在线程由于时间差异而不同步的风险,最终会出现与之前相同的情况。

更好的选择是重新设计 doComputationallyIntensiveThing ,创建两种不同的方法,一种为 arr1 的特定元素准备操作。返回一个包含元素缓存数据的对象,以及另一个用于利用缓存数据进行实际处理的对象:

Arrays.stream(arr1).parallel()
    .mapToObj(i1 -> prepareOperation(i1))
    .forEach(cached ->
        Arrays.stream(arr2).forEach(i2 ->
            Arrays.stream(arr3).forEach(i3 ->
                doComputationallyIntensiveThing(cached, i2, i3))));

这里,prepareOperation 返回的每个实例与 arr1 的特定元素相关联并充当与其关联的任何数据的本地缓存,但在特定元素的处理结束时正常收集垃圾。所以不需要清理。

原则上,如果 prepareOperation 也可以工作。只返回一个空的持有者对象,由第一次调用 doComputationallyIntensiveThing 填充对于特定元素。

关于java - 有没有办法部分控制Java并行流的顺序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55906518/

相关文章:

java - 在java中按多个字段过滤的最佳方法是什么

java - 如何在流中使用两个过滤器进行不同的转换

java - 什么是NullPointerException,我该如何解决?

java - 如何流式传输和映射一对多关系?

java - 从多个供应商生成一个流

Java 8 - 省略了繁琐的收集方法

java - Jquery函数将java数据转换为json

Java,无法访问同一类中的方法

java - 表数据覆盖

java - 如何使用 java 将十六进制转换为十进制 rgb565?