val data = spark.read
.text(filePath)
.toDF("val")
.withColumn("id", monotonically_increasing_id())
val count = data.count()
val header = data.where("id==1").collect().map(s => s.getString(0)).apply(0)
val columns = header
.replace("H|*|", "")
.replace("|##|", "")
.split("\\|\\*\\|")
val structSchema = StructType(columns.map(s=>StructField(s, StringType, true)))
var correctData = data.where('id > 1 && 'id < count-1).select("val")
var dataString = correctData.collect().map(s => s.getString(0)).mkString("").replace("\\\n","").replace("\\\r","")
var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{
var arr = s.split("\\|\\*\\|")
while(arr.length < columns.length) arr = arr :+ ""
RowFactory.create(arr:_*)
})
val finalDF = spark.createDataFrame(sc.parallelize(dataArr),structSchema)
display(finalDF)
这部分代码给出错误:
Exception in thread "dispatcher-event-loop-0" java.lang.OutOfMemoryError: Java heap space
经过几个小时的调试主要是以下部分:
var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{
var arr = s.split("\\|\\*\\|")
while(arr.length < columns.length) arr = arr :+ ""
RowFactory.create(arr:_*)
})
val finalDF = spark.createDataFrame(sc.parallelize(dataArr),structSchema)
导致错误。
我将部分更改为
var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{
var arr = s.split("\\|\\*\\|")
while(arr.length < columns.length) arr = arr :+ ""
RowFactory.create(arr:_*)
}).toList
val finalDF = sqlContext.createDataFrame(sc.makeRDD(dataArr),structSchema)
但错误仍然相同。我应该改变什么来避免这种情况?
当我在 databricks Spark 集群中运行此代码时,特定作业给出了此 Spark 驱动程序错误:
Job aborted due to stage failure: Serialized task 45:0 was 792585456 bytes, which exceeds max allowed: spark.rpc.message.maxSize (268435456 bytes).
我添加了这部分代码:
spark.conf.set("spark.rpc.message.maxSize",Int.MaxValue)
但是没有用。
最佳答案
我的猜测是
var dataString = correctData.collect().map(s => s.getString(0)).mkString("").replace("\\\n","").replace("\\\r","")
这就是问题所在,因为您将(几乎)所有数据收集到驱动程序,即 1 个 JVM。
也许此行会运行,但对 dataString
的后续操作将超出您的内存限制。您不应该收集您的数据!相反,使用分布式“数据结构”,例如 Dataframe 或 RDD。
我认为你可以省略上面一行中的collect
关于java - '线程 "dispatcher-event-loop-0"java.lang.OutOfMemoryError : Java heap space ' error in Spark Scala code 中出现异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60466188/