java - DataFlow (Apache Beam) 中 Pub/Sub 的自定义时间戳和窗口

标签 java python google-cloud-dataflow apache-beam spotify-scio

我想使用 Apache Beam 中的流处理管道(并在 Google DataFlow 上运行)来实现以下场景:

  1. 从 Pub/Sub 读取消息(JSON 字符串)
  2. 反序列化 JSON
  3. 使用自定义字段(例如 timeStamp)作为处理元素的时间戳值
  4. 应用 60 秒的固定窗口
  5. 从元素中提取键并按键分组
  6. <<执行进一步处理>>

我尝试使用 Java(Scala) 和 Python 来解决这个问题,但没有一个解决方案有效。

  1. Python 解决方案
# p is beam.Pipeline()
_ = (p | beam.io.ReadFromPubSub(subscription="my_sub")
        | beam.Map(add_timestamping)
        | beam.WindowInto(window.FixedWindows(60))
        | beam.Map(lambda elem: elem) # exracting the key somehow, not relevant here
        | beam.GroupByKey()
        # (...)
        | beam.io.WriteToPubSub("output_topic")
        )
p.run()

add_timestamping 函数按照documentation :

def add_timestamping(elem):
    import json
    import apache_beam as beam
    msg = json.loads(elem)
    unix_timestamp = msg['timeStamp'] / 1000
    return beam.window.TimestampedValue(msg, unix_timestamp)

Python 解决方案的输出:

  1. 使用 DirectRunner 时,会发出窗口,并且窗口本身或多或少是合适的,具体取决于延迟。
  2. 使用 DataFlowRunner 时,所有消息都会被跳过,并且 DataFlow UI 中会显示计数器:droppedDueToLateness
<小时/>
  • Java/Scala 解决方案 (我使用过Scio,但这也发生在 Java 的干净 Beam SDK 中)
  • sc.pubsubSubscription[String]("my_sub")
        .applyTransform(ParDo.of(new CustomTs()))
        .withFixedWindows(Duration.standardSeconds(60))
        .map(x => x) // exracting the key somehow, not relevant here
        .groupByKey
        // (...)
        .saveAsPubsub("output_topic")
    

    按照 documentation 添加自定义时间戳:

    import io.circe.parser._
    class CustomTs extends DoFn[String, String] {
      @ProcessElement
      def processElement(@Element element: String, out: DoFn.OutputReceiver[String]): Unit = {
        val json = parse(element).right.get
        val timestampMillis: Long = json.hcursor.downField("timeStamp").as[Long].getOrElse(0)
        out.outputWithTimestamp(element, new Instant(timestampMillis))
      }
    }
    

    Java/Scala 解决方案的输出:

    Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
    java.lang.IllegalArgumentException:
     Cannot output with timestamp 2019-03-02T00:51:39.124Z. 
     Output timestamps must be no earlier than the timestamp of the current input
     (2019-03-28T14:57:53.195Z) minus the allowed skew (0 milliseconds).
    

    我无法在此处使用 DoFn.getAllowedTimestampSkew,因为它已被弃用,而且我不知道将发送哪些范围的历史数据。

    <小时/>

    拥有处理历史数据的能力对于我的项目至关重要(这些数据将从某个商店发送到 Pub/Sub)。该管道必须同时适用于当前数据和历史数据。

    我的问题是: 如何使用自定义时间戳处理数据,并能够在使用 Beam API 定义的窗口上进行操作?

    最佳答案

    如果您能够将插入点处的时间戳提取到 PubSub,则您将能够使用用户指定的时间戳作为元数据。有关如何操作的信息记录在 1.9 SDK 中。

    https://cloud.google.com/dataflow/model/pubsub-io#timestamps-and-record-ids

    “您可以使用用户指定的时间戳来精确控制如何将从 Cloud Pub/Sub 读取的元素分配给 Dataflow 管道中的窗口。”

    由于 1.9 已弃用,在 2.11 中您将需要 https://beam.apache.org/releases/javadoc/2.11.0/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.Read.html#withTimestampAttribute-java.lang.String-

    关于java - DataFlow (Apache Beam) 中 Pub/Sub 的自定义时间戳和窗口,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55401081/

    相关文章:

    java - SQLite 中的 android errno

    java - 无论如何在 finally 声明中获得返回值?

    python - Flask 结构——无法从 __init__.py 导入应用程序

    python - 通过 Pandas 中的单个字典映射多个列

    java - 转移的 Java 日期已过去

    java - 我们如何解析这个json数据?

    python - 正常随边界变化?

    java - 云数据流: change bigquery destination on the fly

    bitbucket - 在 Dataflow Worker 上使用 SSH key 来拉取私有(private)库

    java - 数据流单元测试