java - Unable to load a 25GB dataset in PySpark local mode with 56GB RAM available

Tags: java python apache-spark pyspark heap-memory

I'm having trouble loading and processing a 25GB Parquet dataset (of stackoverflow.com posts) in local mode on a single powerful machine with 12 cores / 64GB of RAM.

I've allocated more memory to pyspark than the on-disk size of the Parquet dataset (let alone the two columns of it I actually need), yet once I load it I can't run any operation on the DataFrame. This is confusing and I don't know what to do.

Specifically, I have a 25GB Parquet dataset:

$ du -sh data/stackoverflow/parquet/Posts.df.parquet

25G data/stackoverflow/parquet/Posts.df.parquet

And a machine with 56GB of free memory:

$ free -h

              total        used        free      shared  buff/cache   available
Mem:            62G        4.7G         56G         23M        1.7G         57G
Swap:           63G          0B         63G

I've configured PySpark to use 50GB of RAM (I tried adjusting maxResultSize, to no effect).

My configuration looks like this:

$ cat ~/spark/conf/spark-defaults.conf

spark.io.compression.codec org.apache.spark.io.SnappyCompressionCodec
spark.driver.memory 50g
spark.jars ...
spark.executor.cores 12
spark.driver.maxResultSize 20g

And my environment like this:

$ cat ~/spark/conf/spark-env.sh

PYSPARK_PYTHON=python3
PYSPARK_DRIVER_PYTHON=python3
SPARK_WORKER_DIR=/nvm/spark/work
SPARK_LOCAL_DIRS=/nvm/spark/local
SPARK_WORKER_MEMORY=50g
SPARK_WORKER_CORES=12
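
For reference, the same knobs can also be passed per-invocation instead of via spark-defaults.conf; a minimal equivalent launch (a sketch, assuming the stock pyspark launcher) would be:

$ pyspark --driver-memory 50g --conf spark.driver.maxResultSize=20g

Note that in local mode the entire application runs inside the single driver JVM, so spark.driver.memory is the setting that actually bounds the heap here; SPARK_WORKER_MEMORY and SPARK_WORKER_CORES apply only to standalone-cluster workers.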

I load the data like this:

$ pyspark

>>> posts = spark.read.parquet('data/stackoverflow/parquet/Posts.df.parquet')

It loads fine, but any action - even if I first run limit(10) on the DataFrame - results in a Java heap space error.

>>> posts.limit(10)\
    .select('_ParentId','_Body')\
    .filter(posts._ParentId == 9915705)\
    .show()

[Stage 1:>                                                       (0 + 12) / 195]19/06/30 17:26:13 ERROR Executor: Exception in task 7.0 in stage 1.0 (TID 8)
java.lang.OutOfMemoryError: Java heap space
19/06/30 17:26:13 ERROR Executor: Exception in task 3.0 in stage 1.0 (TID 4)
java.lang.OutOfMemoryError: Java heap space
19/06/30 17:26:13 ERROR Executor: Exception in task 5.0 in stage 1.0 (TID 6)
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.parquet.bytes.HeapByteBufferAllocator.allocate(HeapByteBufferAllocator.java:32)
    at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1166)
    at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:301)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:256)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:159)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/06/30 17:26:13 ERROR Executor: Exception in task 10.0 in stage 1.0 (TID 11)
java.lang.OutOfMemoryError: Java heap space
    at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
    at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
    at org.apache.parquet.bytes.HeapByteBufferAllocator.allocate(HeapByteBufferAllocator.java:32)
    at org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1166)
    at org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.checkEndOfRowGroup(VectorizedParquetRecordReader.java:301)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:256)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:159)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:181)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.scan_nextBatch_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/06/30 17:26:13 ERROR Executor: Exception in task 6.0 in stage 1.0 (TID 7)
java.lang.OutOfMemoryError: Java heap space
19/06/30 17:26:13 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 7,5,main]
java.lang.OutOfMemoryError: Java heap space
19/06/30 17:26:13 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 11,5,main]
java.lang.OutOfMemoryError: Java heap space
...

The following does run, which suggests the problem lies with the _Body field (obviously the largest one):

>>> posts.limit(10).select('_Id').show()

+---+
|_Id|
+---+
|  4|
|  6|
|  7|
|  9|
| 11|
| 12|
| 13|
| 14|
| 16|
| 17|
+---+

What should I do? I could just use EMR, but I'd like to be able to load this dataset locally, which seems like an entirely reasonable thing to do on a machine like this.

Best Answer

The default fraction of memory Spark uses for storage and execution is 0.6. With your configuration that comes to 0.6 * 50GB = 30GB. And the in-memory representation of data can take up considerably more space than its serialized on-disk form.
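
As a worked check of that arithmetic (a sketch; the precise Spark 2.x formula subtracts a fixed 300MB reserve from the heap before applying spark.memory.fraction):

>>> heap = 50 * 1024**3                        # spark.driver.memory = 50g, in bytes
>>> reserved = 300 * 1024**2                   # Spark's fixed reserved memory
>>> fraction = 0.6                             # default spark.memory.fraction
>>> (heap - reserved) * fraction / 1024**3     # usable storage+execution pool, in GB
29.82421875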

See the Memory Management section of the Spark tuning documentation for more details.
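
One additional observation (an assumption on our part, not from the accepted answer): the stack trace shows each task dying while the vectorized Parquet reader buffers an entire row group's worth of column data, and with 12 tasks reading concurrently against a column as large as _Body, peak allocation can exceed the heap even though the dataset as a whole would fit. A hedged mitigation sketch is simply to lower the read concurrency:

$ pyspark --driver-memory 50g --master local[4]

Fewer simultaneous tasks means fewer row-group buffers alive at once, at the cost of a slower scan.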

Regarding "java - Unable to load a 25GB dataset in PySpark local mode with 56GB RAM available", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/56829010/
