java - 加入两个大量的 PCollection 有性能问题

标签 java google-cloud-dataflow apache-beam

使用 CoGroupsByKey 方法加入两个 Pcollection 需要数小时才能执行超过 8 百万条记录。从另一个 stackoverflow 帖子中注意到 CoGbkResult has more than 10000 elements,reiteration (which may be slow) is required “CoGbkResult 有超过 10000 个元素,需要重复(这可能很慢)。”

使用此方法改进此性能的任何建议。

这是代码片段,

PCollection<TableRow> pc1 = ...;
PCollection<TableRow> pc2 = ...;

WithKeys<String, TableRow> withKeyValue = 
  WithKeys.of((TableRow row) -> String.format("%s",row.get("KEYNAME")))
          .withKeyType(TypeDescriptors.strings());

PCollection<KV<String,TableRow>> keyed_pc1 =
  pc1.apply("WithKeys", withKeyValue );

PCollection<KV<String,TableRow>> keyed_pc2 = 
  pc2.apply("WithKeys", withKeyValue );

// (org.apache.beam.sdk.extensions.joinlibrary.Join class)
PCollection<KV<String,KV<TableRow,TableRow>>> joinedCollection = 
  Join.innerJoin(keyed_pc1, keyed_pc2); 

最佳答案

Apache Beam 规范没有定义连接的执行,除了 SDK 之外,没有更快的自己编写内部连接的方法。因此,这个问题的答案取决于执行连接的是什么,即哪个运行者。我不知道 Flink 或 Spark 运行器,所以这个答案将特定于 Dataflow 运行器。

如果您还没有,请查看关于此 topic 的博文.在博客文章中,它描述了可以手动启用的 Dataflow Shuffle 服务。此服务是比当前默认服务更好的实现,通常执行速度更快,尤其是对于联接。

要启用 Dataflow Shuffle 服务,请传入以下 flags :

--experiments=shuffle_mode=service
--region=<allowed region>

允许随机播放的区域是:“us-central1”、“europe-west1”、“europe-west4”、“asia-northeast1”。

关于java - 加入两个大量的 PCollection 有性能问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56950728/

相关文章:

Java 套接字流意外结束

java - Android 中类 fragment 膨胀的奇怪错误

google-cloud-dataflow - "GC overhead limit exceeded"用于长时间运行的流数据流作业

docker - 如何使用自定义 Docker 镜像运行 Python Google Cloud Dataflow 作业?

java - Apache Beam/Dataflow - PubSub 丢失消息

java - lang.ClassCastException : java. lang.Integer android xml rpc

java - 如何检查另一个Jframe中的按钮是否被单击

google-cloud-dataflow - 在批处理管道中,如何为来自批处理源的数据分配时间戳,例如 Beam 管道中的 csv 文件

python - 数据流: update BigQuery rows with python pipeline

python-2.7 - 导入 apache_beam 元类冲突