scala - 无法在 Spark (Scala) 中的数据帧上执行用户定义的函数

标签 scala apache-spark user-defined-functions

我有一个如下所示的数据框 df

+--------+--------------------+--------+------+
|      id|                path|somestff| hash1|
+--------+--------------------+--------+------+
|       1|/file/dirA/fileA.txt|      58| 65161|
|       2|/file/dirB/fileB.txt|      52| 65913|
|       3|/file/dirC/fileC.txt|      99|131073|
|       4|/file/dirF/fileD.txt|      46|196233|
+--------+--------------------+--------+------+

注意一点:/file/dir 有所不同。并非所有文件都存储在同一目录中。事实上,各个目录中有数百个文件。

这里我想要完成的是读取列路径中的文件并对文件中的记录进行计数,并将行计数的结果写入数据帧的新列中。

我尝试了以下函数和udf:

def executeRowCount(fileCount: String): Long = {
  val rowCount = spark.read.format("csv").option("header", "false").load(fileCount).count
  rowCount
}

val execUdf = udf(executeRowCount _)

df.withColumn("row_count", execUdf (col("path"))).show()

这会导致以下错误

org.apache.spark.SparkException: Failed to execute user defined fu
nction($anonfun$1: (string) => bigint)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at $line39.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:28)
        at $line39.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)
        ... 19 more

我尝试在收集时迭代该列,例如

val te = df.select("path").as[String].collect()
te.foreach(executeRowCount)

在这里它工作得很好,但我想将结果存储在 df...

我尝试了几种解决方案,但我在这里面临着死胡同。

最佳答案

这不起作用,因为数据帧只能在驱动程序 JVM 中创建,但 UDF 代码在执行程序 JVM 中运行。您可以做的是将 CSV 加载到单独的数据框中,并使用文件名列丰富数据:

val csvs = spark
 .read
 .format("csv")
 .load("/file/dir/")
 .withColumn("filename", input_file_name())

然后将原始的df加入到filename

关于scala - 无法在 Spark (Scala) 中的数据帧上执行用户定义的函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55457701/

相关文章:

scala - Akka 流 : dealing with futures within graph stage

scala - 总结数据框中所有行的列值 - scala/spark

scala - 为什么Scala将Seq转换为List?

apache-spark - 无法从应用程序连接到独立集群

user-defined-functions - Aerospike:获取 upsert 时间而不显式存储它用于带有 TTL 的记录

php - 如何在php中编写mysql用户自定义函数

generics - 在 Scala 中是否可以强制调用者为多态方法指定类型参数?

hadoop - RDD分区和切片有什么区别?

apache-spark - 如何在 Apache Spark (PySpark 1.4.1) 中可视化/绘制决策树?

c++ - 头文件中函数原型(prototype)的语法