google-cloud-dataflow - 为什么在 Dataflow beta 中 #sideInput() 方法从 Context 移至 ProcessContext

标签 google-cloud-dataflow

我想知道为什么 #sideInput() 方法移至 ProcessContext 类? 以前,我可以在 #startBundle() 方法中进行一些额外的处理并缓存结果。 在 #processElement() 中执行此操作听起来效率较低。当然,我可以在将数据传递到 View 之前进行预处理,但是仍然存在为每个元素调用 #sideInput() 的开销...

谢谢, G

最佳答案

很好的问题。原因是我们添加了对窗口 PCollections 作为侧面输入的支持。这支持其他场景,包括在流模式下使用带有无界 PCollection 的侧面输入。

在更改之前,我们仅支持全局窗口化的侧输入,然后在处理主输入 PCollection 的每个元素时,整个侧输入 PCollection 都可用。这对于传统批处理样式处理中的有界 PCollection 效果很好,但不能扩展到窗口或无界 PCollection。

更改后,您在 ParDo 中处理的当前元素的窗口控制侧输入的哪些子集可见。 (因此您无法访问 startBundle() 中的侧面输入,其中没有当前元素,因此也没有当前窗口。)

例如,考虑一个示例,其中您有一个流处理管道处理您的网站日志并向实时使用仪表板提供实时更新。您有两个无限制的输入 PCollection:一个包含新用户注册,另一个包含用户点击。您可以通过按小时对两个 PCollection 进行窗口化并对用户点击执行 ParDo(将新用户注册作为辅助输入)来识别哪些用户点击来自新用户。现在,当您处理给定小时内的用户点击时,您会自动看到同一小时内新用户注册的子集。您可以通过更改窗口函数并在侧面输入上及时向前移动元素时间戳来对此进行不同的变体,例如继续对用户每小时的点击次数进行窗口化,但使用过去 24 小时内的新注册。

我确实同意这一更改使得在您的侧面输入上缓存任何后处理变得更加困难。我们添加了 View.asMultimap 来处理将 Iterable 转换为查找表的常见情况。如果您的后处理是按元素进行的,则可以在创建 PCollectionView 之前使用 ParDo 来完成。对于现在的其他任何事情,我建议从 processElement 中延迟执行。我有兴趣了解其他发生的模式,这样我们就可以设法提高它们的效率。

关于google-cloud-dataflow - 为什么在 Dataflow beta 中 #sideInput() 方法从 Context 移至 ProcessContext,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29718820/

相关文章:

python - 通过 Airflow 中的 PythonVirtualenvOperator 成功运行多次数据流管道

python - 在工作人员上加载本地(不可序列化)对象

javascript - 解析 JSON 时 Google Cloud Dataflow Javascript UDF 错误

java - 如何向谷歌数据流添加依赖项?

java - Apache Beam 从 Pub/Sub 流式传输到 ElasticSearch

google-bigquery - 如何通过Java程序获取Bigquery表的架构?

java - 如何使用 Cloud Dataflow 在两个或多个键上加入 BQ 表?

python - 有没有办法读取除了 python apache beam 中定义的文件列表之外的所有文件?

python - Google Cloud Dataflow 访问云存储上的.txt 文件

java - 写入 BigQuery 时处理卡住