python - Pyspark java.lang.OutOfMemoryError: Requested array size exceeds VM limit error

Tags: python scala hadoop apache-spark pyspark

I am running the following PySpark job:

spark-submit --master yarn-client --driver-memory 150G --num-executors 8 --executor-cores 4 --executor-memory 150G benchmark_script_1.py hdfs:///tmp/data/sample150k 128 hdfs:///tmp/output/sample150k | tee ~/output/sample150k.log

The job itself is fairly standard. It simply ingests a set of files and counts them:

from datetime import datetime  # used for the timestamps below

print(str(datetime.now()) + " - Ingesting files...")
files = sc.wholeTextFiles(inputFileDir, partitions)
fileCount = files.count()
print(str(datetime.now()) + " - " + str(fileCount) + " files ingested")

The source folder contains roughly 150,000 files: about 35 GB without replication and about 105 GB with replication. Heavy, but not unreasonable.

Running the code above produces the following stack trace:

15/08/11 15:39:20 WARN TaskSetManager: Lost task 61.3 in stage 0.0 (TID 76, <NODE>): java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
        at java.io.DataInputStream.read(DataInputStream.java:100)
        at org.spark-project.guava.io.ByteStreams.copy(ByteStreams.java:207)
        at org.spark-project.guava.io.ByteStreams.toByteArray(ByteStreams.java:252)
        at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:83)
        at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)

More details can be found in the log of the failing executor:

15/08/11 12:28:18 ERROR executor.CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
15/08/11 12:28:18 ERROR util.Utils: Uncaught exception in thread stdout writer for python
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.lang.StringCoding.encode(StringCoding.java:350)
        at java.lang.String.getBytes(String.java:939)
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:573)
        at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:395)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:405)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/daemon.py", line 162, in manager
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/daemon.py", line 60, in worker
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/worker.py", line 126, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/serializers.py", line 528, in read_int
15/08/11 12:28:18 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for python,5,main]
java.lang.OutOfMemoryError: Requested array size exceeds VM limit
        at java.lang.StringCoding.encode(StringCoding.java:350)
        at java.lang.String.getBytes(String.java:939)
        at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:573)
        at org.apache.spark.api.python.PythonRDD$.org$apache$spark$api$python$PythonRDD$$write$1(PythonRDD.scala:395)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$1.apply(PythonRDD.scala:405)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
    raise EOFError
EOFError
Traceback (most recent call last):
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/daemon.py", line 162, in manager
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/daemon.py", line 60, in worker
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/worker.py", line 126, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/jars/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar/pyspark/serializers.py", line 528, in read_int
15/08/11 12:28:18 ERROR executor.Executor: Exception in task 7.0 in stage 0.0 (TID 5)
java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:794)
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:833)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:897)
        at java.io.DataInputStream.read(DataInputStream.java:100)
        at org.spark-project.guava.io.ByteStreams.copy(ByteStreams.java:207)
        at org.spark-project.guava.io.ByteStreams.toByteArray(ByteStreams.java:252)
        at org.apache.spark.input.WholeTextFileRecordReader.nextKeyValue(WholeTextFileRecordReader.scala:83)
        at org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader.nextKeyValue(CombineFileRecordReader.java:69)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28)
        at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
        at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
        at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
    raise EOFError
EOFError

I have already disabled HDFS caching:

conf.set("fs.hdfs.impl.disable.cache", True)

Note that the exact same script runs in Scala without any problem at all.

Although this is a large job, there is still plenty of memory available. Does anyone know what the problem might be?

Update

I allocated more memory to the JVM:

export set JAVA_OPTS="-Xmx6G -XX:MaxPermSize=2G -XX:+UseCompressedOops"

Unfortunately, this did not bring any improvement.

Best Answer

I ran into a similar problem with spark-submit and Java while saving an 8 GB DataFrame, on a Docker container with 16 cores and 300 GB of RAM. I have not solved it yet, but I came across several possible workarounds:

Starting on page 77, Lightbend indicates that this is a shell issue and that using @transient or wrapping the code in an object may be a workaround. That does not seem to apply to either of our cases.

Databricks suggests that increasing spark.sql.shuffle.partitions may help. They recommend changing the default of 200 to 400. I tried 800 and 2000 in spark-defaults.conf but still got OOM errors.
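
For reference, a minimal sketch of what that looks like as a line in spark-defaults.conf (2000 being the last value I tried):

spark.sql.shuffle.partitions    2000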

Databricks also suggests calling DataFrame.repartition(400) in the code. Alternatively, increase the number of partitions passed as the last argument to sc.wholeTextFiles(inputFileDir, partitions).
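
In PySpark terms (mirroring the question's snippet), those two suggestions would look roughly like the sketch below; the partition count of 2000 and the df variable are only illustrative:

# ask wholeTextFiles for more, smaller partitions so each task buffers fewer file contents at once
files = sc.wholeTextFiles(inputFileDir, 2000)

# for a DataFrame (df is a placeholder, not a variable from the question), repartition after creation
df = df.repartition(2000)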

The JAVA_OPTS suggestion from Stack Overflow does not apply here, because -XX:+UseCompressedOops is disabled (on Java 8) when the heap size is larger than 32 GB.

Edit

Also tried:

  • spark.default.parallelism=1000 (it defaults to the number of cores); still OOM errors. (Applied via spark-submit as sketched just after this list.)
  • dataFrame.repartition(1000) in the code; still OOM errors.
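
For completeness, a rough sketch of how the parallelism setting is passed to this particular job via spark-submit (other arguments elided):

spark-submit --conf spark.default.parallelism=1000 --master yarn-client ... benchmark_script_1.py ...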

Possible workarounds

  • Using an intermediate RDD<LabeledPoint> allowed me to create the DataFrame, but the Spark reflection schema does not work for MLlib (the numClasses attribute is missing).

    DataFrame df = sqlContext.createDataFrame(sc.parallelize(List<LabeledPoint>),LabeledPoint.class)

  • Using an intermediate JSON file allowed me to create a DataFrame that works with MLlib.

    saveAsJson(List<Row>/*generated data*/, filename); DataFrame df = sqlContext.read().json(filename)
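
    A rough PySpark equivalent of that round-trip, assuming the generated rows have already been serialized to an RDD of JSON strings (rowsAsJson and the HDFS path are purely illustrative names):

    # write the generated data out as JSON lines ...
    rowsAsJson.saveAsTextFile("hdfs:///tmp/intermediate_json")

    # ... then let Spark SQL rebuild the DataFrame (and infer its schema) from those files;
    # on Spark 1.3 the equivalent reader call is sqlContext.jsonFile(path) rather than read.json
    df = sqlContext.read.json("hdfs:///tmp/intermediate_json")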

For this question (python - Pyspark java.lang.OutOfMemoryError: Requested array size exceeds VM limit error), a corresponding question can be found on Stack Overflow: https://stackoverflow.com/questions/31945007/
