我正在尝试调用 map 中的用户定义函数,如下所示,
df.select("path").map(x => func1(sparkSession, fs, path))
def func1(sparkSession: SparkSession, fileSystem: FileSystem, path: String)
{
read HDFS file path and count the records.
}
使用上述方法我无法从 HDFS 读取文件。真正的原因是什么?无法将上下文传递给 map 内的函数吗?
最佳答案
在高层次上,SparkSession
是允许驱动程序与执行程序通信的对象。相比之下,map
方法定义了一个将在执行器上执行的函数,这意味着它必须被序列化,以便执行器可以与其一起获取相关数据并实际运行它。您可以想象将此对象序列化给执行程序以供其使用时可能出现的 hell 般的情况。
在您的情况下,如果(如我所想)路径数量相对适中(想想数百万或更少),您可以在驱动程序上收集这些路径,然后使用它们。 Spark 将根据需要安排此调用。
val paths: Array[String] = df.select.paths.as[String].collect()
for (path <- paths) {
func1(sparkSession, fs, path)
}
关于scala - 我们不能在 map 函数中使用 sparkContext 吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51263926/