我仅在一个 Parallelism = 1
节点上运行 Flink
,以便将其性能与单线程应用程序进行比较。我想知道 Flink 是否仍在使用 Shuffle,尽管它不是并行运行的。所以如果例如执行以下命令:
var counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
.map { (_, 1) }
.groupBy(0)
.sum(1)
Shuffle
会在 groupBy
之前使用吗?有没有办法检查这个?
(在交互式 Scala Shell
的输出中,有一个 FlatMap
、Map
、Combine
以及最后a Reduce
进行观察。这同样适用于使用 Parallelism > 1
运行时。)
最佳答案
Flink 为操作 ds.groupBy(0).sum(1)
生成独立于实际并行度的作业图 ... ->Combiner ->Reducer
。在 Combiner
和 Reducer
之间引入了一个哈希分区器(洗牌步骤)。这对于所有并行度 > 1
都有意义。
对于parallelism = 1
,优化器理论上可以删除洗牌步骤,因为它不是必需的。然而,它实际上不应该影响程序的性能。
原因是对于parallelism = 1
,所有工作都将在本地组合器中完成。这意味着组合器计算结果总和,然后仅将单个元素发送到 reducer 。此外,由于组合器和 reducer 在同一台机器上运行,因此不涉及网络通信。数据只是通过移交内存段来传输。由于 Flink 还支持流式混洗,因此组合器甚至不必在第一个结果发送到 reducer 之前完成。组合器和 reducer 可以同时运行,从而避免中间结果的具体化。
关于apache-flink - 弗林克 : Shuffle with Parallelism = 1,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34256315/