我是 Cloud Dataflow/Apache Beam 的新手,因此概念/编程对我来说仍然很模糊。
我想要做的是 Dataflow 监听 Pubsub 并获取 JSON 格式的消息:
{
"productId": "...",
"productName": "..."
}
并将其转换为:
{
"productId": "...",
"productName": "...",
"sku": "...",
"inventory": {
"revenue": <some Double>,
"stocks": <some Integer>
}
}
所以需要的步骤是:
(IngestFromPubsub) 通过监听主题从 Pubsub 获取记录(1 个 Pubsub 消息 = 1 条记录)
(EnrichDataFromAPI)
a.将有效负载的 JSON 字符串反序列化为 Java 对象
b.通过调用外部 API,使用
sku
,我可以通过添加inventory
属性来丰富每条记录的数据。c.再次序列化记录。
(WriteToGCS) 然后,每个
x
条(可以参数化)记录,我需要将它们写入 Cloud Storage。 另请考虑x=1
的简单情况。 (x=1
,是个好主意吗?恐怕云存储写入太多)
尽管我是一个 Python 爱好者,但在 Python 中我已经很难做到这一点,更需要用 Java 来编写。我在阅读 Beam 的 Java 示例时感到头疼,它太冗长且难以理解。我所理解的是,每个步骤都是对 PCollection 的 .apply
。
到目前为止,这是我微不足道的努力的结果:
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("IngestFromPubsub", PubsubIO.readStrings().fromTopic(options.getTopic()))
// I don't really understand the next part, I just copied from official documentation and filled in some values
.apply(Window.<String>into(FixedWindows.of(Duration.millis(5000)))
.withAllowedLateness(Duration.millis(5000))
.triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1000)))
.discardingFiredPanes()
)
.apply("EnrichDataFromAPI", ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.element();
// help on this part, I heard I need to use Jackson but I don't know, for API HttpClient is sufficient
// ... deserialize, call API, serialize again ...
c.output(enrichedJSONString);
}
}
))
.apply("WriteToGCS",
TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()))
;
PipelineResult result = pipeline.run();
}
请填写缺少的部分,并给我一些有关窗口化的提示(例如,适当的配置是什么等)以及我应该在哪些步骤中插入/应用它。
最佳答案
我认为您不需要
IngestFromPubsub
和EnrichDataFromAPI
中的任何窗口。窗口的目的是将时间上接近的记录分组到窗口中,以便您可以对它们进行聚合计算。但由于您不进行任何聚合计算,并且有兴趣独立处理每个记录,因此不需要窗口。由于您总是将一条输入记录转换为一条输出记录,因此您的
EnrichDataFromAPI
应该是MapElements
。这应该会使代码更容易。有用于在 Apache Bean Java 中处理 JSON 的资源:Apache Beam stream processing of json data
您不一定需要使用 Jackson 将 JSON 映射到 Java 对象。您也许可以直接操作 JSON。您可以使用Java的native JSON API解析/操作/序列化。
关于java - 将来自 Pubsub 的每 X 条消息写入 Cloud Storage,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61043520/