java - GCP数据流流模板: Not able to customize google provided java based PubSubToBQ template

标签 java for-loop google-cloud-platform google-cloud-dataflow google-cloud-pubsub

问题陈述是我们正在自定义 Google 提供的 PubSubToBQ Dataflow 流式 java 模板,其中我们配置要读取的多个订阅/主题并将数据推送到多个 Bigquery 表中,这需要作为单个执行数据流管道,用于从源读取所有流并推送到 Bigquery 表中。但是,当我们从 eclipse 执行模板时,我们必须传递订阅/主题和 BQ 详细信息,以及 gcs 存储桶上的模板阶段,然后当我们使用具有不同订阅和 BQ 详细信息的 gcloud 命令运行模板时。数据流作业不会被新的订阅或 BQ 表覆盖。

目标:我的目标是使用 Google 提供的 PubSubTOBQ.java 类模板并传递具有相应 Bigquery 表的订阅列表,并创建每个表传递订阅的管道。因此,单个作业中有 n-n、n 个管道。

我正在使用 Google 提供的 PubSubTOBQ.java 类模板,该模板将输入作为单个订阅或单个主题以及相应的大查询表详细信息。

现在我需要对其进行自定义,以将输入作为主题列表或以逗号分隔的订阅列表。我可以使用 ValueProvider> 并在 main 或 run 方法内部迭代字符串数组并将订阅/主题和 bq 表作为字符串传递。请查看下面的代码以获取更多信息。

我在 gcp 文档上读到的是,如果我们想在 rumtime 期间覆盖或使用值来创建动态 Piepline,则我们无法在 DoFn 之外传递 ValueProvider 变量。不确定我们是否可以阅读 DoFn 内的消息。

**PubsubIO.readMessagesWithAttributes().fromSubscription(providedSubscriptionArray[i])** 

如果是,请告诉我。这样我的目的就达到了。

代码:

public static void main(String[] args) {
        StreamingDataflowOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(StreamingDataflowOptions.class);

    List<String> listOfSubStr = new ArrayList<String>();
    List<String> listOfTopicStr = new ArrayList<String>();
    List<String> listOfTableStr = new ArrayList<String>();

    String[] providedSubscriptionArray = null;
    String[] providedTopicArray = null;
    String[] providedTableArray = null;

    if (options.getInputSubscription().isAccessible()) {
        listOfSubStr = options.getInputSubscription().get();
        providedSubscriptionArray = new String[listOfSubStr.size()];
        providedSubscriptionArray = createListOfProvidedStringArray(listOfSubStr);
    }

    if (options.getInputTopic().isAccessible()) {
        listOfTopicStr = options.getInputTopic().get();
        providedTopicArray = new String[listOfSubStr.size()];
        providedTopicArray = createListOfProvidedStringArray(listOfTopicStr);
    }

    if (options.getOutputTableSpec().isAccessible()) {
        listOfTableStr = options.getOutputTableSpec().get();
        providedTableArray = new String[listOfSubStr.size()];
        providedTableArray = createListOfProvidedStringArray(listOfTableStr);
    }

    Pipeline pipeline = Pipeline.create(options);

    PCollection<PubsubMessage> readPubSubMessage = null;

    for (int i = 0; i < providedSubscriptionArray.length; i++) {

        if (options.getUseSubscription()) {
            readPubSubMessage = pipeline
                    .apply(PubsubIO.readMessagesWithAttributes().fromSubscription(providedSubscriptionArray[i]));
        } else {
            readPubSubMessage = pipeline.apply(PubsubIO.readMessagesWithAttributes().fromTopic(providedTopicArray[i]));
        }

        readPubSubMessage
                /*
                 * Step #2: Transform the PubsubMessages into TableRows
                 */
                .apply("Convert Message To TableRow", ParDo.of(new PubsubMessageToTableRow()))
                .apply("Insert Data To BigQuery",
                        BigQueryIO.writeTableRows().to(providedTableArray[i])
                                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    }

    pipeline.run().waitUntilFinish();
}

应该能够将单个 Dataflow PubSubTOBQ 模板用于与单个 Dataflow 流处理作业中的 bigquery 模板数量相对应的订阅数量的多个管道。

最佳答案

问题是,到目前为止,数据流模板需要知道暂存/创建时的管道图,以便它在运行时不能有所不同。如果您仍然想使用非模板化管道来执行此操作,并将逗号分隔的 Pub/Sub 主题列表传递为 --topicList选项参数,那么你可以这样做:

String[] listOfTopicStr = options.getTopicList().split(",");

PCollection[] p = new PCollection[listOfTopicStr.length];

for (int i = 0; i < listOfTopicStr.length; i++) {
    p[i] = pipeline
        .apply(PubsubIO.readStrings().fromTopic(listOfTopicStr[i]))
        .apply(ParDo.of(new DoFn<String, Void>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                Log.info(String.format("Message=%s", c.element()));
            }
        }));
}

完整代码here .

如果我们用 3 个主题来测试它,例如:

mvn -Pdataflow-runner compile -e exec:java \
 -Dexec.mainClass=com.dataflow.samples.MultipleTopics \
      -Dexec.args="--project=$PROJECT \
      --topicList=projects/$PROJECT/topics/topic1,projects/$PROJECT/topics/topic2,projects/$PROJECT/topics/topic3 \
      --stagingLocation=gs://$BUCKET/staging/ \
      --runner=DataflowRunner"

gcloud pubsub topics publish topic1 --message="message 1"
gcloud pubsub topics publish topic2 --message="message 2"
gcloud pubsub topics publish topic3 --message="message 3"

输出和数据流图将符合预期:

enter image description here

将这种方法强制纳入模板的一个可能的解决方法是拥有足够多的主题 N对于最坏的情况。当我们使用 n 执行模板时主题(满足 n <= N )我们需要指定 N - n要填写的未使用/虚拟主题。

关于java - GCP数据流流模板: Not able to customize google provided java based PubSubToBQ template,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57053573/

相关文章:

kubernetes - Kubernetes Engine和Container Engine有什么区别?为什么后者没有出现在我的仪表板上?

python-3.x - 如何为 BigQuery 的 Google Cloud 远程实例提供身份验证?

python - IO错误: [Errno 2] No such file or directory when run in Flex App Engine

java - Jtree getTreeCellRendererComponent 执行多次

java - 我如何从枚举中选择随机值?

java - 生成线程利用率摘要

for-loop - Clojure:如何返回在函数的 for 循环内计算的值

Java 是一个数组列表按顺序包含另一个数组列表

java - 无法在 child() 中为参数 'pathString' 传递 null

r - 以增长率预测时避免循环