scala - 为什么收集数据集会因 org.apache.spark.shuffle.FetchFailedException 而失败?

标签 scala apache-spark apache-spark-sql cassandra spark-cassandra-connector

我将 Spark 与 YARN 集群管理器结合使用。

我从一个 Cassandra 表创建了一个数据集,该表包含大约 700 行,其中有 5 列,其中一列包含 JSON 格式的数据。数据量仅以 MB 为单位。

我用 spark-shell 运行:

  • spark.executor.memory=4g
  • spark.driver.memory=2g

我收到这个错误:

org.apache.spark.shuffle.FetchFailedException: Failed to connect to bosirahtaicas02.bscuat.local/172.17.0.1:53093 at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)

当我尝试从我的 dataFrame 收集数据时

我直接在 spark-shell 中运行以下代码(逐行):

import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext

import com.datastax.spark.connector.cql._
import com.datastax.spark.connector._

import org.json._
import java.sql.Timestamp

val sqc = new SQLContext(sc)
val csc = new CassandraSQLContext(sc)

val allRecordsDF = sqc.read.format("org.apache.spark.sql.cassandra").
    option("table", "xyz").
    option("keyspace", 'pqr").load().cache()

val allRowsDF = allRecordsDF.select(allRecordsDF.col("record_type"), allRecordsDF.col("process_time"))

val allPatientsTS = allRowsDF.where(allRowsDF("record_type") === "patient").select("process_time").sort("process_time")

这里,当我尝试收集 allPatientsTS Dataframe 时,它显示错误。

最佳答案

引用我关于 FetchFailedException 的笔记:

FetchFailedException exception may be thrown when a task runs (and ShuffleBlockFetcherIterator did not manage to fetch shuffle blocks).

FetchFailedException 的根本原因是通常,因为执行器(带有随机 block 的 BlockManager)由于以下原因丢失(即不再可用):

  • 可能会抛出 OutOfMemoryError(也称为 OOMed)或其他一些未处理的异常。

  • 管理 Worker 和 Spark 应用程序执行者的集群管理器,例如YARN(这里就是这种情况),强制执行容器内存限制,并最终由于内存使用过多而决定终止执行程序。

您应该使用 Web UI、Spark History Server 或特定于集群的工具(例如用于 Hadoop YARN 的 yarn logs -applicationId)查看 Spark 应用程序的日志。

通常的解决方案是调整 Spark 应用程序的内存。

如您所见,如果不对您的数据集(最重要的是它的大小和 Spark 上的分区方案)和 YARN 中的日志进行广泛审查,就很难确切地说出导致 FetchFailedException 的原因。

关于scala - 为什么收集数据集会因 org.apache.spark.shuffle.FetchFailedException 而失败?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44284472/

相关文章:

string - 有没有办法扩展 Scala 的 RichString?

arrays - 使用一系列数字范围创建新列

java - 使用 Spark 高效读取 PDF/文本/word 文件

sql - PySpark 将 datetime2 数据类型转换为 datetime 数据类型导致值超出范围

scala - 将 Spark Dataframe 中的一列转换为多列

javascript - PDF.js 不会重定向到位于 Firefox 中启用 CORS 的另一台服务器上的 pdf 文件

java - 在 Neo4J 图上进行多次更新的正确方法是什么?

用于非泛型隐式参数的 Scala 糖

python - 从 json 创建数据框时如何不推断架构?

hadoop - java.io.IOException : org. apache.hadoop.security.AccessControlException : Client cannot authenticate via:[TOKEN, KERBEROS]