google-cloud-dataflow - 通用云数据流模式 - 有更好的方法吗?

标签 google-cloud-dataflow

我们发现自己经常在数据流中使用以下模式:

  1. 从 BigQuery TableRow 执行 key 提取 ParDo
  2. 对 1 的结果执行 GroupByKey
  3. 对 2 的结果执行展平 ParDo

Dataflow 中是否有一种操作可以一次性实现此目的(至少从 API 角度来看)?

我看过Combine操作,但这似乎更适合在计算值时使用,例如总和/平均值等。

最佳答案

您的问题没有太多细节,我只能提供一般建议。

您可以创建一个PTransform,将上述模式组合成单个复合变换。这使您可以将常用的操作组合到一个可重用的组件中。

下面的代码应该能让你明白我的意思:

import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;


class ExtractKeyFn extends DoFn<TableRow, KV<String, TableRow>> {

    @Override
    public void processElement(ProcessContext c) throws Exception {
        TableRow row = c.element();
        Object key = row.get("key");
        if (key != null) {
            c.output(KV.of(key.toString(), row));
        }
    }
}

class CompositeTransform extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {

    public CompositeTransform(String name) {
        super(name);
    }

    public static CompositeTransform named(String name) {
        return new CompositeTransform(name);
    }

    @Override
    public PCollection<TableRow> apply(PCollection<TableRow> input) {
        return input.apply(ParDo.named("parse").of(new ExtractKeyFn()))
             .apply(GroupByKey.create())
             // potentially more transformations
            .apply(Values.create()) // get only the values ( because we have a kv )
            .apply(Flatten.iterables()); // flatten them out
    }
}

public class Main {

    public static void run(PipelineOptions options) {
        Pipeline p = Pipeline.create(options);

        // read input
        p.apply(BigQueryIO.Read.from("inputTable...").named("inputFromBigQuery"))

         // apply fancy transform
         .apply(CompositeTransform.named("FancyKeyGroupAndFlatten"))

         // write output
         .apply(BigQueryIO.Write.to("outputTable...").named("outputToBigQuery"));


        p.run();
    }
}

关于google-cloud-dataflow - 通用云数据流模式 - 有更好的方法吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34388500/

相关文章:

scala - 如何在java中使用数据流文本io动态目标

python - 使用 write_truncate 通过 Google Dataflow/Beam 将数据加载到 Biqquery 分区表中

google-cloud-dataflow - 数据流批处理作业卡在 GroupByKey.create() 中

java - @OnTimer 在窗口后不触发

java - Apache Beam 中的嵌套前 N 个

java - 使用 CombineFn 从所有节点累积数据后,合并每个键的所有值

java - 如何在数据流中有效处理大型 gzip 压缩文件?

java - 使用 Java API/Dataflow 将重复记录插入 Big Query - "Repeated field must be imported as a JSON array"

python - 在 CI 管道中部署 Dataflow

google-bigquery - Dataflow Runner - 尝试刷新以获取初始 access_token