我对集群上的执行流程有一个奇怪的问题。
方法 A 调用
- 在 FOREACH 中调用的方法 B
- 方法 C
执行流程应该是
Method A --> Method B --> Method C
但它是这样工作的:
1) Method A --> Method B (skips Method C) and continues to rest of Method B.
2) Method C is executed later separately.
因为流程不是正确的流程,
accum1.value
在方法 B 中显示为 blank/null
.**CLASS A::METHOD A:**
object TakeDFs {
def takeDFs(df: DataFrame): Unit = {
println("---------------- takeNettedDFs::START ---------------- ")
for(i <- 0 until bySecurityArray.length) {
allocProcessDF = bySecurityArray(i).toDF()
....
//WORKS
AllocOneProcess.getAllocOneDFs(allocProcessDF)
}
println("---------------- takeNettedDFs::END ---------------- ")
}
}
**CLASS B::METHOD B:**
object AllocOneProcess {
def getAllocOneDFs(df: DataFrame): Unit = {
println("---------------- getAllocOneDFs::START ---------------- ")
df.coalesce(1).sort($"PRIORITY" asc).foreach( {
row => AllocOneTest.allocProcessTest(row)
})
println("------------- getAllocOneDFs::accum1.value -------------" + accum1.value)
println("---------------- getAllocOneDFs::END ---------------- ")
}
}
**CLASS C::METHOD C:**
object AllocOneTest {
def allocProcessTest(row: Row): Unit = {
println("---------------- AllocOneTest::allocProcessTest::START ---------------- ")
accum1.add(RegRptPilotConstants.PairProcessCaseClass(row(0).asInstanceOf[String], row(1).asInstanceOf[String], row(2).asInstanceOf[String]))
println("---------------- AllocationOneTest::allocProcessTest::END ---------------- ")
}
}
**CLASS D::**
object RegRptPilotConstants {
var pairedOneSeq = Seq[PairProcessCaseClass]()
val accum1 = new ProcessAccumulator[ProcessCaseClass]()
}
最佳答案
对于上面的代码,foreach 是触发 Spark DAG 执行整个流程的 Action 。 Action ,即上述场景中的 foreach 是在每个分区上同时并行执行/调用的,因此最终调用了 foreach 内部的方法。
流程:方法 A --> 方法 B --> 方法 C、方法 C、方法 C ...
df.coalesce(1).sort($"PRIORITY" asc).foreach( {
row => AllocOneTest.allocProcessTest(row)
})
引用:https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions
关于scala - 在 Spark/Scala 中使用 ForEach 时的执行流程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58237522/