scala - Unable to get metadata information for a Delta Lake table using Spark

Tags: scala apache-spark delta-lake

I am trying to get metadata information for a Delta Lake table created from a DataFrame — specifically, version and timestamp information.

I tried `spark.sql("describe deltaSample").show(10, false)`, but it does not return anything related to versions or timestamps:

I want to know how many versions exist, along with the timestamp of each one.

+--------+---------+-------+
|col_name|data_type|comment|
+--------+---------+-------+
|_c0     |string   |null   |
|_c1     |string   |null   |
+--------+---------+-------+

Here is the code:

// launch spark-shell with the Delta Lake package

spark2-shell --packages io.delta:delta-core_2.11:0.2.0
val data = spark.read.csv("/xyz/deltaLake/deltaLakeSample.csv")

// save the DataFrame as a Delta table

data.write.format("delta").save("/xyz/deltaLake/deltaSample")

// create the Delta Lake table

spark.sql("create table deltaSample using delta location '/xyz/deltaLake/deltaSample'")
import org.apache.spark.sql.functions.{col, when}  // needed for col/when below
val updatedInfo = data.withColumn("_c1", when(col("_c1").equalTo("right"), "updated").otherwise(col("_c1")))

// overwrite the Delta Lake table (this creates version 1)

updatedInfo.write.format("delta").mode("overwrite").save("/xyz/deltaLake/deltaSample")
spark.read.format("delta").option("versionAsOf", 0).load("/xyz/deltaLake/deltaSample/").show(10,false)
+---+-----+
|_c0|_c1  |
+---+-----+
|rt |right|
|lt |left |
|bk |back |
|frt|front|
+---+-----+

spark.read.format("delta").option("versionAsOf", 1).load("/xyz/deltaLake/deltaSample/").show(10,false)
+---+-------+
|_c0|_c1    |
+---+-------+
|rt |updated|
|lt |left   |
|bk |back   |
|frt|front  |
+---+-------+
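Besides `versionAsOf`, Delta Lake's reader also accepts a `timestampAsOf` option for time travel by commit time; a minimal sketch (the timestamp value here is an example, not taken from the actual table):

```scala
// Read the table as of a point in time instead of a version number.
// The timestamp must fall within the table's retained history.
spark.read
  .format("delta")
  .option("timestampAsOf", "2019-07-22 00:24:00")
  .load("/xyz/deltaLake/deltaSample")
  .show(10, false)
```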

// get the metadata of the created table, with version and timestamp information

spark.sql("describe history deltaSample") // not working
org.apache.spark.sql.AnalysisException: Table or view was not found: history;
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:47)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupTableFromCatalog(Analyzer.scala:733)
  at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.resolveRelation(Analyzer.scala:685)

Expected output (for example, with added version and timestamp columns):

+---+-------+-------+-------------------+
|_c0|_c1    |version|timestamp          |
+---+-------+-------+-------------------+
|rt |right  |0      |2019-07-22 00:24:00|
|lt |left   |0      |2019-07-22 00:24:00|
|rt |updated|1      |2019-08-22 00:26:00|
|lt |left   |1      |2019-08-22 00:26:00|
+---+-------+-------+-------------------+
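A combined view like the one above can be assembled by reading each version with `versionAsOf` and tagging the rows before unioning them; a sketch, assuming the version numbers (here 0 and 1) are already known:

```scala
import org.apache.spark.sql.functions.lit

// Read each known version, tag its rows with the version number, and union.
// The version list Seq(0, 1) is hard-coded here for illustration; in practice
// it would come from the table history.
val byVersion = Seq(0, 1).map { v =>
  spark.read.format("delta")
    .option("versionAsOf", v)
    .load("/xyz/deltaLake/deltaSample")
    .withColumn("version", lit(v))
}
val combined = byVersion.reduce(_ union _)
combined.show(10, false)
```

This only supplies the version column; the commit timestamps still have to come from the table's history metadata.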

Best Answer

The recently announced 0.3.0 release includes the ability to view the history of a Delta Lake table; see Announcing the Delta Lake 0.3.0 Release.

Currently you can do this via the Scala API; the ability to do it in SQL is on the roadmap. For a Scala API example, using 0.3.0:

import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, pathToTable)

val fullHistoryDF = deltaTable.history()    // get the full history of the table.

val lastOperationDF = deltaTable.history(1) // get the last operation.

The result of fullHistoryDF looks something like:

+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|version|          timestamp|userId|userName|operation| operationParameters| job|notebook|clusterId|readVersion|isolationLevel|isBlindAppend|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
|      5|2019-07-29 14:07:47|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          4|          null|        false|
|      4|2019-07-29 14:07:41|  null|    null|   UPDATE|[predicate -> (id...|null|    null|     null|          3|          null|        false|
|      3|2019-07-29 14:07:29|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          2|          null|        false|
|      2|2019-07-29 14:06:56|  null|    null|   UPDATE|[predicate -> (id...|null|    null|     null|          1|          null|        false|
|      1|2019-07-29 14:04:31|  null|    null|   DELETE|[predicate -> ["(...|null|    null|     null|          0|          null|        false|
|      0|2019-07-29 14:01:40|  null|    null|    WRITE|[mode -> ErrorIfE...|null|    null|     null|       null|          null|         true|
+-------+-------------------+------+--------+---------+--------------------+----+--------+---------+-----------+--------------+-------------+
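To answer the original question directly — how many versions exist and when each was committed — only two columns of the history DataFrame are needed. A minimal sketch:

```scala
import io.delta.tables._

val deltaTable = DeltaTable.forPath(spark, "/xyz/deltaLake/deltaSample")

// Just the version numbers and commit timestamps, newest first.
deltaTable.history()
  .select("version", "timestamp")
  .show(10, false)

// The number of versions is simply the row count of the history DataFrame.
val versionCount = deltaTable.history().count()
```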

Regarding "scala - Unable to get metadata information for a Delta Lake table using Spark", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/57166757/
