java - 并行读取文件时内存不足

标签 java csv parallel-processing stream nio

我正在进行测试以找出读取和处理 csv 文件的最佳方法。 所以我需要读取 csv 文件的每一行并分析每一行。 所以对于一个包含数千行的文件来说,基本上一切正常。但是,当尝试使用包含超过 100 万行的 CSV 文件时,出现内存不足异常。我认为 Stream Parallel 会执行得更快。所以我有点困惑为什么我会遇到这个内存不足的错误。 Java是如何处理并行读取的?

下面是顺序并行读取文件的测试代码

String filename = "c:\\devs\\files\\datas.csv"; // 193MB
Path path = Paths.get(filename);

@Test
public void testFileExist() {
    assertTrue(Files.exists(path));
}

@Test
public void testSingleThreadRead() {
    Function<Path, String> processfile = (Path p) -> {
        String result = "";
        try {
            result = Files.lines(p).collect(Collectors.joining(" ,"));
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return result;
    };

    long start = System.currentTimeMillis();
    String result = processfile.apply(path);
    long end = System.currentTimeMillis();
    assertFalse(result.isEmpty());
    System.out.println(end -start + "ms");
}

@Test
public void testSingleThreadReadParallel() {
    Function<Path, String> processfile = (Path p) -> {
        String result = "";
        try {
            result = Files.lines(p).parallel().collect(Collectors.joining(" ,"));
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        return result;
    };

    long start = System.currentTimeMillis();
    String result = processfile.apply(path);
    long end = System.currentTimeMillis();
    assertFalse(result.isEmpty());
    System.out.println(end -start + "ms");
}

异常

java.lang.OutOfMemoryError
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at java.util.concurrent.ForkJoinTask.getThrowableException(Unknown Source)
at java.util.concurrent.ForkJoinTask.reportException(Unknown Source)
at java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
at java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.util.stream.ReferencePipeline.collect(Unknown Source)
at test.TestProcessFile.lambda$1(TestProcessFile.java:48)
at test.TestProcessFile.testSingleThreadReadParallel(TestProcessFile.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at 

更新

在单独的类中运行并行处理,仍然得到这个异常

Exception in thread "main" java.lang.OutOfMemoryError
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at java.util.concurrent.ForkJoinTask.getThrowableException(Unknown Source)
at java.util.concurrent.ForkJoinTask.reportException(Unknown Source)
at java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
at java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.util.stream.ReferencePipeline.collect(Unknown Source)
at ProcessFileParallel.lambda$0(ProcessFileParallel.java:19)
at ProcessFileParallel.main(ProcessFileParallel.java:29)
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Unknown Source)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
at java.lang.AbstractStringBuilder.append(Unknown Source)
at java.lang.StringBuilder.append(Unknown Source)
at java.util.StringJoiner.merge(Unknown Source)
at java.util.stream.Collectors$$Lambda$5/990368553.apply(Unknown Source)
at java.util.stream.ReduceOps$3ReducingSink.combine(Unknown Source)
at java.util.stream.ReduceOps$3ReducingSink.combine(Unknown Source)
at java.util.stream.ReduceOps$ReduceTask.onCompletion(Unknown Source)
at java.util.concurrent.CountedCompleter.tryComplete(Unknown Source)
at java.util.stream.AbstractTask.compute(Unknown Source)
at java.util.concurrent.CountedCompleter.exec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(Unknown Source)
at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)

最佳答案

您的代码在 testSingleThreadReadParallel 处失败,而不是并行。问题出在其他地方 - 可能将整个文件收集为字符串。

Files.lines 被缓冲(查看实现),因此读取文件很可能不会导致任何问题。

但将该文件收集到单个 String 显然需要大量内存,远远超过文件本身的大小。

实际上,按照我的理解,并行读取这些文件需要更多的内存,而不是顺序读取。每个线程都将并行地读取它的卡盘在内存中,因此您的并行方法将需要更多内存。我所说的更多是指来自 Stream.lines 的 CPU * BufferSize 数量。

EDIT2

花了一些时间后,我意识到您的问题必须出在其他地方。比如你的文件中有实际上行吗?或者您可能已达到极限 - 我的意思是并行确实会增加内存,但不会那么太多。可能您需要稍微增加 -Xms-Xmx

例如,我出于测试目的创建了一个包含 247MB 虚拟数据的文件,并在其上运行了以下代码:

 Path p = Paths.get("/private/tmp/myfile.txt");
 Stream<String> s = Files.lines(p).parallel(); // and without parallel 
 s.forEach(System.out::println);

我使用的设置是 -Xmx200m -Xms200m 用于并行顺序 处理。这小于实际文件大小。它仍然工作得很好。

您的主要 问题是您正在将所有内容 收集到一个字符串中,从而使其变得非常庞大。在我的机器上使用 jdk-8 将所有内容收集到 String 需要至少 1.5GB 的堆。

也是一本很好的读物 here

关于java - 并行读取文件时内存不足,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43975146/

相关文章:

java - jooq 转换器 : from java. sql.Date 到 java.time.LocalDate

java - ORMLite 可以与服务器一起使用吗?

excel - 当我保存 CSV 并再次编辑时,列合并了

python - 如何将 Unicode 数据作为西里尔字母符号写入文件?

r - R : %dopar% vs %do%. 中的并行化为什么使用单核可以获得更好的性能?

java - 使用 Spring 将 DataSource 与 Transaction 结合使用

java - Tomcat 服务器声明我的 PostgreSQL 数据库不存在,但它确实存在

windows - 如何批量读取 CSV 文件特定列中的值?

使用管理器、池和共享列表的 Python 多处理并发不起作用

c++ - 并发处理数据。我需要注意什么?