scala - 我们不能在 map 函数中使用 sparkContext 吗?

标签 scala apache-spark hadoop apache-spark-sql

我正在尝试调用 map 中的用户定义函数,如下所示,

df.select("path").map(x => func1(sparkSession, fs, path))

def func1(sparkSession: SparkSession, fileSystem: FileSystem, path: String)
{
  read HDFS file path and count the records.
}

使用上述方法我无法从 HDFS 读取文件。真正的原因是什么?无法将上下文传递给 map 内的函数吗?

最佳答案

在高层次上,SparkSession 是允许驱动程序与执行程序通信的对象。相比之下,map 方法定义了一个将在执行器上执行的函数,这意味着它必须被序列化,以便执行器可以与其一起获取相关数据并实际运行它。您可以想象将此对象序列化给执行程序以供其使用时可能出现的 hell 般的情况。

在您的情况下,如果(如我所想)路径数量相对适中(想想数百万或更少),您可以在驱动程序上收集这些路径,然后使用它们。 Spark 将根据需要安排此调用。

val paths: Array[String] = df.select.paths.as[String].collect()
for (path <- paths) {
  func1(sparkSession, fs, path)
}

关于scala - 我们不能在 map 函数中使用 sparkContext 吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51263926/

相关文章:

scala - 如何将 AND 添加到连接 SLICK

python - 如何在 pyspark 中使用具有多个条件的 join?

json - Spark : How to parse multiple json with List of arrays of Struct?

python - 在一次操作中使用 spark 通过 reduceByKey 查找值范围

python - 在Hive数据库中匹配两个字段的最有效方法

hadoop - HDFS元数据占用太多空间

Scala - 扩展与

scala - 在 Scala 中是否可以简化以下 if/else 语句?

java - ChainReducer.setReducer 方法抛出错误 "ChainReducer is not applicable for the arguments"

scala - '1.narrow' 的类型