python - 谷歌云平台: Pub/Sub to Bigtable

标签 python google-cloud-platform google-cloud-dataflow google-cloud-pubsub google-cloud-bigtable

我正在使用 Python 在 Google Cloud Platform 上构建管道。我的数据位于 Cloud Pub/Sub 中。我想使用 Dataflow 将其存储到 Bigtable 中。到目前为止,我有一些在 java 中将数据从 Pub/Sub 流式传输到 Bigtable 的示例。

任何人都可以帮助我提供一些资源或链接,了解如何使用 Python 中的 Dataflow 将数据从 Pub/Sub 流式传输到 Bigtable 吗?

最佳答案

萨姆, 我不确定我们如何在 Python 中做到这一点。但我已经用Java做到了这一点。希望这个想法可以帮助您解决您的问题。

执行此操作时应记住的步骤是

  1. 从 Pub/Sub 读取,将流式传输设置为 true

    PubsubIO.readStrings().fromTopic(PUBSUB_SUBSCRIPTION))
    
  2. 使用常量键对集合进行分组

    PCollection<KV<String, String>> keyedStream = streamData
            .apply(WithKeys.of(new SerializableFunction<String, String>() {
                /**
                 * serial version id
                 */
                private static final long serialVersionUID = 1L;
    
                public String apply(String s) {
                    return CONSTANT_KEY;
                }
            }));
    
  3. 创建一个转换,用于将 PCollection 中的元素划分到窗口中,并在输出这些元素时触发控件。

    Window.<String>into(new GlobalWindows())
            .triggering(Repeatedly
                    .forever(AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(30))
                        )).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes()
    
  4. PCollection 写入 Bigtable。

关于python - 谷歌云平台: Pub/Sub to Bigtable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45831262/

相关文章:

Python继承 - 在子类中调用基类方法?

google-cloud-platform - 用于 HTTP 负载平衡的 Google 云后端服务忽略自定义端口

python - 如何使用 python 将流管道发布订阅到数据存储?

java - 如何创建从 postgres 到 parquet 的管道?

python - 从 .py 文件中获取包根和完整模块名称以进行导入

python - SECRET_KEY 设置不能为空 ||在 Settings.py 中可用

google-bigquery - Apache Beam/Google Dataflow PubSub 到 BigQuery 管道 : Handling Insert Errors and Unexpected Retry Behavior

google-cloud-platform - 调度数据流管道

python - python 中的 Gzip 和子进程的标准输出

networking - GCP - 无 Cloud NAT,但给定的公共(public) IP 离开 VPC