java - 实现未知大小的非并行 Spliterator?

标签 java lambda parallel-processing java-8 spliterator

我对我的所有研究感到有点困惑。我有一个名为 TabularResultSet 的自定义界面(为了举例,我已经淡化了它)它遍历任何本质上是表格的数据集。它有一个类似于迭代器的 next() 方法,它可以循环遍历 QueryResultSet、剪贴板中的选项卡式表格、CSV 等...

但是,我正在尝试创建一个环绕我的 TabularResultSet 并轻松将其转换为流的 Spliterator。我无法想象一种安全的并行化方法,因为 TabularResultSet 可能正在遍历 QueryResultSet,同时调用 next() 可能会造成严重破坏。我认为可以安全地完成并行化的唯一方法是让单个工作线程调用 next() 并将数据传递给并行线程以对其进行处理。

所以我认为并行化不是一个容易的选择。我怎样才能让这个东西在没有并行化的情况下流式传输?这是我到目前为止的工作...

public final class SpliteratorTest {

    public static void main(String[] args) {
       TabularResultSet rs = null; /* instantiate an implementation; */

       Stream<TabularResultSet> rsStream = StreamSupport.stream(new TabularSpliterator(rs), false);
    }

    public static interface TabularResultSet {
        public boolean next();

        public List<Object> getData();
    }

    private static final class TabularSpliterator implements Spliterator<TabularResultSet> {

        private final TabularResultSet rs;

        public TabularSpliterator(TabularResultSet rs) {
            this.rs = rs;
        }
        @Override
        public boolean tryAdvance(Consumer<? super TabularResultSet> action) {
            action.accept(rs);
            return rs.next();
        }

        @Override
        public Spliterator<TabularResultSet> trySplit() {
            return null;
        }

        @Override
        public long estimateSize() {
            return Long.MAX_VALUE;
        }

        @Override
        public int characteristics() {
            return 0;
        }
    }
}

最佳答案

扩展 Spliterators.AbstractSpliterator 可能是最简单的.如果你这样做,你只需要实现 tryAdvance .这可以变成并行流;并行性来自调用 tryAdvance 的流实现多次,对接收到的数据进行批处理,并在不同的线程中进行处理。

如果TabularResultSet是什么像 JDBC ResultSet , 我不认为你想要 Spliterator<TabularResultSet>Stream<TabularResultSet> .相反,它看起来像 TabularResultSet代表整个表格数据集,因此您可能希望每个拆分器或流元素代表该表中的一行 -- List<Object> getData() 返回的?如果是这样,您会想要类似以下内容。

class TabularSpliterator extends Spliterators.AbstractSpliterator<List<Object>> {
    private final TabularResultSet rs;

    public TabularSpliterator(TabularResultSet rs) {
        super(...);
        this.rs = rs;
    }

    @Override public boolean tryAdvance(Consumer<? super List<Object>> action) {
        if (rs.next()) {
            action.accept(rs.getData());
            return true;
        } else {
            return false;
        }
    }
}

然后您可以通过调用 StreamSupport.stream() 将此拆分器的实例转换为流.

注意:通常,Spliterator 实例不会从多个线程调用,甚至不需要是线程安全的。查看Spliterator class documentation在“尽管...”开头的段落中了解详细信息。

关于java - 实现未知大小的非并行 Spliterator?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28933797/

相关文章:

c# - .NET 操作在多核机器上的非线性扩展

python - 进程陷入 PyInstaller 可执行文件循环

java - 我可以只在引用类型上使用泛型吗?

java - 方法 getTagValue() 在 JAVA 中未定义

java - 我们可以使用 java 从 MySql 数据库中获取添加的图像吗?

c# - 我在哪里标记 lambda 表达式异步?

ruby-on-rails - 具有多个参数的 Rails 作用域

c# - 如何在 .ForEach() 方法中使用 continue 语句

java - 如何从 Java 控制台应用程序中的扫描器读取字符串?

c# - 并行线程中的多个 dbcontext,EntityException "Rerun your statement when there are fewer active users"