java - 如何使用 java.util.stream 处理文件 block

标签 java parallel-processing java-8 java-stream

为了熟悉流 API,我尝试编写一个非常简单的模式。

问题:有一个包含非嵌套文本 block 的文本文件。所有 block 均由开始/结束模式标识(例如 <start><stop> 。 block 的内容在语法上无法与 block 之间的噪声区分开。因此不可能使用简单(无状态)的 lambda。

我只能实现一些丑陋的东西,例如:
Files.lines(path).collect(new MySequentialParseAndProsessEachLineCollector<>());
说实话,这不是我想要的。

我正在寻找一个映射器,例如:
Files.lines(path).map(MyMapAllLinesOfBlockToBuckets()).parallelStream().collect(new MyProcessOneBucketCollector<>());

is there a good way to extract chunks of data from a java 8 stream似乎包含解决方案的框架。不幸的是,我很愚蠢地将其转化为我的问题。 ;-)

有什么提示吗?

最佳答案

这是一个可用于转换 Stream<String> 的解决方案,每个元素代表一条线,到 Stream<List<String>> ,每个元素代表使用指定分隔符找到的 block :

public class ChunkSpliterator implements Spliterator<List<String>> {
    private final Spliterator<String> source;
    private final Predicate<String> start, end;
    private final Consumer<String> getChunk;
    private List<String> current;

    ChunkSpliterator(Spliterator<String> lineSpliterator,
        Predicate<String> chunkStart, Predicate<String> chunkEnd) {
        source=lineSpliterator;
        start=chunkStart;
        end=chunkEnd;
        getChunk=s -> {
            if(current!=null) current.add(s);
            else if(start.test(s)) current=new ArrayList<>();
        };
    }
    public boolean tryAdvance(Consumer<? super List<String>> action) {
        while(current==null || current.isEmpty()
                            || !end.test(current.get(current.size()-1)))
            if(!source.tryAdvance(getChunk)) return false;
        current.remove(current.size()-1);
        action.accept(current);
        current=null;
        return true;
    }
    public Spliterator<List<String>> trySplit() {
        return null;
    }
    public long estimateSize() {
        return Long.MAX_VALUE;
    }
    public int characteristics() {
        return ORDERED|NONNULL;
    }

    public static Stream<List<String>> toChunks(Stream<String> lines,
        Predicate<String> chunkStart, Predicate<String> chunkEnd,
        boolean parallel) {

        return StreamSupport.stream(
            new ChunkSpliterator(lines.spliterator(), chunkStart, chunkEnd),
            parallel);
    }
}

与谓词匹配的行不包含在 block 中;如果需要的话,改变这种行为很容易。

可以这样使用:

ChunkSpliterator.toChunks( Files.lines(Paths.get(myFile)),
    Pattern.compile("^<start>$").asPredicate(),
    Pattern.compile("^<stop>$").asPredicate(),
    true )
   .collect(new MyProcessOneBucketCollector<>())

模式指定为 ^word$要求整行仅由单词组成;如果没有这些 anchor ,包含模式的行可以开始和结束一个 block 。源流的性质不允许创建 block 时的并行性,因此当与立即收集操作链接时,整个操作的并行性相当有限。这取决于MyProcessOneBucketCollector如果可以有任何并行性的话。

如果您的最终结果不依赖于源文件中存储桶出现的顺序,强烈建议您的收集器将自身报告为 UNORDERED 或者您插入 unordered() 在流的方法链中 collect 之前.

关于java - 如何使用 java.util.stream 处理文件 block ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26463771/

相关文章:

java - 如何从该 Java 代码中获取有问题文件的目录

c++ - 是否可以判断哪个数组元素比较结果为真?

python - 并行运行 Python 脚本并等待所有脚本完成,然后再执行更多并行脚本

java - 是否可以使用不同的实现来反序列化 SerializedLambda?

java - 如何在 VTD-XML 中创建/传递大量 AutoPilot 对象实例?

java - 处理 PrintWriter 无法正常工作

java - 从 Java 运行 SQL Loader - 并非所有行都被加载

matlab - 在 MATLAB 中使用 parfor 时避免竞争条件

java - 经典操作枚举示例中的 Lambda

android - android opennlp构建错误:无法访问找不到java.nio.file.Path的路径类文件