我正在尝试在 Spark 上为 POC 运行我的 Beam 代码。我正在 Google Cloud Dataproc 上运行该应用程序进行测试。从 PubSub 主题读取并将消息写入 Google Cloud Storage 上的存储桶是一个非常简单的测试。 Dataproc 集群具有适用于 Spark 的正确版本,并且可以访问其他 GCP API。
我也尝试过 FileIO,但这也不起作用。我尝试发布到另一个 PubSub 主题而不是写作,这奏效了,但这不是我的用例。我在使用 TextIO 写入之前尝试打印,并确认我可以从 PubSub 读取消息。
这是管道:
PCollection<String> messages = pipeline
.apply(PubsubIO.readStrings().fromSubscription(sub))
.apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))));
messages.apply(TextIO.write().to("gs://...").withNumShards(1).withWindowedWrites());
pipeline.run();
我在 Dataproc 作业输出上没有看到任何日志。没有错误或任何东西。存储桶上也没有文件。
最佳答案
我发现这是触发的问题。以下是详细讨论:
https://lists.apache.org/thread.html/a831da3cd74159bf0e0f3fe77363b022cde943ba40c6ab68bb33d5bb@%3Cuser.beam.apache.org%3E
我通过将窗口转换更改为早期触发触发器来解决此问题:
.apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(10)))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.alignedTo(Duration.standardSeconds(10))))
.withAllowedLateness(Duration.standardSeconds(10))
.discardingFiredPanes())
关于apache-beam - Apache Beam TextIO 不适用于 Spark Runner,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56276301/