java - 你能重新平衡一个未知大小的不平衡 Spliterator 吗?

标签 java java-8 java-stream spliterator

我想使用 Stream 并行处理一组未知数量的异构远程存储 JSON 文件(文件数量预先未知)。这些文件的大小差异很大,从每个文件 1 个 JSON 记录到某些其他文件中的 100,000 条记录。在这种情况下,JSON 记录表示一个独立的 JSON 对象,表示为文件中的一行。

我真的很想为此使用 Streams,因此我实现了这个 Spliterator:

public abstract class JsonStreamSpliterator<METADATA, RECORD> extends AbstractSpliterator<RECORD> {

    abstract protected JsonStreamSupport<METADATA> openInputStream(String path);

    abstract protected RECORD parse(METADATA metadata, Map<String, Object> json);

    private static final int ADDITIONAL_CHARACTERISTICS = Spliterator.IMMUTABLE | Spliterator.DISTINCT | Spliterator.NONNULL;
    private static final int MAX_BUFFER = 100;
    private final Iterator<String> paths;
    private JsonStreamSupport<METADATA> reader = null;

    public JsonStreamSpliterator(Iterator<String> paths) {
        this(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths);
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths) {
        super(est, additionalCharacteristics);
        this.paths = paths;
    }

    private JsonStreamSpliterator(long est, int additionalCharacteristics, Iterator<String> paths, String nextPath) {
        this(est, additionalCharacteristics, paths);
        open(nextPath);
    }

    @Override
    public boolean tryAdvance(Consumer<? super RECORD> action) {
        if(reader == null) {
            String path = takeNextPath();
            if(path != null) {
                open(path);
            }
            else {
                return false;
            }
        }
        Map<String, Object> json = reader.readJsonLine();
        if(json != null) {
            RECORD item = parse(reader.getMetadata(), json);
            action.accept(item);
            return true;
        }
        else {
            reader.close();
            reader = null;
            return tryAdvance(action);
        }
    }

    private void open(String path) {
        reader = openInputStream(path);
    }

    private String takeNextPath() {
        synchronized(paths) {
            if(paths.hasNext()) {
                return paths.next();
            }
        }
        return null;
    }

    @Override
    public Spliterator<RECORD> trySplit() {
        String nextPath = takeNextPath();
        if(nextPath != null) {
            return new JsonStreamSpliterator<METADATA,RECORD>(Long.MAX_VALUE, ADDITIONAL_CHARACTERISTICS, paths, nextPath) {
                @Override
                protected JsonStreamSupport<METADATA> openInputStream(String path) {
                    return JsonStreamSpliterator.this.openInputStream(path);
                }
                @Override
                protected RECORD parse(METADATA metaData, Map<String,Object> json) {
                    return JsonStreamSpliterator.this.parse(metaData, json);
                }
            };              
        }
        else {
            List<RECORD> records = new ArrayList<RECORD>();
            while(tryAdvance(records::add) && records.size() < MAX_BUFFER) {
                // loop
            }
            if(records.size() != 0) {
                return records.spliterator();
            }
            else {
                return null;
            }
        }
    }
}

我遇到的问题是,虽然 Stream 一开始并行得很好,但最终最大的文件还是在单个线程中处理。我相信最直接的原因是有据可查的: split 器“不平衡”。

更具体地说,在 Stream.forEach 生命周期的某个点之后,似乎不会调用 trySplit 方法,因此在trySplit 的末尾很少被执行。

请注意从 trySplit 返回的所有 spliterator 如何共享相同的 paths 迭代器。我认为这是平衡所有 spliterator 之间工作的一种非常聪明的方法,但它还不足以实现完全并行性。

我希望首先跨文件进行并行处理,然后当少数大文件仍然处于 split 状态时,我想跨剩余文件的 block 进行并行处理。这就是 trySplit 末尾的 else block 的意图。

有没有一种简单/简单/规范的方法可以解决这个问题?

最佳答案

您的 trySplit 应该输出相同大小的分割,无论底层文件的大小如何。您应该将所有文件视为一个单元,并每次使用相同数量的 JSON 对象填充 ArrayList 支持的分割器。对象的数量应确保处理一个拆分需要 1 到 10 毫秒:低于 1 毫秒,您开始接近将批处理移交给工作线程的成本,高于此值,您开始面临 CPU 负载不均匀的风险,因为任务粒度太粗。

分割器没有义务报告大小估计,并且您已经正确执行了此操作:您的估计是Long.MAX_VALUE,这是一个特殊值,表示“无界”。但是,如果您有许多包含单个 JSON 对象的文件,导致批量大小为 1,这将以两种方式损害您的性能:打开-读取-关闭文件的开销可能会成为瓶颈,并且如果您设法逃脱也就是说,与处理一项的成本相比,线程切换的成本可能会很大,从而再次导致瓶颈。

五年前我正在解决类似的问题,你可以看看my solution .

关于java - 你能重新平衡一个未知大小的不平衡 Spliterator 吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58601518/

相关文章:

java - 如何将 POST 变量传递到运行 lambda 函数的 AWS API 网关?

error-handling - 在java 8流foreach中抛出异常

java - 常见的If代码else代码java编程实践

java - 为什么 Collection.parallelStream() 存在而 .stream().parallel() 做同样的事情?

java - 如何使用 Java Stream 从列表中获取任何数据的特定索引?

java - 如何使用流编译具有给定属性最大值的所有对象的列表?

java - Spring Boot安全性在https之后阻止静态资源

Java Swing Mac OSX 首选项菜单

java - JodaTime 在不同环境中的奇怪行为

java - 在 Java 8 中使用Optional进行多次空值检查