scala - 在 Spark/Scala 中使用 ForEach 时的执行流程

标签 scala apache-spark hadoop apache-spark-sql hdfs

我对集群上的执行流程有一个奇怪的问题。

方法 A 调用
- 在 FOREACH 中调用的方法 B
- 方法 C

执行流程应该是

Method A --> Method B --> Method C 

但它是这样工作的:
1) Method A --> Method B (skips Method C) and continues to rest of Method B.
2) Method C is executed later separately. 

因为流程不是正确的流程,accum1.value在方法 B 中显示为 blank/null .
**CLASS A::METHOD A:**

object TakeDFs {

    def takeDFs(df: DataFrame): Unit = {
        println("---------------- takeNettedDFs::START ---------------- ")

        for(i <- 0 until bySecurityArray.length) {
            allocProcessDF = bySecurityArray(i).toDF()
            ....

            //WORKS
            AllocOneProcess.getAllocOneDFs(allocProcessDF)

            }
        println("---------------- takeNettedDFs::END ---------------- ")

    }
}

**CLASS B::METHOD B:**

object AllocOneProcess {

    def getAllocOneDFs(df: DataFrame): Unit = {
        println("---------------- getAllocOneDFs::START ---------------- ")

        df.coalesce(1).sort($"PRIORITY" asc).foreach( {
        row => AllocOneTest.allocProcessTest(row)
        })

        println("------------- getAllocOneDFs::accum1.value -------------" + accum1.value)

        println("---------------- getAllocOneDFs::END ---------------- ")

    }
}

**CLASS C::METHOD C:**

object AllocOneTest {

    def allocProcessTest(row: Row): Unit =  {
        println("---------------- AllocOneTest::allocProcessTest::START ---------------- ")

        accum1.add(RegRptPilotConstants.PairProcessCaseClass(row(0).asInstanceOf[String], row(1).asInstanceOf[String], row(2).asInstanceOf[String]))


        println("---------------- AllocationOneTest::allocProcessTest::END ---------------- ")

    }
}

**CLASS D::**

object RegRptPilotConstants {
    var pairedOneSeq = Seq[PairProcessCaseClass]()
    val accum1 = new ProcessAccumulator[ProcessCaseClass]()

}

最佳答案

对于上面的代码,foreach 是触发 Spark DAG 执行整个流程的 Action 。 Action ,即上述场景中的 foreach 是在每个分区上同时并行执行/调用的,因此最终调用了 foreach 内部的方法。

流程:方法 A --> 方法 B --> 方法 C、方法 C、方法 C ...

df.coalesce(1).sort($"PRIORITY" asc).foreach( {
    row => AllocOneTest.allocProcessTest(row)
})

引用:https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions

关于scala - 在 Spark/Scala 中使用 ForEach 时的执行流程,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58237522/

相关文章:

scala - 在Scala中从初始对象和生成下一个对象的函数创建O(1)内存可迭代

scala - 增加任务大小 spark

hadoop - HDFS中数据可用性的事件通知?

hadoop - 错误:SQoop在HDFS中添加记录

scala - 环境 monad 中术语环境的含义

scala - 使用 self 类型扩展 trait 时非法继承

斯卡拉玩 2.0 : Is it possible to use case classes in "routes"

java - "main"java.lang.ClassCastException : [Lscala. Tuple2;无法在 Spark MLlib LDA 中转换为 scala.Tuple2

apache-spark - 在 Apache Spark 中延迟加载分区 Parquet

apache-spark - 使用 Airflow dag run 创建 EMR 集群,任务完成后 EMR 将终止