Java 并行流 - 调用 parallel() 方法的顺序

标签 java java-stream

关闭。这个问题需要更多 focused .它目前不接受答案。












想改进这个问题?更新问题,使其仅关注一个问题 editing this post .

2年前关闭。




Improve this question



AtomicInteger recordNumber = new AtomicInteger();
Files.lines(inputFile.toPath(), StandardCharsets.UTF_8)
     .map(record -> new Record(recordNumber.incrementAndGet(), record)) 
     .parallel()           
     .filter(record -> doSomeOperation())
     .findFirst()

当我写这篇文章时,我假设线程将仅在 map 调用中产生,因为并行放置在 map 之后。但是文件中的某些行在每次执行时都获得了不同的记录号。

我看官方Java stream documentation和一些网站来了解流是如何在幕后工作的。

几个问题:
  • Java 并行流工作基于 SplitIterator ,它由每个集合实现,如 ArrayList、LinkedList 等。当我们从这些集合中构造并行流时,将使用相应的拆分迭代器来拆分和迭代集合。这解释了为什么并行性发生在原始输入源(文件行)级别而不是映射的结果(即记录 pojo)。我的理解正确吗?
  • 在我的例子中,输入是一个文件 IO 流。将使用哪个拆分迭代器?
  • 我们放在哪里都没关系parallel()在管线中。原始输入源将始终被拆分,并且将应用剩余的中间操作。

    在这种情况下,Java 不应该允许用户在管道中的任何地方放置并行操作,除了原始源。因为,对于那些不知道java流内部如何工作的人来说,这是错误的理解。我知道parallel()操作本来是为 Stream 对象类型定义的,因此它以这种方式工作。但是,最好提供一些替代解决方案。
  • 在上面的代码片段中,我试图为输入文件中的每条记录添加一个行号,因此应该对其进行排序。但是,我想申请doSomeOperation()并行,因为它是重量级逻辑。实现的一种方法是编写我自己的自定义拆分迭代器。还有其他方法吗?
  • 最佳答案

    This explains why parallelism happened at original input source (File lines) level rather at the result of map (i.e Record pojo).



    整个流是并行的或顺序的。我们不会选择一个操作子集来顺序或并行运行。

    When the terminal operation is initiated, the stream pipeline is executed sequentially or in parallel depending on the orientation of the stream on which it is invoked. [...] When the terminal operation is initiated, the stream pipeline is executed sequentially or in parallel depending on the mode of the stream on which it is invoked. same source



    正如您所提到的,并行流使用拆分迭代器。显然,这是在操作开始运行之前对数据进行分区。

    In my case, the input is a file IO stream. Which split iterator will be used?



    查看源代码,我看到它使用 java.nio.file.FileChannelLinesSpliterator

    It doesn't matter where we place parallel() in the pipeline. The original input source will always be split and remaining intermediate operations will be applied.



    对。您甚至可以调用parallel()sequential()多次。最后调用的将获胜。当我们调用 parallel() ,我们为返回的流设置​​它;如上所述,所有操作都按顺序或并行运行。

    In this case, Java shouldn't allow users to place parallel operation anywhere in the pipeline except at the original source...



    这成为一个意见问题。我认为 Zabuza 提供了一个很好的理由来支持 JDK 设计者的选择。

    The one way to achieve is to write my own customized split iterator. Is there any other way?



    这取决于您的操作
  • 如果 findFirst()是你真正的终端操作,那么你甚至不用担心并行执行,因为不会有很多调用 doSomething()无论如何(findFirst() 是短路的)。 .parallel()实际上可能会导致处理多个元素,而 findFirst()在顺序流上会阻止这种情况。
  • 如果您的终端操作没有创建太多数据,那么也许您可以创建 Record使用顺序流的对象,然后并行处理结果:
    List<Record> smallData = Files.lines(inputFile.toPath(), 
                                         StandardCharsets.UTF_8)
      .map(record -> new Record(recordNumber.incrementAndGet(), record)) 
      .collect(Collectors.toList())
      .parallelStream()     
      .filter(record -> doSomeOperation())
      .collect(Collectors.toList());
    
  • 如果您的管道会在内存中加载大量数据(这可能是您使用 Files.lines() 的原因),那么您可能需要一个自定义拆分迭代器。不过,在我去那里之前,我会研究其他选项(例如以 id 列开头的保存行 - 这只是我的意见)。
    我还尝试以较小的批处理处理记录,如下所示:
    AtomicInteger recordNumber = new AtomicInteger();
    final int batchSize = 10;
    
    try(BufferedReader reader = Files.newBufferedReader(inputFile.toPath(), 
            StandardCharsets.UTF_8);) {
        Supplier<List<Record>> batchSupplier = () -> {
            List<Record> batch = new ArrayList<>();
            for (int i = 0; i < batchSize; i++) {
                String nextLine;
                try {
                    nextLine = reader.readLine();
                } catch (IOException e) {
                    //hanlde exception
                    throw new RuntimeException(e);
                }
    
                if(null == nextLine) 
                    return batch;
                batch.add(new Record(recordNumber.getAndIncrement(), nextLine));
            }
            System.out.println("next batch");
    
            return batch;
        };
    
        Stream.generate(batchSupplier)
            .takeWhile(list -> list.size() >= batchSize)
            .map(list -> list.parallelStream()
                             .filter(record -> doSomeOperation())
                             .collect(Collectors.toList()))
            .flatMap(List::stream)
            .forEach(System.out::println);
    }
    

    这执行 doSomeOperation()并行而不将所有数据加载到内存中。但请注意 batchSize需要考虑一下。
  • 关于Java 并行流 - 调用 parallel() 方法的顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61167863/

    相关文章:

    Java的InputStream会破坏信息,为什么?

    java - 如何在netbeans中逐行查看执行路径?

    java - 如何让File类使用相对路径?

    java - 使用 Stream 展平嵌套 Hashmap

    java - 无法在服务中未在 locationManager 上调用 Looper.prepare() 的线程内创建处理程序

    java - 如何在 Android 中最好地存储和修改分类数据

    java-8 - java 8流如何找到2个列表元素之间的最小差异

    java - 写入外部变量时,将内部循环与流并行化

    java - 将具有条件逻辑的多个 for 循环转换为 Java 8 流

    java - 将字符串列表拆分为 map 列表