r - 如何使用 SparkR 从 delta lib 读取数据?

标签 r apache-spark databricks delta-lake

我找不到任何使用 SparkR 访问 Delta 数据的引用,所以我自己尝试了。因此,首先我在 Python 中创建了一个虚拟数据集:

from pyspark.sql.types import StructType,StructField, StringType, IntegerType
data2 = [("James","","Smith","36636","M",2000),
    ("Robert","","Williams","42114","M",5000),
    ("Maria","Anne","Jones","39192","F",5000),
    ("Jen","Mary","Brown","","F",-1)
  ]

schema = StructType([ \
    StructField("firstname",StringType(),True), \
    StructField("middlename",StringType(),True), \
    StructField("lastname",StringType(),True), \
    StructField("id", StringType(), True), \
    StructField("gender", StringType(), True), \
    StructField("salary", IntegerType(), True) \
  ])
 
df = spark.createDataFrame(data=data2,schema=schema)

df.write \
  .format("delta")\
  .mode("overwrite")\
  .option("userMetadata", "first-version") \
  .save("/temp/customers")

您可以修改此代码以更改数据并再次运行以模拟随时间的变化。

我可以使用以下命令在 python 中进行查询:

df3 = spark.read \
  .format("delta") \
  .option("timestampAsOf", "2020-11-30 22:03:00") \
  .load("/temp/customers")
df3.show(truncate=False)

但是我不知道如何在 Spark R 中传递该选项,你能帮我吗?

%r
library(SparkR)
teste_r <- read.df("/temp/customers", source="delta")
head(teste_r)

它可以工作,但仅返回当前版本。

最佳答案

timestampAsOf 将用作 SparkR::read.df 中的参数。

SparkR::read.df("/temp/customers", source = "delta", timestampAsOf = "2020-11-30 22:03:00")

这也可以通过 SparkR::sql 来完成。

SparkR::sql('
SELECT *
FROM delta.`/temp/customers`
TIMESTAMP AS OF "2020-11-30 22:03:00"
')

或者,要在 sparklyr 中执行此操作,请使用 spark_read_delta 中的 timestamp 参数。

library(sparklyr)

sc <- spark_connect(method = "databricks")

spark_read_delta(sc, "/temp/customers", timestamp = "2020-11-30 22:03:00")

关于r - 如何使用 SparkR 从 delta lib 读取数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65090030/

相关文章:

R - 使用 data.table 有效测试多行和多列的滚动条件

使用 R 中的plotly包再现等值区域图

azure - Databricks/Spark 从 Parquet 文件读取自定义元数据

r - 在R中将字符串作为参数传递

r - 三维数组的边际总结

apache-spark - Apache Spark RDD过滤器分为两个RDD

apache-spark - 运行pyspark时没有此类文件或目录错误

Maven 目标无法执行

azure - 使用 GCM 模式解密 Databricks 中的加密字符串

json - pyspark 数据帧转换为有效的 json