google-cloud-dataflow - 如何为 Beam 管道中的 session 窗口编写单元测试?

标签 google-cloud-dataflow apache-beam

我正在编写一个处理产品事件(创建、更新、删除)的管道。每个产品都属于具有特定持续时间的销售。我希望能够对给定销售中的所有产品执行一些聚合。出于本示例的目的,假设我只需要每次销售的唯一产品 ID 列表。

因此,我的管道在间隔持续时间很长的销售 ID 上使用 session 窗口(因此,当销售结束并且没有发布更多产品更新时,该销售窗口也会关闭)。我的问题是,如何为此编写单元测试?

为了这个测试,我们假设如下:

  • 事件只是带有销售 ID 和产品 ID 的字符串,以空格分隔,
  • applyDistinctProductsTransform基本上会执行我上面所说的。创建KV<String, String>关键是销售 ID 的元素;将 session 窗口设置为 600 秒的间隔时间;最后为每次销售创建所有产品 ID 的串联字符串。

这是我目前所拥有的:

我创建了一个 TestStream并添加一些元素:sale1 的 3 个产品.接下来,我将水印提高到 700,远远超过间隙持续时间。添加另一个产品,最后将水印推进到无穷大。

@Test
public void TestSessionWindow() {
    Coder<String> utfCoder = StringUtf8Coder.of();
    TestStream<String> onTimeProducts = 
TestStream.create(utfCoder).addElements(
            TimestampedValue.of("sale1 product1", new Instant(0)),
            TimestampedValue.of("sale1 product2", new Instant(0)),
            TimestampedValue.of("sale1 product3", new Instant(0))
    )
            .advanceWatermarkTo(new Instant(700)) // watermark passes trigger time
    .addElements(
            TimestampedValue.of("campaign1 product9", new Instant(710))
    )
    .advanceWatermarkToInfinity();

    PCollection<KV<String, String>> results = applyDistinctProductsTransform(pipeline, onTimeProducts);

    PAssert.that(results).containsInAnyOrder(
            KV.of("sale1", "product1,product2,product3"),
            KV.of("sale1", "product9")
    );
    pipeline.run().waitUntilFinish();
}

但是,

  1. 管道输出 sale1 的 KV , product1,product2,product3,product9所以product9附加到窗口。我本来希望这个产品在一个单独的窗口中处理,因此最终出现在输出 PCollection 的不同行中。
  2. 如何在 PAssert 中只获取单个窗口的结果?我知道有 inWindow函数,我找到了一个 fixed time window 的例子但我不知道如何为 session 窗口做同样的事情。

您可以查看 PTransform 的完整代码和 unit test .

最佳答案

1) 我相信你有一个简单的单位问题。以秒为单位指定窗口间隙持续时间 600 Duration.standardSecondsnew Instant(long)使用毫秒,这意味着 600 秒的间隔大于导致 session 合并的 700 毫秒的时间间隔。

2) session 在内部仍然使用间隔窗口。您将需要根据您的触发策略计算所有 session 合并后的输出窗口。默认情况下, session 窗口使用 IntervalWindow(timestamp, gap duration) , 并合并所有 overlapping windows创建一个更大的窗口。例如,如果您有相同 session key 的窗口(开始时间、结束时间)、[10、14]、[12、18]、[4、14],它们将全部合并生成一个 [4, 18] 窗口。

关于google-cloud-dataflow - 如何为 Beam 管道中的 session 窗口编写单元测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51994579/

相关文章:

google-cloud-dataflow - 数据流工作人员无法连接到数据流服务

python - 数据流SDK版本

java - 无法传递 FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards

java - Apache Beam GroupByKey 重复事件

python - 在 Dataflow 管道中写入 BigQuery 时捕获失败

python - 遇到从 Dataflow 管道向 BigQuery 进行缓慢流式写入的问题?

google-bigquery - 当使用无界 PCollection 从 TextIO 到 BigQuery 时,数据卡在 BigQueryIO 内部的 Reshuffle/GroupByKey 中

google-cloud-dataflow - com.google.cloud.spanner.SpannerException : DEADLINE_EXCEEDED

python - 谷歌数据流 : global name is not defined - apache beam

java - BigQueryIO.Read.fromQuery 位于欧盟的数据集出现问题