google-cloud-storage - Cloud Pub/Sub 到 GCS,按元素写入(数据流管道)

标签 google-cloud-storage google-cloud-dataflow publish-subscribe apache-beam

每次从 Pubsub 收到消息时,如何写入 GCS,它会进行窗口写入,但不会按元素写入。非常感谢有关此事的任何提示。

示例链接 ( https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/main/java/com/google/cloud/teleport/templates/PubsubToText.java )

运行此示例代码后,它会写入发送到 GCS 的发布-订阅消息。但是,当持续时间设置为 1 分钟时,它会保存所有消息,然后在一分钟后写入 1 个文件,但我希望它将每条消息写入不同的文件。

最佳答案

如果您需要每条消息一个文件,一个选择是创建简单的转换,如下所示:

package com.myapp.dataflow.transform;

import org.apache.beam.sdk.transforms.DoFn;
import com.google.cloud.storage.*;
import static java.nio.charset.StandardCharsets.UTF_8;

public class StringToGcsFile extends DoFn<String, Blob> {
    private Storage storage;
    private String bucketName = "my-bucket";

    @Setup
    public void setup() {
        storage = StorageOptions.getDefaultInstance().getService();
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        // consider some strategy for object names, UUID or something
        String blobName = "my_blob_name";

        // Upload a blob to the bucket
        BlobId blobId = BlobId.of(bucketName, blobName);
        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
        Blob blob = storage.create(blobInfo, c.element().getBytes(UTF_8));

        c.output(blob);
    }
}

Maven 依赖:

<dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-storage</artifactId>
    <version>1.35.0</version>
</dependency>

关于google-cloud-storage - Cloud Pub/Sub 到 GCS,按元素写入(数据流管道),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50991930/

相关文章:

c# - Redis Pub/Sub ServiceStack,取消线程

google-cloud-platform - 如何使用 gcloud 终端将文件从 Google Cloud Storage 传输到 Compute Engine 实例?

google-cloud-storage - 为什么运行管道时会向 GCS 写入零字节文件?

google-bigquery - 如何将 PCollection 转移到普通列表

google-cloud-dataflow - 当 CoGroupByKey 与 CalendarWindows 时,Flatten 的输入具有不兼容的窗口 windowFns

redis - 发布/订阅和消息队列中多个消费者的消息传递保证

firebase - getDownloadUrl 是付费操作吗?

java - Google Cloud Dataflow 服务帐户未传播给工作人员?

go - Apache Beam Go SDK - 数据流无法正确自动缩放(并行化步骤)

javascript - 如何处理事件冒泡?