java - 如何在谷歌数据流中创建个性化WindowFn

标签 java google-cloud-dataflow dataflow apache-beam

我想创建一个不同的 WindowFn ,以便根据另一个字段而不是根据我的输入条目的时间戳将 Windows 分配给我的任何输入元素。我知道 Google DataFlow SDK 中的预定义 WindowFn 使用时间戳作为分配窗口的标准。

更具体地说,我想创建一种 SlidingWindows,但我不想将时间戳视为窗口分配标准,而是将另一个字段视为该标准。

如何创建自定义的 WindowFn?创建自己的 WindowFn 时应该考虑哪些要点?

谢谢。

最佳答案

要创建新的WindowFn,只需继承WindowFn即可或子类并重写各种抽象方法。

在您的情况下,您不需要窗口合并,因此您可以从 NonMergingWindowFn 继承,并且您的代码可能类似于

public class MyWindowFn extends NonMergingWindowFn<ElementT, IntervalWindow> {
  public Collection<W> assignWindows(AssignContext c) {
    return setOfWindowsElementShouldBeIn(c.element());
  }

  public boolean isCompatible(WindowFn other) {
    return other instanceof MyWindowFn;
  }

  public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
  }

  public W getSideInputWindow(final BoundedWindow window) {
    // You may not need this if you won't ever be using PCollections windowed 
    // with this as side inputs.  If that's the case, just throw.
    // Otherwise you'll need to figure out how to map the main input windows
    // into the windows generated by this WindowFn.
  }
}

关于java - 如何在谷歌数据流中创建个性化WindowFn,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37897452/

相关文章:

java - 使用 Selenium 和 Java 进行自动化,如何制作脚本的面向对象部分 "actionable?"

java - Derby Eclipse 类路径

java - GCP Dataflow-从存储中读取 CSV 文件并写入 BigQuery

elasticsearch - 如何为Google数据流实例分配IP范围?

gcc - C 程序的数据流图生成

java - 如何在 android 中的对象框数据库中插入对象列表?喜欢房间数据库

java - 日期比较问题

etl - Pipeline从GCS输入80亿行并做了GroupByKey以防止融合,group step运行非常慢

java - 如何使用 avro 在 parquet 文件模式中创建 REPEATED 类型?

php - 以编程方式运行 Magento 1.9.1.0 数据流导入配置文件