apache-spark - Spark SQL SaveMode.Overwrite, getting java.io.FileNotFoundException and requiring 'REFRESH TABLE tableName'

Tags: apache-spark apache-spark-sql spark-dataframe

With Spark SQL, how should we read data from a folder in HDFS, make some modifications, and save the updated data back to the same folder in HDFS using Overwrite save mode, without getting a FileNotFoundException?

import org.apache.spark.sql.{SparkSession, SaveMode}
import org.apache.spark.SparkConf

val sparkConf: SparkConf = new SparkConf()
val sparkSession = SparkSession.builder.config(sparkConf).getOrCreate()
val df = sparkSession.read.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20")
val newDF = df.select("a", "b", "c")

newDF.write.mode(SaveMode.Overwrite)
     .parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20") // doesn't work
newDF.write.mode(SaveMode.Overwrite)
     .parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-21") // works


A FileNotFoundException occurs when we read data from the HDFS directory "d=2017-03-20" and save (SaveMode.Overwrite) the updated data to the same HDFS directory "d=2017-03-20":

Caused by: org.apache.spark.SparkException: Task failed while writing rows
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20/part-05020-35ea100f-829e-43d9-9003061-1788904de770.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:157)
  at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
  at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
  ... 8 more


The following attempts still hit the same error. How can this be solved with Spark SQL? Thanks!

val hdfsDirPath = "hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20"

val df = sparkSession.read.parquet(hdfsDirPath)

val newdf = df
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)


or

val df = sparkSession.read.parquet(hdfsDirPath)
df.createOrReplaceTempView("orgtable")
sparkSession.sql("SELECT * from orgtable").createOrReplaceTempView("tmptable")

sparkSession.sql("TRUNCATE TABLE orgtable")
sparkSession.sql("INSERT INTO orgtable SELECT * FROM tmptable")

val newdf = sparkSession.sql("SELECT * FROM orgtable")
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)


or

val df = sparkSession.read.parquet(hdfsDirPath)
df.createOrReplaceTempView("orgtable")
sparkSession.sql("SELECT * from orgtable").createOrReplaceTempView("tmptable")

sparkSession.sql("REFRESH TABLE orgtable")
sparkSession.sql("ALTER VIEW tmptable RENAME TO orgtable")

val newdf = sparkSession.sql("SELECT * FROM orgtable")
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)

Best Answer

I solved this as follows: first write the DataFrame to a temporary directory, then delete the source that was read, and finally rename the temporary directory to the source name.
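The root cause is Spark's lazy evaluation: with SaveMode.Overwrite, Spark deletes the target directory when the write job starts, so tasks that are still scanning the same path can no longer find their input files. Below is a minimal sketch of the workaround described above (write to a temporary directory, delete the source, rename the temp directory) using the Hadoop FileSystem API; the _tmp suffix and the placeholder host/port are assumptions for illustration, not from the original answer.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

val sparkSession = SparkSession.builder.getOrCreate()

val sourcePath = "hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20"
val tempPath   = sourcePath + "_tmp" // hypothetical temp directory next to the source

val newDF = sparkSession.read.parquet(sourcePath).select("a", "b", "c")

// 1. Write the result to a temporary directory first, so the source
//    files stay intact while Spark is still reading them.
newDF.write.mode(SaveMode.Overwrite).parquet(tempPath)

// 2. Delete the original directory, then rename the temp directory to
//    the source name (both are metadata operations on HDFS).
val fs = new Path(sourcePath).getFileSystem(sparkSession.sparkContext.hadoopConfiguration)
fs.delete(new Path(sourcePath), true) // recursive delete
fs.rename(new Path(tempPath), new Path(sourcePath))

An alternative that avoids the delete/rename step is to break the lineage before writing, for example with Dataset.checkpoint() (eager by default since Spark 2.1, after calling sparkContext.setCheckpointDir), so the write no longer re-scans the source files.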

This question about apache-spark - Spark SQL SaveMode.Overwrite, getting java.io.FileNotFoundException and requiring 'REFRESH TABLE tableName', is based on a similar question on Stack Overflow: https://stackoverflow.com/questions/42920748/
