我将 Spark 与 YARN 集群管理器结合使用。
我从一个 Cassandra 表创建了一个数据集,该表包含大约 700 行,其中有 5 列,其中一列包含 JSON 格式的数据。数据量仅以 MB 为单位。
我用 spark-shell
运行:
- spark.executor.memory=4g
- spark.driver.memory=2g
我收到这个错误:
org.apache.spark.shuffle.FetchFailedException: Failed to connect to bosirahtaicas02.bscuat.local/172.17.0.1:53093 at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
当我尝试从我的 dataFrame
收集数据时
我直接在 spark-shell 中运行以下代码(逐行):
import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import com.datastax.spark.connector.cql._
import com.datastax.spark.connector._
import org.json._
import java.sql.Timestamp
val sqc = new SQLContext(sc)
val csc = new CassandraSQLContext(sc)
val allRecordsDF = sqc.read.format("org.apache.spark.sql.cassandra").
option("table", "xyz").
option("keyspace", 'pqr").load().cache()
val allRowsDF = allRecordsDF.select(allRecordsDF.col("record_type"), allRecordsDF.col("process_time"))
val allPatientsTS = allRowsDF.where(allRowsDF("record_type") === "patient").select("process_time").sort("process_time")
这里,当我尝试收集 allPatientsTS
Dataframe
时,它显示错误。
最佳答案
引用我关于 FetchFailedException 的笔记:
FetchFailedException
exception may be thrown when a task runs (andShuffleBlockFetcherIterator
did not manage to fetch shuffle blocks).
FetchFailedException
的根本原因是通常,因为执行器(带有随机 block 的 BlockManager)由于以下原因丢失(即不再可用):
可能会抛出 OutOfMemoryError(也称为 OOMed)或其他一些未处理的异常。
管理 Worker 和 Spark 应用程序执行者的集群管理器,例如YARN(这里就是这种情况),强制执行容器内存限制,并最终由于内存使用过多而决定终止执行程序。
您应该使用 Web UI、Spark History Server 或特定于集群的工具(例如用于 Hadoop YARN 的 yarn logs -applicationId
)查看 Spark 应用程序的日志。
通常的解决方案是调整 Spark 应用程序的内存。
如您所见,如果不对您的数据集(最重要的是它的大小和 Spark 上的分区方案)和 YARN 中的日志进行广泛审查,就很难确切地说出导致 FetchFailedException 的原因。
关于scala - 为什么收集数据集会因 org.apache.spark.shuffle.FetchFailedException 而失败?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44284472/