scala - 依靠 Spark Dataframe 的速度非常慢

标签 scala apache-spark count spark-dataframe rdd

我正在创建一个新的 DataFrame,其中包含来自 Join 的少量记录。

val joined_df = first_df.join(second_df, first_df.col("key") ===
second_df.col("key") && second_df.col("key").isNull, "left_outer")
joined_df.repartition(1)
joined_df.cache()
joined_df.count()

除了计数操作外,一切都很快(不到一秒)。 RDD 转换开始了,实际上需要几个小时才能完成。有什么办法可以加快速度吗?
INFO MemoryStore: Block rdd_63_140 stored as values in memory (estimated size 16.0 B, free 829.3 MB)
INFO BlockManagerInfo: Added rdd_63_140 in memory on 192.168.8.52:36413 (size: 16.0 B, free: 829.8 MB)
INFO Executor: Finished task 140.0 in stage 10.0 (TID 544). 4232 bytes result sent to driver
INFO TaskSetManager: Starting task 142.0 in stage 10.0 (TID 545, localhost, executor driver, partition 142, PROCESS_LOCAL, 6284 bytes)
INFO Executor: Running task 142.0 in stage 10.0 (TID 545)
INFO TaskSetManager: Finished task 140.0 in stage 10.0 (TID 544) in 16 ms on localhost (executor driver) (136/200)
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms

最佳答案

Everything is fast (under one second) except the count operation.



其理由如下:count 之前的所有操作被称为转换,这种类型的 Spark 操作是惰性的,即它在调用操作之前不进行任何计算(在您的示例中为 count)。

第二个问题是在repartition(1) :

请记住,您将失去 spark 提供的所有并行性,并且您的计算将在一个执行程序中运行(如果您处于独立模式,则为核心),因此您必须删除此步骤或更改 1 与 CPU 内核数量(独立模式)或执行程序数量(集群模式)相关的数字。

The RDD conversion kicks in and literally takes hours to complete.



如果我理解正确,您会隐藏 DataFrameRDD ,这在 Spark 中确实是一种不好的做法,您应该尽可能避免这种转换。
这是因为 DataFrame 中的数据和 Dataset使用 编码特殊 Spark 编码器 (如果我记得很清楚,它被称为 tungstant)它比 JVM 序列化编码器占用的内存少得多,所以这种转换意味着 spark 将改变你自己的数据类型(需要 少得多的内存 并让 spark 优化 很多交换,只处理编码数据而不是序列化要使用的数据,然后将其反序列化)到 JVM 数据类型,这就是为什么 DataFrame s 和 Dataset s 比RDD 非常强大秒

希望这对你有帮助

关于scala - 依靠 Spark Dataframe 的速度非常慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45142105/

相关文章:

scala - Play2 和 Scala,我应该如何配置集成测试以使用正确的数据库运行

apache-spark - Spark 中的 saveAsTextFile 函数是否将数据传输到驱动程序?

sql - 如何对具有 NULL 值的字段进行分组?

sql - Count的两种使用方式,是等价的吗?

c++ - 计算 map 中相同值的数量

Scala扩展while循环到do-until表达式

scala - 无法使用Gradle和Java库依赖项构建Scala程序

scala - 为什么在这个 Scala 方法定义中有两组参数/括号?

apache-spark - Airflow SparkKubernetes运算符(operator)日志记录

python - Apache Spark 读取 S3 : can't pickle thread. 锁对象