我正在创建一个新的 DataFrame,其中包含来自 Join 的少量记录。
val joined_df = first_df.join(second_df, first_df.col("key") ===
second_df.col("key") && second_df.col("key").isNull, "left_outer")
joined_df.repartition(1)
joined_df.cache()
joined_df.count()
除了计数操作外,一切都很快(不到一秒)。 RDD 转换开始了,实际上需要几个小时才能完成。有什么办法可以加快速度吗?
INFO MemoryStore: Block rdd_63_140 stored as values in memory (estimated size 16.0 B, free 829.3 MB)
INFO BlockManagerInfo: Added rdd_63_140 in memory on 192.168.8.52:36413 (size: 16.0 B, free: 829.8 MB)
INFO Executor: Finished task 140.0 in stage 10.0 (TID 544). 4232 bytes result sent to driver
INFO TaskSetManager: Starting task 142.0 in stage 10.0 (TID 545, localhost, executor driver, partition 142, PROCESS_LOCAL, 6284 bytes)
INFO Executor: Running task 142.0 in stage 10.0 (TID 545)
INFO TaskSetManager: Finished task 140.0 in stage 10.0 (TID 544) in 16 ms on localhost (executor driver) (136/200)
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
最佳答案
Everything is fast (under one second) except the count operation.
其理由如下:
count
之前的所有操作被称为转换,这种类型的 Spark 操作是惰性的,即它在调用操作之前不进行任何计算(在您的示例中为 count
)。第二个问题是在
repartition(1)
:请记住,您将失去 spark 提供的所有并行性,并且您的计算将在一个执行程序中运行(如果您处于独立模式,则为核心),因此您必须删除此步骤或更改 1 与 CPU 内核数量(独立模式)或执行程序数量(集群模式)相关的数字。
The RDD conversion kicks in and literally takes hours to complete.
如果我理解正确,您会隐藏
DataFrame
到 RDD
,这在 Spark 中确实是一种不好的做法,您应该尽可能避免这种转换。这是因为
DataFrame
中的数据和 Dataset
使用 编码特殊 Spark 编码器 (如果我记得很清楚,它被称为 tungstant)它比 JVM 序列化编码器占用的内存少得多,所以这种转换意味着 spark 将改变你自己的数据类型(需要 少得多的内存 并让 spark 优化 很多交换,只处理编码数据而不是序列化要使用的数据,然后将其反序列化)到 JVM 数据类型,这就是为什么 DataFrame
s 和 Dataset
s 比RDD
非常强大秒希望这对你有帮助
关于scala - 依靠 Spark Dataframe 的速度非常慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45142105/