每次从 Pubsub 收到消息时,如何写入 GCS,它会进行窗口写入,但不会按元素写入。非常感谢有关此事的任何提示。
运行此示例代码后,它会写入发送到 GCS 的发布-订阅消息。但是,当持续时间设置为 1 分钟时,它会保存所有消息,然后在一分钟后写入 1 个文件,但我希望它将每条消息写入不同的文件。
最佳答案
如果您需要每条消息一个文件,一个选择是创建简单的转换,如下所示:
package com.myapp.dataflow.transform;
import org.apache.beam.sdk.transforms.DoFn;
import com.google.cloud.storage.*;
import static java.nio.charset.StandardCharsets.UTF_8;
public class StringToGcsFile extends DoFn<String, Blob> {
private Storage storage;
private String bucketName = "my-bucket";
@Setup
public void setup() {
storage = StorageOptions.getDefaultInstance().getService();
}
@ProcessElement
public void processElement(ProcessContext c) {
// consider some strategy for object names, UUID or something
String blobName = "my_blob_name";
// Upload a blob to the bucket
BlobId blobId = BlobId.of(bucketName, blobName);
BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("text/plain").build();
Blob blob = storage.create(blobInfo, c.element().getBytes(UTF_8));
c.output(blob);
}
}
Maven 依赖:
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-storage</artifactId>
<version>1.35.0</version>
</dependency>
关于google-cloud-storage - Cloud Pub/Sub 到 GCS,按元素写入(数据流管道),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50991930/