java - OutOfMemoryError thrown when adding executors

Tags: java apache-spark

I am trying to run MLlib's LBFGS example (https://spark.apache.org/docs/1.0.0/mllib-optimization.html#limited-memory-bfgs-l-bfgs) on a large dataset (~100GB) using DISK_ONLY persistence. I use 16GB for the driver and 16GB for each executor.

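With a small number of executors (10), everything runs fine. But when I try to use more executors (40), I get an OutOfMemoryError: Java heap space on the driver. I think this may be related to the level of parallelism used (as described in https://spark.apache.org/docs/latest/tuning.html#level-of-parallelism).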

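I tried setting spark.default.parallelism to larger values (from 5000 to 15000), but I still hit the same problem. The setting does not seem to be taken into account (each job has about 500 tasks), even though it appears in the Environment tab.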

I am using Spark 1.0.0 with Java on a YARN cluster. I set the default parallelism with SparkConf conf = new SparkConf().set("spark.default.parallelism", "15000");

Stack trace:

14/10/20 11:25:16 INFO TaskSetManager: Starting task 30.0:20 as TID 60630 on executor 17: a4-5d-36-fc-ef-54.hpc.criteo.preprod (PROCESS_LOCAL)
14/10/20 11:25:16 INFO TaskSetManager: Serialized task 30.0:20 as 127544326 bytes in 227 ms
14/10/20 11:25:59 INFO TaskSetManager: Starting task 30.0:68 as TID 60631 on executor 10: a4-5d-36-fc-9f-2c.hpc.criteo.preprod (PROCESS_LOCAL)
14/10/20 11:25:59 ERROR ActorSystemImpl: Uncaught fatal error from thread [spark-akka.actor.default-dispatcher-5] shutting down ActorSystem [spark]
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:113)
    at java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:93)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:140)
    at java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1852)
    at java.io.ObjectOutputStream.write(ObjectOutputStream.java:708)
    at java.nio.channels.Channels$WritableByteChannelImpl.write(Channels.java:458)
    at org.apache.spark.util.SerializableBuffer.writeObject(SerializableBuffer.scala:49)
    at sun.reflect.GeneratedMethodAccessor98.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$launchTasks$1.apply(CoarseGrainedSchedulerBackend.scala:145)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$launchTasks$1.apply(CoarseGrainedSchedulerBackend.scala:143)
    at scala.collection.mutable.ArraySeq.foreach(ArraySeq.scala:73)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.launchTasks(CoarseGrainedSchedulerBackend.scala:143)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor.makeOffers(CoarseGrainedSchedulerBackend.scala:131)
    at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend$DriverActor$$anonfun$receive$1.applyOrElse(CoarseGrainedSchedulerBackend.scala:103)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
14/10/20 11:25:59 INFO DAGScheduler: Failed to run aggregate at LBFGS.scala:201
14/10/20 11:25:59 INFO ApplicationMaster: finishApplicationMaster with FAILED
14/10/20 11:25:59 INFO AMRMClientImpl: Waiting for application to be successfully unregistered.
Exception in thread "Thread-4" java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:187)
Caused by: org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:639)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:638)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:638)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1215)
    at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:201)
    at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:163)
    at akka.actor.ActorCell.terminate(ActorCell.scala:338)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:431)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:447)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)
    at akka.dispatch.Mailbox.run(Mailbox.scala:218)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Any ideas about why this error occurs and how to fix it?

Best answer

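Following the suggestion in this mailing-list post, http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3C49229E870391FC49BBBED818C268753D70587CCC@SZXEMA501-MBX.china.huawei.com%3E, I believe the problem is caused by the aggregation method Spark uses. I upgraded to Spark 1.1 and everything works fine.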
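
As a rough sanity check on the log in the question, the sketch below turns the reported serialized task size (127544326 bytes, from the TaskSetManager line) into a lower bound on the driver heap held by tasks waiting to be sent. The class name, the helper methods, and the idea of one task in flight per executor are assumptions made for illustration; they do not come from the post or from Spark.

```java
public class TaskSizeEstimate {
    // Size of one serialized task, copied from the TaskSetManager log line.
    static final long SERIALIZED_TASK_BYTES = 127_544_326L;

    /** Converts a byte count to mebibytes. */
    static double toMiB(long bytes) {
        return bytes / (1024.0 * 1024.0);
    }

    /** Lower bound on heap held by serialized tasks that are in flight at once. */
    static long bytesInFlight(int tasksInFlight) {
        return SERIALIZED_TASK_BYTES * tasksInFlight;
    }

    public static void main(String[] args) {
        // Hypothetical scenario: exactly one task in flight per executor.
        for (int executors : new int[] {10, 40}) {
            System.out.printf("%d tasks in flight: at least %.1f MiB%n",
                    executors, toMiB(bytesInFlight(executors)));
        }
    }
}
```

Under that assumption, 40 executors need at least about 4.75 GiB for the serialized payloads alone. The trace also shows each payload being written again through a ByteArrayOutputStream, which grows by copying its backing array (Arrays.copyOf at the top of the trace), so the real footprint is presumably higher. This is an estimate, not a diagnosis.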

A similar question can be found on Stack Overflow: https://stackoverflow.com/questions/26466866/
