为了熟悉流 API,我尝试编写一个非常简单的模式。
问题:有一个包含非嵌套文本 block 的文本文件。所有 block 均由开始/结束模式标识(例如 <start>
和 <stop>
。 block 的内容在语法上无法与 block 之间的噪声区分开。因此不可能使用简单(无状态)的 lambda。
我只能实现一些丑陋的东西,例如:
Files.lines(path).collect(new MySequentialParseAndProsessEachLineCollector<>());
说实话,这不是我想要的。
我正在寻找一个映射器,例如:
Files.lines(path).map(MyMapAllLinesOfBlockToBuckets()).parallelStream().collect(new MyProcessOneBucketCollector<>());
is there a good way to extract chunks of data from a java 8 stream似乎包含解决方案的框架。不幸的是,我很愚蠢地将其转化为我的问题。 ;-)
有什么提示吗?
最佳答案
这是一个可用于转换 Stream<String>
的解决方案,每个元素代表一条线,到 Stream<List<String>>
,每个元素代表使用指定分隔符找到的 block :
public class ChunkSpliterator implements Spliterator<List<String>> {
private final Spliterator<String> source;
private final Predicate<String> start, end;
private final Consumer<String> getChunk;
private List<String> current;
ChunkSpliterator(Spliterator<String> lineSpliterator,
Predicate<String> chunkStart, Predicate<String> chunkEnd) {
source=lineSpliterator;
start=chunkStart;
end=chunkEnd;
getChunk=s -> {
if(current!=null) current.add(s);
else if(start.test(s)) current=new ArrayList<>();
};
}
public boolean tryAdvance(Consumer<? super List<String>> action) {
while(current==null || current.isEmpty()
|| !end.test(current.get(current.size()-1)))
if(!source.tryAdvance(getChunk)) return false;
current.remove(current.size()-1);
action.accept(current);
current=null;
return true;
}
public Spliterator<List<String>> trySplit() {
return null;
}
public long estimateSize() {
return Long.MAX_VALUE;
}
public int characteristics() {
return ORDERED|NONNULL;
}
public static Stream<List<String>> toChunks(Stream<String> lines,
Predicate<String> chunkStart, Predicate<String> chunkEnd,
boolean parallel) {
return StreamSupport.stream(
new ChunkSpliterator(lines.spliterator(), chunkStart, chunkEnd),
parallel);
}
}
与谓词匹配的行不包含在 block 中;如果需要的话,改变这种行为很容易。
可以这样使用:
ChunkSpliterator.toChunks( Files.lines(Paths.get(myFile)),
Pattern.compile("^<start>$").asPredicate(),
Pattern.compile("^<stop>$").asPredicate(),
true )
.collect(new MyProcessOneBucketCollector<>())
模式指定为 ^word$
要求整行仅由单词组成;如果没有这些 anchor ,包含模式的行可以开始和结束一个 block 。源流的性质不允许创建 block 时的并行性,因此当与立即收集操作链接时,整个操作的并行性相当有限。这取决于MyProcessOneBucketCollector
如果可以有任何并行性的话。
如果您的最终结果不依赖于源文件中存储桶出现的顺序,强烈建议您的收集器将自身报告为 UNORDERED
或者您插入 unordered()
在流的方法链中 collect
之前.
关于java - 如何使用 java.util.stream 处理文件 block ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26463771/