java - 将来自 Pubsub 的每 X 条消息写入 Cloud Storage

标签 java google-cloud-dataflow apache-beam google-cloud-pubsub

我是 Cloud Dataflow/Apache Beam 的新手,因此概念/编程对我来说仍然很模糊。

我想要做的是 Dataflow 监听 Pubsub 并获取 JSON 格式的消息:

{
  "productId": "...",
  "productName": "..."
}

并将其转换为:

{
  "productId": "...",
  "productName": "...",
  "sku": "...",
  "inventory": {
    "revenue": <some Double>,
    "stocks":  <some Integer>
  }
}

所以需要的步骤是:

  1. (IngestFromPubsub) 通过监听主题从 Pubsub 获取记录(1 个 Pubsub 消息 = 1 条记录)

  2. (EnrichDataFromAPI)

    a.将有效负载的 JSON 字符串反序列化为 Java 对象

    b.通过调用外部 API,使用 sku,我可以通过添加 inventory 属性来丰富每条记录的数据。

    c.再次序列化记录。

  3. (WriteToGCS) 然后,每个 x 条(可以参数化)记录,我需要将它们写入 Cloud Storage。 另请考虑 x=1 的简单情况。 (x=1,是个好主意吗?恐怕云存储写入太多)

尽管我是一个 Python 爱好者,但在 Python 中我已经很难做到这一点,更需要用 Java 来编写。我在阅读 Beam 的 Java 示例时感到头疼,它太冗长且难以理解。我所理解的是,每个步骤都是对 PCollection 的 .apply

到目前为止,这是我微不足道的努力的结果:

public static void main(String[] args) {

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("IngestFromPubsub", PubsubIO.readStrings().fromTopic(options.getTopic()))
        // I don't really understand the next part, I just copied from official documentation and filled in some values
        .apply(Window.<String>into(FixedWindows.of(Duration.millis(5000)))
            .withAllowedLateness(Duration.millis(5000))
            .triggering(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(1000)))
            .discardingFiredPanes()
        )
        .apply("EnrichDataFromAPI", ParDo.of(
            new DoFn<String, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    c.element();
                    // help on this part, I heard I need to use Jackson but I don't know, for API HttpClient is sufficient
                    // ... deserialize, call API, serialize again ...
                    c.output(enrichedJSONString);
                }
            }
        ))
        .apply("WriteToGCS", 
            TextIO.write().withWindowedWrites().withNumShards(1).to(options.getOutput()))
    ;


    PipelineResult result = pipeline.run();
}

请填写缺少的部分,并给我一些有关窗口化的提示(例如,适当的配置是什么等)以及我应该在哪些步骤中插入/应用它。

最佳答案

  • 我认为您不需要 IngestFromPubsubEnrichDataFromAPI 中的任何窗口。窗口的目的是将时间上接近的记录分组到窗口中,以便您可以对它们进行聚合计算。但由于您不进行任何聚合计算,并且有兴趣独立处理每个记录,因此不需要窗口。

  • 由于您总是将一条输入记录转换为一条输出记录,因此您的 EnrichDataFromAPI 应该是 MapElements 。这应该会使代码更容易。

  • 有用于在 Apache Bean Java 中处理 JSON 的资源:Apache Beam stream processing of json data

  • 您不一定需要使用 Jackson 将 JSON 映射到 Java 对象。您也许可以直接操作 JSON。您可以使用Java的native JSON API解析/操作/序列化。

关于java - 将来自 Pubsub 的每 X 条消息写入 Cloud Storage,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61043520/

相关文章:

java - List<A> 引用 List<subclass-of-A>

java - 基于PHP的网络爬虫或基于JAVA的网络爬虫

java - for循环内的System.arraycopy不是切换矩阵行吗?

python - apache_beam.transforms.util.Reshuffle() 不适用于 GCP 数据流

Java - 将 MessagePack 时间戳转换为日期

java - MongoDB Change Stream读取旧更新数据

google-bigquery - 如何在 Dataflow 中使用 BigQuery Standard SQL?

java - 加入两个大量的 PCollection 有性能问题

python-3.x - 在 Apache Beam (python) 中使用 ToList 输出作为 AsSingleton 或 AsList 的输入

java - Beam Java SDK 2.4/2.5 PAssert 与 CoGroupByKey