使用并行流将具有大量数据的大量文件写入特定格式。
这是代码:
public static void main(String[] args) throws Exception {
mergeController.compactFiles();
mergeController.writeMergedFlag();
}
private void compactFiles() {
Set<String> events = this.listSubDirectoryNames(inputDir);
events.parallelStream().forEach(event -> writeEvent(event, eventSchemaMap.get(event), this.configuration));
}
这些方法不会返回任何内容,因为它们只是在写入文件。我看到 writeMergedFlag()
主要是在进程运行 1.5 小时后被调用。
这里的问题是什么?是堆空间问题还是其他?
我以前从未遇到过此类问题。
我认为这是因为并行流使用了一个具有固定数量线程的 ForkJoinPool
。如果这些 writeEvent
任务很小,我建议改用缓存线程池:
public static void main(String[] args) throws Exception {
mergeController.compactFiles();
mergeController.writeMergedFlag();
}
private void compactFiles() {
Set<String> events = this.listSubDirectoryNames(inputDir);
ExecutorService service = Executors.newCachedThreadPool();
events.forEach(event -> service.execute(() -> writeEvent(event, eventSchemaMap.get(event), configuration)));
service.shutdown();
service.awaitTermination(1, TimeUnit.DAYS); // Arbitrary value
}