java - 增强groupReduce变换的并行化程度

标签 java apache-flink

在我的 Flink 程序中,我使用 flatMap 操作转换数据,该操作将多个数据 block 划分为多个较小的 block 。这些 block 具有描述它们在各自原始 block 中的位置的“位置”属性。现在我使用groupReduce,它需要转换共享相同“位置”属性的所有小块。因此它应该可以轻松地分布在多个节点上。但是,当我在多个节点上运行程序时,groupReduce 将以 1 的 dop 执行。

我猜这是因为我只有一个 DataSet,但 Flink Java API 中似乎没有 GroupedDataSet 。是否有另一种可能性可以增强我的 groupReduce 转换的 dop?

这是我正在使用的代码(忽略“细节”的虚拟代码):

DataSet<SlicedTile> slicedTiles = tiles.flatMap()
    .groupBy(position)
    .sortGroup(time)
    .getDataSet()
    //Until here the dop is correct

DataSet<SlicedTile> processedSlicedTiles = slicedTiles.reduceGroup;

最佳答案

您的代码的问题在于 getDataSet() 调用。它返回分组操作的输入。因此,slicedTiles 表示的数据集既未分组,也未对其组进行排序,而是 flatMap 转换和 groupBy 和程序中根本不考虑 sortGroup 调用。

对非分组数据集应用groupReduce(或reduce)操作始终是非并行操作,因为输入数据集的所有元素都被处理为单组。

逻辑上,三个转换groupBy().sortGroup().reduceGroup()属于一起,并被转换为单个groupReduce运算符(如果GroupReduceFunction 是可组合的)。

如果您按如下方式更改实现,它应该按预期工作。

DataSet<SlicedTile> slicedTiles = tiles.flatMap()
    .groupBy(position)
    .sortGroup(time)
    .reduceGroup(yourFunction);

我将打开一个 JIRA 问题,将 JavaDocs 添加到 Grouping.getDataSet() 方法,以记录此函数的行为。

关于java - 增强groupReduce变换的并行化程度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34560124/

相关文章:

java - 谷歌 Java 风格 : checkstyle file with corresponding editorconfig file for Visual Studio Code

java - 监听器接口(interface)如何或为何工作?除了作为监听器之外,接口(interface)还有其他用途吗?

java - 如何在Java中对包含多个句点/点的字符串进行排序?

java - 带有图像的按钮

apache-flink - Flink,使用多个Kafka源时如何正确设置并行性?

amazon-web-services - Flink 到 dynamo Sink

apache-kafka - Flink Kafka Producer 中的 Exactly-once 语义

apache-flink - 在 Flink 中,我可以在同一个槽中有一个运算符(operator)的多个子任务吗?

java - 运行 Windows 服务时为 "Failed to create java"

docker - Flink 1.5.4 没有在 Kubernetes 中注册 Google Cloud Storage (GCS) 文件系统,尽管它可以在 docker 容器中运行