我正在使用 Python 在 Google Cloud Platform 上构建管道。我的数据位于 Cloud Pub/Sub 中。我想使用 Dataflow 将其存储到 Bigtable 中。到目前为止,我有一些在 java 中将数据从 Pub/Sub 流式传输到 Bigtable 的示例。
任何人都可以帮助我提供一些资源或链接,了解如何使用 Python 中的 Dataflow 将数据从 Pub/Sub 流式传输到 Bigtable 吗?
最佳答案
萨姆, 我不确定我们如何在 Python 中做到这一点。但我已经用Java做到了这一点。希望这个想法可以帮助您解决您的问题。
执行此操作时应记住的步骤是
从 Pub/Sub 读取,将流式传输设置为 true
PubsubIO.readStrings().fromTopic(PUBSUB_SUBSCRIPTION))
使用常量键对集合进行分组
PCollection<KV<String, String>> keyedStream = streamData .apply(WithKeys.of(new SerializableFunction<String, String>() { /** * serial version id */ private static final long serialVersionUID = 1L; public String apply(String s) { return CONSTANT_KEY; } }));
创建一个转换,用于将
PCollection
中的元素划分到窗口中,并在输出这些元素时触发控件。Window.<String>into(new GlobalWindows()) .triggering(Repeatedly .forever(AfterProcessingTime .pastFirstElementInPane() .plusDelayOf(Duration.standardSeconds(30)) )).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes()
将
PCollection
写入 Bigtable。
关于python - 谷歌云平台: Pub/Sub to Bigtable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45831262/