scala - Spark 中两个大型数据集之间的交叉连接

标签 scala apache-spark apache-spark-sql

我有 2 个大型数据集。 第一个数据集包含大约 1.3 亿个条目。
第二个数据集包含大约 40000 个条目。 数据是从 MySQL 表中获取的。

我需要做一个交叉连接,但我得到了

java.sql.SQLException: GC overhead limit exceeded

在 Scala 中执行此操作的最佳技术是什么?

以下是我的代码片段:

val df1 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table1,"id",100,100000,40, MySqlConnection.getConnectionProperties))
val df2 = (spark.read.jdbc(jdbcURL,configurationLoader.mysql_table2, MySqlConnection.getConnectionProperties))
val df2Cache = df2.repartition(40).cache()
val crossProduct = df1.join(df2Cache)

df1 是较大的数据集,df2 是较小的数据集。

最佳答案

130M*40K = 52 万亿条记录是存储此数据所需的 52 TB 内存,这是假设每条记录为 1 字节,这肯定不是真的。如果它多达 64 字节(我认为这也是一个非常保守的估计),您将需要 3.32 PB (!) 的内存来存储数据。这是一个非常大的数量,因此除非您有一个非常大的集群并且该集群内有非常快的网络,否则您可能需要重新考虑您的算法以使其正常工作。

也就是说,当您对两个 SQL 数据集/数据帧进行连接时,Spark 用于存储连接结果的分区数量由 spark 控制。 sql.shuffle.partitions 属性(参见 here )。您可能希望将其设置为一个非常大的数字,并将执行程序的数量设置为您可以设置的最大数量。然后您可能能够将处理运行到最后。

此外,您可能需要查看 spark.shuffle.minNumPartitionsToHighlyCompress选项;如果您将它设置为小于您的随机分区数,您可能会得到另一个内存提升。请注意,在最近的 Spark 版本之前,此选项是一个硬编码常量设置为 2000,因此根据您的环境,您只需将 spark.sql.shuffle.partitions 设置为大于 2000 的数字即可使用它。

关于scala - Spark 中两个大型数据集之间的交叉连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54154181/

相关文章:

scala - 获取 Scala 中 List[Option] 中具有值的所有条目

java - 如何在 Spark Streaming 应用程序中从 Kafka 接收 Java 对象

python - 如何将 Vector 拆分为列 - 使用 PySpark

apache-spark - 如何使用 yarn-cluster master 获取进度条(带有阶段和任务)?

java - 从数据框中的列中删除特殊字符

python - 有没有办法计算 Spark df 中每行的非空值?

scala - 带有可变参数的案例类的隐式 jsonFormat

scala - ~> 运算符在 Spray.io 中到底意味着什么?

python - Spark Streaming - 计算状态中的不同元素

hadoop - 无法通过 Gremlin Shell 安装 Hadoop 和 Spark