java - '线程 "dispatcher-event-loop-0"java.lang.OutOfMemoryError : Java heap space ' error in Spark Scala code 中出现异常

标签 java scala apache-spark apache-spark-sql out-of-memory

val data = spark.read
    .text(filePath)
    .toDF("val")
    .withColumn("id", monotonically_increasing_id())



    val count = data.count()



    val header = data.where("id==1").collect().map(s => s.getString(0)).apply(0)



    val columns = header
    .replace("H|*|", "")
    .replace("|##|", "")
    .split("\\|\\*\\|")


    val structSchema = StructType(columns.map(s=>StructField(s, StringType, true)))



    var correctData = data.where('id > 1 && 'id < count-1).select("val")
    var dataString = correctData.collect().map(s => s.getString(0)).mkString("").replace("\\\n","").replace("\\\r","")
    var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{ 
                                                          var arr = s.split("\\|\\*\\|")
                                                          while(arr.length < columns.length) arr = arr :+ ""
                                                          RowFactory.create(arr:_*)
                                                         })
    val finalDF = spark.createDataFrame(sc.parallelize(dataArr),structSchema)

    display(finalDF)

这部分代码给出错误:

Exception in thread "dispatcher-event-loop-0" java.lang.OutOfMemoryError: Java heap space

经过几个小时的调试主要是以下部分:

var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{ 
                                                          var arr = s.split("\\|\\*\\|")
                                                          while(arr.length < columns.length) arr = arr :+ ""
                                                          RowFactory.create(arr:_*)
                                                         })
    val finalDF = spark.createDataFrame(sc.parallelize(dataArr),structSchema)

导致错误。

我将部分更改为

var dataArr = dataString.split("\\|\\#\\#\\|").map(s =>{
                                                          var arr = s.split("\\|\\*\\|")
                                                          while(arr.length < columns.length) arr = arr :+ ""
                                                          RowFactory.create(arr:_*)
                                                         }).toList
  val finalDF = sqlContext.createDataFrame(sc.makeRDD(dataArr),structSchema)

但错误仍然相同。我应该改变什么来避免这种情况?

当我在 databricks Spark 集群中运行此代码时,特定作业给出了此 Spark 驱动程序错误:

Job aborted due to stage failure: Serialized task 45:0 was 792585456 bytes, which exceeds max allowed: spark.rpc.message.maxSize (268435456 bytes).

我添加了这部分代码:

spark.conf.set("spark.rpc.message.maxSize",Int.MaxValue)

但是没有用。

最佳答案

我的猜测是

var dataString = correctData.collect().map(s => s.getString(0)).mkString("").replace("\\\n","").replace("\\\r","")

这就是问题所在,因为您将(几乎)所有数据收集到驱动程序,即 1 个 JVM。

也许此行会运行,但对 dataString 的后续操作将超出您的内存限制。您不应该收集您的数据!相反,使用分布式“数据结构”,例如 Dataframe 或 RDD。

我认为你可以省略上面一行中的collect

关于java - '线程 "dispatcher-event-loop-0"java.lang.OutOfMemoryError : Java heap space ' error in Spark Scala code 中出现异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60466188/

相关文章:

java - Spring mvc @DateTimeFormat 未按预期工作

java - AJP 连接器和 Tomcat 8.5.54 之间的网关超时问题

scala - 如何在 Spark SQL 中为自定义类型定义模式?

scala - 在 Scala 中将可变数量的元组序列减少到 Map[Key, List[Value]]

scala - 与条件语句匹配

scala - 如何在 zeppelin 中禁止打印变量值

python - Json 文件到 pyspark 数据帧

java - Jmonkey 漂亮。检测用户是否停止滑动

java - 编写按钮方法时出错

algorithm - 如何在 Scala 中的数据框中获取成对的 x 值?