google-cloud-dataflow - 水印卡住了

标签 google-cloud-dataflow

我正在通过 pub/sub 将数据摄取到以无限制模式运行的数据流管道。这些数据基本上是与从跟踪设备捕获的时间戳的坐标。这些消息分批到达,每批可能是 1..n 条消息。在一段时间内可能没有消息到达,稍后可能会重新发送(或不重新发送)。我们使用每个坐标的时间戳(以 UTC 为单位)作为 pub-sub 消息的属性。并通过时间戳标签读取管道:

pipeline.apply(PubsubIO.Read.topic("new").timestampLabel("timestamp")

坐标和延迟的示例如下所示:
36 points wait 0:02:24
36 points wait 0:02:55
18 points wait 0:00:45
05 points wait 0:00:01
36 points wait 0:00:33
36 points wait 0:00:43
36 points wait 0:00:34

一条消息可能如下所示:
2013-07-07 09:34:11;47.798766;13.050133
在第一批之后,水印是空的,在第二批之后,我可以在管道诊断中看到一个水印,只是它没有得到更新,尽管有新消息到达。同样根据堆栈驱动程序日志记录,PubSub 没有未传递或未确认的消息。

当具有新事件时间的消息到达时,水印不应该向前移动吗?

根据What is the watermark heuristic for PubsubIO running on GCD? WaterMark 也应该每 2 分钟向前移动一次,但它不是吗?

[..] In the case that we have not seen data on the subscription in more than two minutes (and there's no backlog), we advance the watermark to near real time. [..]



更新以解决 Bens 问题:

Is there a job ID that we could look at?



是的,我刚刚在 09:52 CET 即 07:52 UTC 重新启动了整个设置,作业 ID 为 2017-05-05_00_49_11-11176509843641901704。

What version of the SDK are you using?



1.9.0

How are you publishing the messages with the timestamp labels?



我们使用 python 脚本发布使用 pub sub sdk 的数据。
来自那里的消息可能如下所示:

{'data': {timestamp;lat;long;ele}, 'timestamp': '2017-05-05T07:45:51Z'}

我们将时间戳属性用于数据流中的时间戳标签。

What is the watermark stuck at?



对于这项工作,水印现在停留在 09:57:35(我在 10:10 左右发布),尽管发送了新数据,例如在
10:05:14
10:05:43
10:06:30

我还可以看到,我们可能会在延迟超过 10 秒的情况下将数据发布到 pub sub,例如在 10:07:47,我们发布了最高时间戳为 10:07:26 的数据。

几个小时后,水印 catch 了,但我不明白为什么它一开始就延迟/不动。

最佳答案

这是 PubSub 水印跟踪逻辑中的一个边缘案例,它有两个变通方法(见下文)。本质上,如果2分钟没有输入,那么水印会前进到当前时间。但是,如果数据的到达速度快于每 2 分钟一次,但 QPS 仍然非常低,那么就没有足够的数据来使估计的水印保持最新。

正如我所提到的,有几种解决方法:

  • 如果您处理更多数据,问题自然会得到解决。
  • 或者,如果您注入(inject)额外的消息(例如每秒 2 条),它将为水印提供足够的数据以更快地推进。这些只需要有时间戳,并且可以立即从管道中过滤掉。
  • 关于google-cloud-dataflow - 水印卡住了,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50718318/

    相关文章:

    java - 从 PubSubIO 读取 : fromTopic vs fromSubscription

    google-cloud-platform - 用于 Google Cloud Dataflow 管道的自定义 DNS 解析器

    java - 从谷歌数据流输出到谷歌云Firestore

    java - 为什么我在 Google Dataflow 上收到 java.lang.IllegalStateException?

    java - 无法创建谷歌云数据流eclipse项目

    google-cloud-dataflow - 带有触发器用例的数据流滑动窗口与全局窗口?

    java - Google Cloud Dataflow 服务帐户未传播给工作人员?

    google-cloud-platform - 如何使用 API 在 GCP 数据流中检索当前工作人员计数

    python - 使用 Apache Beam Python 通过 Google Dataflow 将小型集合输出分发给多个工作人员

    python - BigQuerySink 的 bigquery.TableSchema 的 JSON 表架构