我有一个如下所示的数据框 df
+--------+--------------------+--------+------+
| id| path|somestff| hash1|
+--------+--------------------+--------+------+
| 1|/file/dirA/fileA.txt| 58| 65161|
| 2|/file/dirB/fileB.txt| 52| 65913|
| 3|/file/dirC/fileC.txt| 99|131073|
| 4|/file/dirF/fileD.txt| 46|196233|
+--------+--------------------+--------+------+
注意一点:/file/dir 有所不同。并非所有文件都存储在同一目录中。事实上,各个目录中有数百个文件。
这里我想要完成的是读取列路径中的文件并对文件中的记录进行计数,并将行计数的结果写入数据帧的新列中。
我尝试了以下函数和udf:
def executeRowCount(fileCount: String): Long = {
val rowCount = spark.read.format("csv").option("header", "false").load(fileCount).count
rowCount
}
val execUdf = udf(executeRowCount _)
df.withColumn("row_count", execUdf (col("path"))).show()
这会导致以下错误
org.apache.spark.SparkException: Failed to execute user defined fu
nction($anonfun$1: (string) => bigint)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at $line39.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:28)
at $line39.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)
... 19 more
我尝试在收集时迭代该列,例如
val te = df.select("path").as[String].collect()
te.foreach(executeRowCount)
在这里它工作得很好,但我想将结果存储在 df...
我尝试了几种解决方案,但我在这里面临着死胡同。
最佳答案
这不起作用,因为数据帧只能在驱动程序 JVM 中创建,但 UDF 代码在执行程序 JVM 中运行。您可以做的是将 CSV 加载到单独的数据框中,并使用文件名列丰富数据:
val csvs = spark
.read
.format("csv")
.load("/file/dir/")
.withColumn("filename", input_file_name())
然后将原始的df
加入到filename
列
关于scala - 无法在 Spark (Scala) 中的数据帧上执行用户定义的函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55457701/