scala - Databricks - 无法从 DataFrame 写入 Delta 位置

标签 scala apache-spark databricks delta-lake

我想更改 Databricks Delta 表的列名。

所以我做了以下事情:

// Read old table data
val old_data_DF = spark.read.format("delta")
.load("dbfs:/mnt/main/sales")

// Created a new DF with a renamed column
val new_data_DF = old_data_DF
      .withColumnRenamed("column_a", "metric1")
      .select("*")

// Dropped and recereated the Delta files location
dbutils.fs.rm("dbfs:/mnt/main/sales", true)
dbutils.fs.mkdirs("dbfs:/mnt/main/sales")

// Trying to write the new DF to the location
new_data_DF.write
.format("delta")
.partitionBy("sale_date_partition")
.save("dbfs:/mnt/main/sales")

在这里,我在写入 Delta 时的最后一步出现错误:
java.io.FileNotFoundException: dbfs:/mnt/main/sales/sale_date_partition=2019-04-29/part-00000-769.c000.snappy.parquet
A file referenced in the transaction log cannot be found. This occurs when data has been manually deleted from the file system rather than using the table `DELETE` statement

显然数据被删除了,很可能我在上面的逻辑中遗漏了一些东西。现在唯一包含数据的地方是 new_data_DF
写入 dbfs:/mnt/main/sales_tmp 之类的位置也会失败

我应该怎么做才能将 new_data_DF 中的数据写入 Delta 位置?

最佳答案

通常,避免在 Delta 表上使用 rm 是个好主意。 Delta 的事务日志在大多数情况下可以防止最终的一致性问题,但是,当您在很短的时间内删除和重新创建表时,不同版本的事务日志可能会出现和消失。

相反,我建议使用 Delta 提供的事务原语。例如,对于 overwrite the data in a table,您可以:

df.write.format("delta").mode("overwrite").save("/delta/events")

如果您有一个已经损坏的表,您可以使用 FSCK 修复它。

关于scala - Databricks - 无法从 DataFrame 写入 Delta 位置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56006982/

相关文章:

databricks - 我可以知道 z-order 列吗?

scala - 什么是 Cats 中的 simulacrum 中的 @noop

java - 如何从Elasticsearch读取数据到Spark?

scala - Spark joinWithCassandraTable() 映射多个分区键错误

scala - Spark Map/Filter引发java.io.IOException:换行符前的字节太多:2147483648

scala - 如何在 Spark Streaming Scala 中对 HBase 进行单元测试

apache-spark - 在 U-SQL 和 Spark/Databricks 之间进行选择

azure-devops - 'databricks configure --token' 挂起输入

scala - 如何在 Scala 中创建多维向量?

java - 如何从 Scala 中的辅助构造函数访问字段?