scala - Spark数据框似乎被重新计算了两次

标签 scala apache-spark hadoop hive apache-spark-sql

解决了:我解决了这个问题,这是由于流程的第一个段落中有一个非常愚蠢,愚蠢,愚蠢的错误。
基本上,我正在计算一个写入Hive表的数据框。然后,需要经过很多次使用才能使用此数据框来创建temporaryDF,但我最初是从头开始查询表,而不是使用要在表中写入的数据框的副本。错误的原因在于,刚刚计算出的数据帧缺少先前的分区(由于流程的特定逻辑),而用于创建temporaryDF的后续计算也至少需要两个先前的分区。我不知道为什么,我不记得什么时候,我决定缓存刚刚计算出的分区,从而丢失了信息并在Oozie下得到了一个空分区(在Spark-Shell中,我总是至少使用三个分区,因为一段时间后手动更新表格-每个新分区每15分钟出现一次)。我可能正在深夜工作,而我的大脑认为值得将其弄乱。
我赞成并接受@thebluephantom的答案,因为他在我描述的特定情况下是正确的。
原始:
我在Oozie工作流程下与Spark-Submit一起在Hadoop 2中将Spark-Shell与Spark v.2.2.0.2.6.4.105-1(使用Scala)结合使用时出现了奇怪的行为。
我有一个Hive表,其中包含每15分钟跟踪一些进程的记录。每次都用仍满足目标过程逻辑的新记录或“旧”记录覆盖该表。
我通过称为times_investigated的列(从1到9)跟踪记录的“年龄”。
我创建了一个临时数据框,我们将其称为temporayDF,其中包含旧条目和新条目(两种类型都必须存在才能运行有用的计算)。然后根据temporayDF$"times_investigated" === 1(或$"times_investigated > 1")在新条目和旧条目之间划分此=!= 1
然后,将处理后的条目与最终数据帧中的union合并,然后将其写入原始Hive表中。

// Before, I run the query on the 'old' Hive table and the logic over old and new entries.
// I now have a temporary dataframe
val temporaryDF = previousOtherDF
                  .withColumn("original_col_new", conditions)
                  .withColumn("original_other_col_new", otherConditions)
                  .withColumn("times_investigated_new", nvl($"times_investigated" + 1, 1))
                  .select(
                    previousColumns,
                    $"original_col_new".as("original_col"),
                    $"original_other_col_new".as("original_other_col"),
                    $"times_investigated_new".as("times_investigated"))
                    .cache
                  
                  

// Now I need to split the temporayDF in 2 to run some other logic on the new entries.
val newEntriesDF = temporaryDF
                    .filter($"times_investigated" === 1)
                    .join(neededDF, conditions, "leftouter")
                    .join(otherNeededDF, conditions, "leftouter")
                    .groupBy(cols)
                    .agg(min(colOne),
                         max(colTwo),
                         min(colThree),
                         max(colFour))
                    .withColumn("original_col_five_new",
                                when(conditions).otherwise(somethingElse))
                    .withColumn("original_col_six_new",
                                when(conditions).otherwise(somethingElse)) 
                    .select(orderedColumns)
                    

val oldEntriesDF = temporaryDF.filter($"times_investigated" > 1)

val finalTableDF = oldEntriesDF.union(newEntriesDF)

// Now I write the table
finalTableDF.createOrReplaceTempView(tempFinalTableDF)
sql("""INSERT OVERWRITE TABLE  $finalTableDF 
       SELECT * FROM  tempFinalTableDF """)

// I would then need to re-use the newly-computed table to process further information...
问题:
Hive表不显示times_investigated = 1的新条目。它仅处理旧条目,因此,在这9次条目可以保留在表中之后,它完全变空。
我在Spark-Shell中运行了一些测试,并且所有迭代都能完美运行,即使从 shell 手动写入Hive表也能在Hive表中产生预期的结果,但是当我在Oozie下启动工作流程时,奇怪的行为再次出现。
我在Spark-Shell中注意到的是,在编写Hive表之后,如果我要计算temporaryDF.show(),则新条目将更新为$"times_investigated" = 2!
我试图创建temporaryDF的副本,以使用新条目和旧条目在单独的数据帧上工作,但是在编写Hive表之后,此copyOfTemporaryDF也得到了更新。
似乎这种重新计算是在写Oozie下的Hive表之前发生的。
我知道我可以用不同的方式来计算操作,但是如果可能的话,我需要找到一种针对当前流程的快速临时解决方案。
最重要的是,我很想了解引擎盖下发生的事情,以避免以后再陷入这种情况。
你们有什么线索和/或建议吗?
我尝试缓存中间数据帧,但没有成功。
附言对不起,可能是不好的编码习惯
编辑。 更多上下文:temporaryDF来自其他中间数据帧,仅用于一次计算感兴趣的数据。创建temporaryDF的最后一段是withColumn操作,其中$"times_investigated"是使用自定义nvl函数更新的(与SQL完全一样),并且从未在旧版本的流程中引起问题(请参见下面的段落)。
Edit2:我还尝试将一个长链序列中的新条目和旧条目的操作合并,以便temopraryDF实际上是要在Hive表中写入的最终数据帧,但仍不考虑times_investigated = 1的新条目。 (但是,写入表后,通过数据表的Spark-Shell.show对数据帧进行重新计算使其没有问题,因此调查的时间为+1)。

最佳答案

使用.cache,否则您将得到重新计算。如果要在单个Spark应用程序中多次使用RDD或DF,则应该对相应的数据帧或RDD执行此操作-甚至不依赖于Action,有时甚至会“跳过阶段”。

val temporaryDF = previousOperations...cache()
如您所见,2个v​​al使用temporaryDF,并且无需缓存重新计算,它们可能会给出不同的结果。那应该被缓存。
当然,如果 worker 死亡,或分区被驱逐,则需要重新计算。
.cache对于大于可用群集内存的数据集可能不是理想的选择。逐出的每个分区都将从源中重建,这是一项昂贵的事务。
同样,使用适当的分区和迭代几次比持久/缓存要好;但这一切取决于。

关于scala - Spark数据框似乎被重新计算了两次,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62533853/

相关文章:

Scala Curried 类型不匹配

scala - 使用私有(private)构造函数参数扩展特征

scala - akka中FastFuture有什么用

java - sbt 不会组装 Spark

hadoop - Hive命令行-列名中的反引号问题

scala - jOOQ 将自定义 SQL 获取到映射器中

apache-spark - 获取数组中项目的索引,该数组是 Spark 数据帧中的一列

python - (Py)Spark中如何使用JDBC源读写数据?

hadoop - 通过 Knox 获取与 Hive 的 JDBC 连接时出错

hadoop - 在 Hadoop 上并行化执行决策树 ID3/C4.5