java - 从 Java 并行流生成其他并行流并且很少失败

标签 java lambda parallel-processing java-8 java-stream

考虑以下功能:

public void execute4() {
        File filePath = new File(filePathData);
        File[] files = filePath.listFiles((File filePathData) -> filePathData.getName().endsWith("CDR"));
        List<CDR> cdrs = new ArrayList<CDR>();
        Arrays.asList(files).parallelStream().forEach(file -> readCDRP(cdrs, file));
        cdrs.sort(cdrsorter);
    }

它读取包含 CDR 的文件列表并执行 readCDRP(),如下所示:

private void readCDRP(List<CDR> cdrs, File file) {
    final CDR cdr = new CDR(file.getName());
    try (BufferedReader bfr = new BufferedReader(new FileReader(file))) {
        List<String> lines = bfr.lines().collect(Collectors.toList());
        lines.parallelStream().forEach(e -> {
            String[] data = e.split(",", -1);
            CDREntry entry = new CDREntry(file.getName());
            for (int i = 0; i < data.length; i++) {
                entry.setField(i, data[i]);
            }
            cdr.addEntry(entry);
        });

        if (cdr != null) {
            cdrs.add(cdr);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}

我观察到的是,偶尔而不是总是,我要么在 readCDRP 函数处遇到 ArrayIndexNotBound 异常(这很尴尬,因为 cdr 列表是一个 ArrayList() ):

cdr.addEntry(entry);

或者 在execute4() 的最后一行,我应用了排序。

我认为问题在于,execute4 中的第一个并行流与 readCDRP() 内的第二个并行流执行不在内存中的单独空间中,并且似乎也错误地共享了数据。使用“似乎”这个词,因为我无法确认,这只是一个小笼子。

问题是: 从 JDK8 的角度来看,我的代码是否存在严重缺陷? 是否有使用相同流程的解决方法,例如使用 CountDownLatch? ForkJoinPool 是否有限制?

感谢您的回复......

编辑(1): addEntry 是类本身的一部分:

class CDR {
        public final String fileName;
        private final List<CDREntry> entries = new ArrayList<CDREntry>();

        public CDR(String fileName) {
            super();
            this.fileName = fileName;
        }

        public List<CDREntry> getEntries() {
            return entries;
        }

        public List<CDREntry> addEntry(CDREntry e) {
            entries.add(e);
            return entries;
        }

        public String getFileName() {
            return this.fileName;
        }
    }

最佳答案

从线程安全的角度来看,您的代码已被破坏。在 readCDR 中,您将元素添加到 cdrs 列表中,该列表是一个不支持并发写入的 ArrayList。这就是它破裂的原因。

更好的方法是让 readCDR 返回一个 cdr 对象并执行以下操作:

 List<CDR> cdrs = Arrays.stream(files)
                        .parallel()
                        .map(this::readCDR)
                        .collect(Collectors.toList());

此外,使用并行流进行 IO 相关操作通常是一个坏主意,但这是另一个讨论。

关于java - 从 Java 并行流生成其他并行流并且很少失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30122668/

相关文章:

java - 无法读取Java中pdf文件生成的文本

Java swing多个JTables一个ColumnModel

Java Mail "From"发件人姓名错误字符

c# - 'await' 运算符只能在异步 lambda 表达式中使用

java - 类型变量、方法内联和 "Bad return type in lambda expression"

java - google-oauth-java-client 因抽搐而失败

python-3.x - lambda pandas 数据框中的行减法

r - 为Ax = b并行求解Solve()吗?

xcode - 我什么时候应该检查 "Parallelize Build"以获得 Xcode 方案?

python - 当axis=0时, Pandas 并行应用