考虑以下功能:
public void execute4() {
File filePath = new File(filePathData);
File[] files = filePath.listFiles((File filePathData) -> filePathData.getName().endsWith("CDR"));
List<CDR> cdrs = new ArrayList<CDR>();
Arrays.asList(files).parallelStream().forEach(file -> readCDRP(cdrs, file));
cdrs.sort(cdrsorter);
}
它读取包含 CDR 的文件列表并执行 readCDRP(),如下所示:
private void readCDRP(List<CDR> cdrs, File file) {
final CDR cdr = new CDR(file.getName());
try (BufferedReader bfr = new BufferedReader(new FileReader(file))) {
List<String> lines = bfr.lines().collect(Collectors.toList());
lines.parallelStream().forEach(e -> {
String[] data = e.split(",", -1);
CDREntry entry = new CDREntry(file.getName());
for (int i = 0; i < data.length; i++) {
entry.setField(i, data[i]);
}
cdr.addEntry(entry);
});
if (cdr != null) {
cdrs.add(cdr);
}
} catch (IOException e) {
e.printStackTrace();
}
}
我观察到的是,偶尔而不是总是,我要么在 readCDRP 函数处遇到 ArrayIndexNotBound 异常(这很尴尬,因为 cdr 列表是一个 ArrayList() ):
cdr.addEntry(entry);
或者 在execute4() 的最后一行,我应用了排序。
我认为问题在于,execute4 中的第一个并行流与 readCDRP() 内的第二个并行流执行不在内存中的单独空间中,并且似乎也错误地共享了数据。使用“似乎”这个词,因为我无法确认,这只是一个小笼子。
问题是: 从 JDK8 的角度来看,我的代码是否存在严重缺陷? 是否有使用相同流程的解决方法,例如使用 CountDownLatch? ForkJoinPool 是否有限制?
感谢您的回复......
编辑(1): addEntry 是类本身的一部分:
class CDR {
public final String fileName;
private final List<CDREntry> entries = new ArrayList<CDREntry>();
public CDR(String fileName) {
super();
this.fileName = fileName;
}
public List<CDREntry> getEntries() {
return entries;
}
public List<CDREntry> addEntry(CDREntry e) {
entries.add(e);
return entries;
}
public String getFileName() {
return this.fileName;
}
}
最佳答案
从线程安全的角度来看,您的代码已被破坏。在 readCDR 中,您将元素添加到 cdrs 列表中,该列表是一个不支持并发写入的 ArrayList。这就是它破裂的原因。
更好的方法是让 readCDR
返回一个 cdr
对象并执行以下操作:
List<CDR> cdrs = Arrays.stream(files)
.parallel()
.map(this::readCDR)
.collect(Collectors.toList());
此外,使用并行流进行 IO 相关操作通常是一个坏主意,但这是另一个讨论。
关于java - 从 Java 并行流生成其他并行流并且很少失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30122668/