我正在进行测试以找出读取和处理 csv 文件的最佳方法。 所以我需要读取 csv 文件的每一行并分析每一行。 所以对于一个包含数千行的文件来说,基本上一切正常。但是,当尝试使用包含超过 100 万行的 CSV 文件时,出现内存不足异常。我认为 Stream Parallel 会执行得更快。所以我有点困惑为什么我会遇到这个内存不足的错误。 Java是如何处理并行读取的?
下面是顺序并行读取文件的测试代码
String filename = "c:\\devs\\files\\datas.csv"; // 193MB
Path path = Paths.get(filename);
@Test
public void testFileExist() {
assertTrue(Files.exists(path));
}
@Test
public void testSingleThreadRead() {
Function<Path, String> processfile = (Path p) -> {
String result = "";
try {
result = Files.lines(p).collect(Collectors.joining(" ,"));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return result;
};
long start = System.currentTimeMillis();
String result = processfile.apply(path);
long end = System.currentTimeMillis();
assertFalse(result.isEmpty());
System.out.println(end -start + "ms");
}
@Test
public void testSingleThreadReadParallel() {
Function<Path, String> processfile = (Path p) -> {
String result = "";
try {
result = Files.lines(p).parallel().collect(Collectors.joining(" ,"));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return result;
};
long start = System.currentTimeMillis();
String result = processfile.apply(path);
long end = System.currentTimeMillis();
assertFalse(result.isEmpty());
System.out.println(end -start + "ms");
}
异常
java.lang.OutOfMemoryError
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at java.util.concurrent.ForkJoinTask.getThrowableException(Unknown Source)
at java.util.concurrent.ForkJoinTask.reportException(Unknown Source)
at java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
at java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.util.stream.ReferencePipeline.collect(Unknown Source)
at test.TestProcessFile.lambda$1(TestProcessFile.java:48)
at test.TestProcessFile.testSingleThreadReadParallel(TestProcessFile.java:58)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.lang.reflect.Method.invoke(Unknown Source)
at
更新
在单独的类中运行并行处理,仍然得到这个异常
Exception in thread "main" java.lang.OutOfMemoryError
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown Source)
at java.lang.reflect.Constructor.newInstance(Unknown Source)
at java.util.concurrent.ForkJoinTask.getThrowableException(Unknown Source)
at java.util.concurrent.ForkJoinTask.reportException(Unknown Source)
at java.util.concurrent.ForkJoinTask.invoke(Unknown Source)
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(Unknown Source)
at java.util.stream.AbstractPipeline.evaluate(Unknown Source)
at java.util.stream.ReferencePipeline.collect(Unknown Source)
at ProcessFileParallel.lambda$0(ProcessFileParallel.java:19)
at ProcessFileParallel.main(ProcessFileParallel.java:29)
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.util.Arrays.copyOf(Unknown Source)
at java.lang.AbstractStringBuilder.ensureCapacityInternal(Unknown Source)
at java.lang.AbstractStringBuilder.append(Unknown Source)
at java.lang.StringBuilder.append(Unknown Source)
at java.util.StringJoiner.merge(Unknown Source)
at java.util.stream.Collectors$$Lambda$5/990368553.apply(Unknown Source)
at java.util.stream.ReduceOps$3ReducingSink.combine(Unknown Source)
at java.util.stream.ReduceOps$3ReducingSink.combine(Unknown Source)
at java.util.stream.ReduceOps$ReduceTask.onCompletion(Unknown Source)
at java.util.concurrent.CountedCompleter.tryComplete(Unknown Source)
at java.util.stream.AbstractTask.compute(Unknown Source)
at java.util.concurrent.CountedCompleter.exec(Unknown Source)
at java.util.concurrent.ForkJoinTask.doExec(Unknown Source)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(Unknown Source)
at java.util.concurrent.ForkJoinPool.runWorker(Unknown Source)
at java.util.concurrent.ForkJoinWorkerThread.run(Unknown Source)
最佳答案
您的代码在 testSingleThreadReadParallel
处失败,而不是并行。问题出在其他地方 - 可能将整个文件收集为字符串。
Files.lines
被缓冲(查看实现),因此读取文件很可能不会导致任何问题。
但将该文件收集到单个 String
显然需要大量内存,远远超过文件本身的大小。
实际上,按照我的理解,并行读取这些文件需要更多的内存,而不是顺序读取。每个线程都将并行地读取它的卡盘在内存中,因此您的并行方法将需要更多内存。我所说的更多是指来自 Stream.lines
的 CPU * BufferSize 数量。
EDIT2
花了一些时间后,我意识到您的问题必须出在其他地方。比如你的文件中有实际上行吗?或者您可能已达到极限 - 我的意思是并行确实会增加内存,但不会那么太多。可能您需要稍微增加 -Xms
和 -Xmx
。
例如,我出于测试目的创建了一个包含 247MB
虚拟数据的文件,并在其上运行了以下代码:
Path p = Paths.get("/private/tmp/myfile.txt");
Stream<String> s = Files.lines(p).parallel(); // and without parallel
s.forEach(System.out::println);
我使用的设置是 -Xmx200m -Xms200m
用于并行
和顺序
处理。这小于实际文件大小。它仍然工作得很好。
您的主要 问题是您正在将所有内容 收集到一个字符串中,从而使其变得非常庞大。在我的机器上使用 jdk-8 将所有内容收集到 String 需要至少 1.5GB 的堆。
也是一本很好的读物 here
关于java - 并行读取文件时内存不足,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43975146/