scala - Spark : Read and Write to Parquet leads to OutOfMemoryError: Java heap space

标签 scala apache-spark apache-spark-sql

我编写了一些代码来读取 parquet 文件,稍微切换架构并将数据写入新的 parquet 文件。代码如下:

...
val schema = StructType(
  List(
    StructField("id", LongType, false),
    StructField("data", ArrayType(FloatType), false)
  )
)

val data = sqlContext.read.parquet(file.getAbsolutePath)
val revisedData = data.map(r =>  Row(r.getInt(0).toLong, r.getSeq[Float](1)))
val df = sqlContext.createDataFrame(revisedData,  schema)

Writer.writeToParquet(df)

Writer正在

object Writer {
    def writeToParquet(df : DataFrame) : Unit = {
       val future = Future {
         df.write.mode(SaveMode.Append).save(path)
       }

       Await.ready(future, Duration.Inf)
    }
}

对于大约 4 GB 的文件,我的程序中断,引发 OutOfMemoryError:Java 堆空间。我为执行器设置了 6 GB 内存(使用 -Dspark.executor.memory=6g ),提高了 JVM 堆空间(使用 -Xmx6g ),将 Kryo 序列化器缓冲区增加到 2 GB(使用 System.setProperty("spark.kryoserializer.buffer.mb", "2048") )。但是,我仍然收到错误。

这是堆栈跟踪:

java.lang.OutOfMemoryError: Java heap space
  at com.esotericsoftware.kryo.io.Output.<init>(Output.java:35)
  at org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:76)
  at org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:243)
  at org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:243)
  at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:247)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:236)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:744)

如何避免此错误?

最佳答案

根据我的评论,有两件事:

1) 您需要注意 spark.kryoserializer.buffer.mb 属性名称,在最新的 Spark 中,他们将其更改为 spark.kryoserializer.buffer 并且spark.kryoserializer.buffer.max

2) 你必须小心缓冲区的大小和堆的大小,它必须足够大才能存储你正在编写的单个记录,但不能太多,因为 kryo 正在创建一个显式的 byte[ ] 该大小(并且为 2GB 分配单个 byte 数组通常是一个坏主意)。尝试使用适当的属性降低缓冲区大小。

关于scala - Spark : Read and Write to Parquet leads to OutOfMemoryError: Java heap space,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32498891/

相关文章:

scala - 在 Spark 中分解结构列时出错

apache-spark - Spark.sql.autoBroadcastJoinThreshold 是否适用于使用数据集的联接运算符的联接?

scala - 使用 scala 从 URL 中提取主机名

scala - 在 REPL 中查找类型?

python - 无法在 ipython 中正确创建 spark 上下文以链接到 MySQL - com.mysql.jdbc.Driver

maven - Apache Spark 依赖问题

java - 无法从 play scala 应用程序 Play wav 文件

scala - 多项目设置 - 未检测到主类

apache-spark - 避免排队 Spark 微批处理

apache-spark - unix_timestamp()是否可以在Apache Spark中返回以毫秒为单位的unix时间?