pandas - pyspark中的java.lang.OutOfMemoryError

标签 pandas apache-spark pyspark

嗨,

我在 sparkcontext 中有一个数据框,有 400k 行和 3 列。
驱动程序有 143.5 的存储内存

16/03/21 19:52:35 INFO BlockManagerMasterEndpoint: Registering block manager localhost:55613 with 143.5 GB RAM, BlockManagerId(driver, localhost, 55613)
16/03/21 19:52:35 INFO BlockManagerMaster: Registered BlockManager

我想将此 DataFrame 的内容作为 Pandas 返回

我做了
df_users =  UserDistinct.toPandas()

但我有这个错误
16/03/21 20:01:08 ERROR Executor: Exception in task 7.0 in stage 6.0 (TID 439)
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
16/03/21 20:01:08 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-0,5,main]
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

如果我有 143.5 GB RAM,这怎么可能?
我能做什么?

编辑

我的 Spark 默认值
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Default system properties included when running spark-submit.
# This is useful for setting default environmental settings.

# Example:
# spark.master                     spark://master:7077
#spark.eventLog.enabled           true
# spark.eventLog.dir               hdfs://namenode:8021/directory
# spark.serializer                 org.apache.spark.serializer.KryoSerializer
spark.driver.memory              200g
# spark.executor.extraJavaOptions  -XX:+PrintGCDetails -Dkey=value -Dnumbers="one two three"

我的 Spark 上下文
conf = SparkConf()

conf.set("spark.app.name","teste")
conf.set("spark.driver.maxResultSize","0")

sc = SparkContext(conf=conf)

enter image description here

编辑

所有步骤
#import data for a pandas dataframe

df_ora = pd.read_sql(query, con=connection)

#change for Spark dataframe and some transformation

sqlContext = SQLContext(sc)
df_oraAS = sqlContext.createDataFrame(df_ora)
df_oraAS.registerTempTable("df_oraAS")

#new column
df_with_C = df_oraAS.withColumn("BUY", lit(1))

indexer = StringIndexer(inputCol="ENT_EMAIL", outputCol="user")

#index because I want use recommendation system
user_PK = indexer.fit(df_with_C).transform(df_with_C)

#distinct
UserDistinct = user_PK.dropDuplicates(['ENT_EMAIL' ,'user'])

#data in Pandas dataframe
df_users =  UserDistinct.toPandas()

新编辑

Driver 60g 和 Executor 60g 的变化

错误:
16/03/22 09:53:40 INFO MemoryStore: Block taskresult_446 stored as bytes in memory (estimated size 1978.5 MB, free 22.5 GB)
16/03/22 09:53:40 INFO BlockManagerInfo: Added taskresult_446 in memory on localhost:56281 (size: 1978.5 MB, free: 20.4 GB)
16/03/22 09:53:40 INFO Executor: Finished task 14.0 in stage 6.0 (TID 446). 2074557399 bytes result sent via BlockManager)
16/03/22 09:53:40 INFO TaskSetManager: Starting task 25.0 in stage 6.0 (TID 457, localhost, partition 25,NODE_LOCAL, 1999 bytes)
16/03/22 09:53:40 INFO Executor: Running task 25.0 in stage 6.0 (TID 457)
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Getting 8 non-empty blocks out of 8 blocks
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Getting 8 non-empty blocks out of 8 blocks
16/03/22 09:53:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/03/22 09:54:04 ERROR Executor: Exception in task 18.0 in stage 6.0 (TID 450)
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
16/03/22 09:54:04 INFO TaskSetManager: Starting task 26.0 in stage 6.0 (TID 458, localhost, partition 26,NODE_LOCAL, 1999 bytes)
16/03/22 09:54:04 INFO Executor: Running task 26.0 in stage 6.0 (TID 458)
16/03/22 09:54:04 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-5,5,main]
java.lang.OutOfMemoryError
    at java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:239)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
16/03/22 09:54:05 INFO ShuffleBlockFetcherIterator: Getting 8 non-empty blocks out of 8 blocks
16/03/22 09:54:05 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
16/03/22 09:54:05 INFO SparkContext: Invoking stop() from shutdown hook
16/03/22 09:54:06 WARN QueuedThreadPool: 6 threads could not be stopped
16/03/22 09:54:06 INFO SparkUI: Stopped Spark web UI at http://10.10.5.105:4040
16/03/22 09:54:08 INFO DAGScheduler: ResultStage 6 (toPandas at <stdin>:1) failed in 385.120 s
16/03/22 09:54:08 INFO DAGScheduler: Job 3 failed: toPandas at <stdin>:1, took 398.921433 s
16/03/22 09:54:09 ERROR Utils: Uncaught exception in thread task-result-getter-1
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:102)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
    at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:578)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:70)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Exception in thread "task-result-getter-1" java.lang.Error: java.lang.InterruptedException
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:102)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
    at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:578)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:70)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
    ... 3 more
16/03/22 09:54:09 ERROR Utils: Uncaught exception in thread task-result-getter-2
java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(Unknown Source)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
    at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:202)
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:102)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:588)
    at org.apache.spark.storage.BlockManager$$anonfun$doGetRemote$2.apply(BlockManager.scala:585)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.storage.BlockManager.doGetRemote(BlockManager.scala:585)
    at org.apache.spark.storage.BlockManager.getRemoteBytes(BlockManager.scala:578)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:70)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
    at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:/Apache/spark-1.6.0/python/pyspark\sql\dataframe.py", line 1378, in toPandas
    return pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:/Apache/spark-1.6.0/python/pyspark\sql\dataframe.py", line 280, in collect
    port = self._jdf.collectToPython()
  File "C:\Users\user\Anaconda\lib\site-packages\py4j\java_gateway.py", line 813, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:/Apache/spark-1.6.0/python/pyspark\sql\utils.py", line 45, in deco
    return f(*a, **kw)
  File "C:\Users\user\Anaconda\lib\site-packages\py4j\protocol.py", line 308, in get_return_value
    format(target_id, ".", name), value)

最佳答案

出于某种原因,Spark 想要序列化一些数据。显然,它是通过写信给 ByteArrayOutputStream 来实现的。 .从文档:

This class implements an output stream in which the data is written into a byte array. The buffer automatically grows as data is written to it. The data can be retrieved using toByteArray() and toString().



这里的关键词是 (一!)字节数组。 Java 字节数组的最大长度为 2^31-1=2147483647字节 = 2GB。因此,只要 Spark 尝试序列化大于 2GB 的任何内容,您就会得到 OutOfMemoryError .

而这正是这里发生的事情。

要解决此问题,请使用 Spark 提交错误报告。罪魁祸首是org.apache.spark.serializer.JavaSerializerInstance.serialize() ,它假设您想要序列化的任何内容在其序列化形式中都不能大于 2GB。

关于pandas - pyspark中的java.lang.OutOfMemoryError,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36140493/

相关文章:

python - 如果另一列中的值匹配,如何填充列中的 nan 值

python - 将 pandas 数据帧的一行转换为多行

hadoop - 由于空间问题导致 Spark 作业失败

postgresql - 有没有办法使用 Postgis 几何类型将 Spark 连接到表?

python - Pandas/Pyplot 中的散点图 : How to plot by category with different markers

python - 如何从带有日期时间索引的 pandas 数据框中获取选定列的下个月值

apache-spark - Spark JDBC从Hive读取和写入

apache-spark - 在 pyspark[non pandas] 中为数据框的每一行调用一个函数

python - Pyspark 多 groupby 具有不同的列

java - Spark 将数组列分解为列