java - 依赖前一个和下一个元素的流处理

标签 java lambda java-8 java-stream


0This is the header record------------------------------------
1This is another record always existing out of one lin--------
21This is a record that can be composed out of multiple parts.
22This is the second part of record type 2--------------------
21This is a new record of type 2, first part.-----------------
22This is the second part of record type 2--------------------
23This is the third part of record type 2---------------------

使用 Stream API,我想解析这个文件:

Stream<String> lines = Files.lines(Paths.get(args[1])); -> RecordFactory.createRecord(line)).collect(Collectors.toList());

但是由于这个流是逐行传递的,当它解析记录类型 2 的第一行(记录类型 2 序列 1)时,记录 2 的映射是不完整的。下一行(记录类型 2 序列 2)应添加到先前映射的结果中。

我怎样才能用 lambda 解决这个问题而不必破坏线程安全?


目前使用 Stream API 不容易实现对匹配谓词的连续元素的操作。

一个选择是使用 StreamEx提供 groupRuns 的图书馆操作:

Returns a stream consisting of lists of elements of this stream where adjacent elements are grouped according to supplied predicate.


private static final Pattern PATTERN = Pattern.compile("\\d(\\d+)");

public static void main(String[] args) throws IOException {
    try (StreamEx<String> stream = StreamEx.ofLines(Paths.get("..."))) {
        List<Record> records =
            stream.groupRuns((s1, s2) -> getRecordPart(s2) > getRecordPart(s1))

private static final int getRecordPart(String str) {
    Matcher matcher = PATTERN.matcher(str);
    if (matcher.find()) {
        return Integer.parseInt(;
    return 1; // if the pattern didn't find anything, it means the record is on a single line

这假设您的 RecordFactory会创建一个 Record来自List<String>而不是来自 String .请注意,此解决方案可以并行运行,但将文件内容存储到 List 中可能会更好。如果您想要更好的并行性能(以内存为代价),则对该列表进行后处理。

