在我的 Flink 程序中,我使用 flatMap
操作转换数据,该操作将多个数据 block 划分为多个较小的 block 。这些 block 具有描述它们在各自原始 block 中的位置的“位置”属性。现在我使用groupReduce
,它需要转换共享相同“位置”属性的所有小块。因此它应该可以轻松地分布在多个节点上。但是,当我在多个节点上运行程序时,groupReduce
将以 1 的 dop 执行。
我猜这是因为我只有一个 DataSet
,但 Flink Java API 中似乎没有 GroupedDataSet
。是否有另一种可能性可以增强我的 groupReduce
转换的 dop?
这是我正在使用的代码(忽略“细节”的虚拟代码):
DataSet<SlicedTile> slicedTiles = tiles.flatMap()
.groupBy(position)
.sortGroup(time)
.getDataSet()
//Until here the dop is correct
DataSet<SlicedTile> processedSlicedTiles = slicedTiles.reduceGroup;
最佳答案
您的代码的问题在于 getDataSet()
调用。它返回分组操作的输入。因此,slicedTiles
表示的数据集既未分组,也未对其组进行排序,而是 flatMap
转换和 groupBy
和程序中根本不考虑 sortGroup
调用。
对非分组数据集应用groupReduce
(或reduce
)操作始终是非并行操作,因为输入数据集的所有元素都被处理为单组。
逻辑上,三个转换groupBy().sortGroup().reduceGroup()
属于一起,并被转换为单个groupReduce
运算符(如果GroupReduceFunction
是可组合的)。
如果您按如下方式更改实现,它应该按预期工作。
DataSet<SlicedTile> slicedTiles = tiles.flatMap()
.groupBy(position)
.sortGroup(time)
.reduceGroup(yourFunction);
我将打开一个 JIRA 问题,将 JavaDocs 添加到 Grouping.getDataSet()
方法,以记录此函数的行为。
关于java - 增强groupReduce变换的并行化程度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34560124/