google-cloud-platform - 如何将多个 PCollection 组合在一起并将其作为 ParDo 函数的输入

标签 google-cloud-platform google-cloud-dataflow

我有大约六个 PCollection 作为 KV。我想通过将组合的 (6) 个 PCollection 作为 sideInput 来对另一个 PCollection 执行 ParDo。

我尝试将所有 6 个 PCollections 作为单独的 sideInput,如下所示

PCollection<TableRow> OutputRows = MyCollection.apply(ParDo.withSideInputs(Inp1, Inp2,...)
    .of(new DoFn<KV<String, String>, TableRow>() {
        ...
    }

但是当堆空间超出时,它会抛出 OutOfMemoryError 。请建议如何组合 PCollection 以作为另一个 PCollection 的输入。

最佳答案

Cloud Dataflow 提供了多种加入方式。

用作侧输入的

PCollection 会广播给工作线程并加载到内存中。这听起来像是您正在做的事情,并且如果 PCollection 大小的总和太大,就会解释 OOM。

您提到这些值是键控的 - 另一种选择是使用 CoGroupByKey .

为此,您需要使用所有 PCollection 创建一个 KeyedPCollectionTuple,然后您将得到一个包含每个键的所有值的结果。像这样使用 CoGroupByKey 会打乱数据,以便使用给定键的结果的 ParDo 只需读取关联的值:

PCollection<KV<K, V1>> inp1 = ...;
PCollection<KV<K, V2>> inp2 = ...;

final  TupleTag<V1> t1 = new  TupleTag<>();
final  TupleTag<V2> t2 = new  TupleTag<>();
PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
  KeyedPCollectionTuple.of(t1, inp1)
                       .and(t2, inp2)
                       .apply(CoGroupByKey.<K>create());

PCollection<T> finalResultCollection =
  coGbkResultCollection.apply(ParDo.of(
   new  DoFn<KV<K, CoGbkResult>, T>() {
     @Override
     public void processElement(ProcessContext c) {
      KV<K, CoGbkResult> e = c.element();
      Iterable<V1> pt1Vals = e.getValue().getAll(t1);
      V2 pt2Val = e.getValue().getOnly(t2);
      ... Do Something ....
     c.output(...some T...);
   }
 }));

关于google-cloud-platform - 如何将多个 PCollection 组合在一起并将其作为 ParDo 函数的输入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33100513/

相关文章:

google-cloud-dataflow - 是否可以使用自定义包运行 Cloud Dataflow?

google-bigquery - Google Dataflow 作业和 BigQuery 在不同区域失败

hadoop - 谷歌云存储的 S3Guard 或 s3committer

docker - 单节点kubernetes集群扩展

google-compute-engine - 如何获取 Google Compute Engine 实例的机器类型?

java - Apache Beam/Google 数据流 - 错误处理

google-bigquery - 从 ParDo 函数内写入 BigQuery

google-cloud-dataflow - 运行数据流时出现问题

google-cloud-platform - 如何为 Google Cloud Compute Engine 打开一个或所有端口

google-cloud-platform - 谷歌云 Stackdriver : Metric grouped by ip