java - Apache Beam TestStream finalPane 未按预期触发

标签 java unit-testing testing apache-beam

我正在编写一个简单的测试来验证提前/准时/延迟 Pane 的语义。管道结合了每个键的元素数量。我的早期和准时 Pane 按预期工作,尽管我的最后一个 Pane 似乎始终是空的。

private static final Duration WINDOW_LENGTH = Duration.standardMinutes(2);
private static final Duration LATENESS_HORIZON = Duration.standardDays(1);

我的测试如下:

@Test
@Category(ValidatesRunner.class)
public void simpleTest() throws Exception {
    Instant baseTime = new Instant(0L);
    Duration one_min = Duration.standardMinutes(1);


    TestStream<KV<String, Long>> events = TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarLongCoder.of()))
            .advanceWatermarkTo(baseTime)

            // First element arrives
            .addElements(
                    TimestampedValue.of(KV.of("laurens", 0L), baseTime.plus(one_min))
            )
            .advanceProcessingTime(Duration.standardMinutes(5))

            // Second element arrives
            .addElements(
                    TimestampedValue.of(KV.of("laurens", 0L), baseTime.plus(one_min))
            )
            .advanceProcessingTime(Duration.standardMinutes(5))

            // Third element arrives
            .addElements(
                    TimestampedValue.of(KV.of("laurens", 0L), baseTime.plus(one_min))
            )
            .advanceProcessingTime(Duration.standardMinutes(5))

            // Window ends
            .advanceWatermarkTo(baseTime.plus(WINDOW_LENGTH).plus(one_min))

            // Late element arrives
            .addElements(
                    TimestampedValue.of(KV.of("laurens", 0L), baseTime.plus(one_min))
            )
            .advanceProcessingTime(Duration.standardMinutes(5))

            // Fire all
            .advanceWatermarkToInfinity();

    PCollection<KV<String, Long>> userCount = p.apply(events).apply(new CountPipeline());

    IntervalWindow window = new IntervalWindow(baseTime, WINDOW_LENGTH);

    PAssert.that(userCount)  // This test works
            .inEarlyPane(window)
            .containsInAnyOrder(
                KV.of("laurens", 1L),  // First firing
                KV.of("laurens", 2L),  // Second firing
                KV.of("laurens", 3L)   // Third firing
            );

    PAssert.that(userCount) // This test works as well
            .inOnTimePane(window)
            .containsInAnyOrder(
                    KV.of("laurens", 3L) // On time firing
            );

    PAssert.that(userCount) // Test fails
            .inFinalPane(window)
            .containsInAnyOrder(
                    KV.of("laurens", 4L) // Late firing
            );

    p.run().waitUntilFinish();
}

流水线代码如下:

public static class CountPipeline extends PTransform<PCollection<KV<String, Long>>, PCollection<KV<String, Long>>> {

    @Override
    public PCollection<KV<String, Long>> expand(PCollection<KV<String, Long>> events) {
        return events.apply("window", Window.<KV<String, Long>>into(FixedWindows.of(WINDOW_LENGTH))
                        .triggering(AfterWatermark
                                .pastEndOfWindow()
                                .withEarlyFirings(AfterProcessingTime
                                        .pastFirstElementInPane())
                                .withLateFirings(AfterProcessingTime
                                        .pastFirstElementInPane())
                        )
                        .withAllowedLateness(LATENESS_HORIZON)
                        .accumulatingFiredPanes()
                        .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
                ).apply("Count", Count.perKey());
    }
}

错误:

Expected: iterable over [<KV{laurens, 4}>] in any order
     but: No item matches: <KV{laurens, 4}> in []

如您所见,最后一个元素肯定是在水印之后摄取的,根据定义,这应该使其延迟。但是,最终 Pane 不包含对原始结果的改进。老实说,我不知道为什么没有发出迟到的 Pane 。任何见解都值得赞赏。

最佳答案

FinalPane 与 LatePane 不同。

FinalPane 在您的测试中应该是空的,因为您的测试用例会为每个元素触发触发器,因此 没有一个留在 FinalPane 中。

正如我从评论中读到的那样,您的意图是正确的,即针对 LatePane 进行测试。由于未知原因,PAsert 实用程序函数列表中缺少 LatePane 的这种特殊情况。我做了一个 PR 来解决这个问题:https://github.com/apache/beam/pull/8587

关于java - Apache Beam TestStream finalPane 未按预期触发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56132551/

相关文章:

c# - dotnet cli - 指定的 .runsettings 文件未在代码覆盖率运行中使用

python - 在 Django 中测试 POST 端点时如何包含 csrf token ?

java - mysql View 的结果集不匹配

java - 如何通过网络传递对象的静态数据?

javascript - 在 Promise 中调用 setState 时 React Jest 测试失败

java - 如何对复杂对象进行单元测试?

java - 添加附加项目后 ListView 不刷新

java - 是否可以计算 HTML 格式文件中可见文本的行数?

android - 使用 retrofit2 和 rxjava2 对 android 应用程序进行单元测试

forms - 编写 Web 表单测试用例