我们发现自己经常在数据流中使用以下模式:
- 从 BigQuery TableRow 执行 key 提取
ParDo
- 对 1 的结果执行
GroupByKey
- 对 2 的结果执行展平
ParDo
Dataflow 中是否有一种操作可以一次性实现此目的(至少从 API 角度来看)?
我看过Combine操作,但这似乎更适合在计算值时使用,例如总和/平均值等。
最佳答案
您的问题没有太多细节,我只能提供一般建议。
您可以创建一个PTransform
,将上述模式组合成单个复合变换。这使您可以将常用的操作组合到一个可重用的组件中。
下面的代码应该能让你明白我的意思:
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
class ExtractKeyFn extends DoFn<TableRow, KV<String, TableRow>> {
@Override
public void processElement(ProcessContext c) throws Exception {
TableRow row = c.element();
Object key = row.get("key");
if (key != null) {
c.output(KV.of(key.toString(), row));
}
}
}
class CompositeTransform extends PTransform<PCollection<TableRow>, PCollection<TableRow>> {
public CompositeTransform(String name) {
super(name);
}
public static CompositeTransform named(String name) {
return new CompositeTransform(name);
}
@Override
public PCollection<TableRow> apply(PCollection<TableRow> input) {
return input.apply(ParDo.named("parse").of(new ExtractKeyFn()))
.apply(GroupByKey.create())
// potentially more transformations
.apply(Values.create()) // get only the values ( because we have a kv )
.apply(Flatten.iterables()); // flatten them out
}
}
public class Main {
public static void run(PipelineOptions options) {
Pipeline p = Pipeline.create(options);
// read input
p.apply(BigQueryIO.Read.from("inputTable...").named("inputFromBigQuery"))
// apply fancy transform
.apply(CompositeTransform.named("FancyKeyGroupAndFlatten"))
// write output
.apply(BigQueryIO.Write.to("outputTable...").named("outputToBigQuery"));
p.run();
}
}
关于google-cloud-dataflow - 通用云数据流模式 - 有更好的方法吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34388500/