我有大约六个 PCollection 作为 KV。我想通过将组合的 (6) 个 PCollection 作为 sideInput 来对另一个 PCollection 执行 ParDo。
我尝试将所有 6 个 PCollections 作为单独的 sideInput,如下所示
PCollection<TableRow> OutputRows = MyCollection.apply(ParDo.withSideInputs(Inp1, Inp2,...)
.of(new DoFn<KV<String, String>, TableRow>() {
...
}
但是当堆空间超出时,它会抛出 OutOfMemoryError 。请建议如何组合 PCollection 以作为另一个 PCollection 的输入。
最佳答案
Cloud Dataflow 提供了多种加入方式。
用作侧输入的PCollection
会广播给工作线程并加载到内存中。这听起来像是您正在做的事情,并且如果 PCollection
大小的总和太大,就会解释 OOM。
您提到这些值是键控的 - 另一种选择是使用 CoGroupByKey .
为此,您需要使用所有 PCollection
创建一个 KeyedPCollectionTuple
,然后您将得到一个包含每个键的所有值的结果。像这样使用 CoGroupByKey
会打乱数据,以便使用给定键的结果的 ParDo 只需读取关联的值:
PCollection<KV<K, V1>> inp1 = ...;
PCollection<KV<K, V2>> inp2 = ...;
final TupleTag<V1> t1 = new TupleTag<>();
final TupleTag<V2> t2 = new TupleTag<>();
PCollection<KV<K, CoGbkResult>> coGbkResultCollection =
KeyedPCollectionTuple.of(t1, inp1)
.and(t2, inp2)
.apply(CoGroupByKey.<K>create());
PCollection<T> finalResultCollection =
coGbkResultCollection.apply(ParDo.of(
new DoFn<KV<K, CoGbkResult>, T>() {
@Override
public void processElement(ProcessContext c) {
KV<K, CoGbkResult> e = c.element();
Iterable<V1> pt1Vals = e.getValue().getAll(t1);
V2 pt2Val = e.getValue().getOnly(t2);
... Do Something ....
c.output(...some T...);
}
}));
关于google-cloud-platform - 如何将多个 PCollection 组合在一起并将其作为 ParDo 函数的输入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33100513/