我编写了一些代码来读取 parquet 文件,稍微切换架构并将数据写入新的 parquet 文件。代码如下:
...
val schema = StructType(
List(
StructField("id", LongType, false),
StructField("data", ArrayType(FloatType), false)
)
)
val data = sqlContext.read.parquet(file.getAbsolutePath)
val revisedData = data.map(r => Row(r.getInt(0).toLong, r.getSeq[Float](1)))
val df = sqlContext.createDataFrame(revisedData, schema)
Writer.writeToParquet(df)
与Writer
正在
object Writer {
def writeToParquet(df : DataFrame) : Unit = {
val future = Future {
df.write.mode(SaveMode.Append).save(path)
}
Await.ready(future, Duration.Inf)
}
}
对于大约 4 GB 的文件,我的程序中断,引发 OutOfMemoryError:Java 堆空间。我为执行器设置了 6 GB 内存(使用 -Dspark.executor.memory=6g
),提高了 JVM 堆空间(使用 -Xmx6g
),将 Kryo 序列化器缓冲区增加到 2 GB(使用 System.setProperty("spark.kryoserializer.buffer.mb", "2048")
)。但是,我仍然收到错误。
这是堆栈跟踪:
java.lang.OutOfMemoryError: Java heap space
at com.esotericsoftware.kryo.io.Output.<init>(Output.java:35)
at org.apache.spark.serializer.KryoSerializer.newKryoOutput(KryoSerializer.scala:76)
at org.apache.spark.serializer.KryoSerializerInstance.output$lzycompute(KryoSerializer.scala:243)
at org.apache.spark.serializer.KryoSerializerInstance.output(KryoSerializer.scala:243)
at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:247)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:236)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:744)
如何避免此错误?
最佳答案
根据我的评论,有两件事:
1) 您需要注意 spark.kryoserializer.buffer.mb
属性名称,在最新的 Spark 中,他们将其更改为 spark.kryoserializer.buffer
并且spark.kryoserializer.buffer.max
。
2) 你必须小心缓冲区的大小和堆的大小,它必须足够大才能存储你正在编写的单个记录,但不能太多,因为 kryo 正在创建一个显式的 byte[ ]
该大小(并且为 2GB 分配单个 byte
数组通常是一个坏主意)。尝试使用适当的属性降低缓冲区大小。
关于scala - Spark : Read and Write to Parquet leads to OutOfMemoryError: Java heap space,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32498891/