apache-spark - Writing a Spark DataFrame to an Apache Hudi table

Tags: apache-spark hive apache-hudi

I am new to Apache Hudi and am trying to write my DataFrame to my Hudi table using the Spark shell. For the first load I have not created any table and am writing in overwrite mode, so I expect it to create the Hudi table. I am running the code below.

    spark-shell \
    --packages org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.1 \
    --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'

    //Initialize a Spark Session for Hudi

    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions._
    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.hudi.hive.MultiPartKeysValueExtractor
    import org.apache.spark.sql.SparkSession

    val spark1 = SparkSession.builder()
      .appName("hudi-datalake")
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.hive.convertMetastoreParquet", "false")
      .getOrCreate()

    //Write to a Hudi Dataset 

    val inputDF = Seq(
      ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
      ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
      ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
      ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
      ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
      ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
    ).toDF("id", "creation_date", "last_update_time")

    val hudiOptions = Map[String, String](
      HoodieWriteConfig.TABLE_NAME -> "work.hudi_test",
      DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
      DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
      DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date",
      DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time",
      DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
      DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "work.hudi_test",
      DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date",
      DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
    )
    //  Upsert Data
    // Create a new DataFrame from the first row of inputDF with a different creation_date value

    val updateDF = inputDF.limit(1).withColumn("creation_date", lit("2014-01-01"))

    updateDF.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Overwrite).saveAsTable("work.hudi_test")

While executing this write statement, I am getting the error message below:

    java.lang.NoClassDefFoundError: org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2

Can someone guide me on how to write this statement correctly?

Best Answer

Here is a working example in pyspark of what you are trying to do:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = (
    SparkSession.builder.appName("Hudi_Data_Processing_Framework")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .config(
        "spark.jars.packages",
        "org.apache.hudi:hudi-spark-bundle_2.12:0.7.0,org.apache.spark:spark-avro_2.12:3.0.2"
    )
    .getOrCreate()
)

input_df = spark.createDataFrame(
    [
        ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
        ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
        ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
        ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
        ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
        ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"),
    ],
    ("id", "creation_date", "last_update_time"),
)

hudi_options = {
    # ---------------DATA SOURCE WRITE CONFIGS---------------#
    "hoodie.table.name": "hudi_test",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.precombine.field": "last_update_time",
    "hoodie.datasource.write.partitionpath.field": "creation_date",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.upsert.shuffle.parallelism": 1,
    "hoodie.insert.shuffle.parallelism": 1,
    "hoodie.consistency.check.enabled": True,
    "hoodie.index.type": "BLOOM",
    "hoodie.index.bloom.num_entries": 60000,
    "hoodie.index.bloom.fpp": 0.000000001,
    "hoodie.cleaner.commits.retained": 2,
}

# INSERT
(
    input_df.write.format("org.apache.hudi")
    .options(**hudi_options)
    .mode("append")
    .save("/tmp/hudi_test")
)

# UPDATE
update_df = input_df.limit(1).withColumn("creation_date", lit("2014-01-01"))
(
    update_df.write.format("org.apache.hudi")
    .options(**hudi_options)
    .mode("append")
    .save("/tmp/hudi_test")
)

# REAL UPDATE
update_df = input_df.limit(1).withColumn("last_update_time", lit("2016-01-01T13:51:39.340396Z"))
(
    update_df.write.format("org.apache.hudi")
    .options(**hudi_options)
    .mode("append")
    .save("/tmp/hudi_test")
)

output_df = spark.read.format("org.apache.hudi").load(
    "/tmp/hudi_test/*/*"
)

output_df.show()

Output: (screenshot of output_df.show() not preserved)

The Hudi table on the filesystem looks like this (the original screenshot was not preserved):
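As a rough sketch only: the exact directory and file names depend on the commit times and on whether hive-style partitioning took effect, and the parquet file names below are placeholders, but a copy-on-write table written to /tmp/hudi_test with creation_date as the partition path would be laid out roughly like:

/tmp/hudi_test/
├── .hoodie/                                    # table metadata and commit timeline
├── creation_date=2014-01-01/
│   ├── .hoodie_partition_metadata
│   └── <fileId>_<writeToken>_<commitTime>.parquet
├── creation_date=2015-01-01/
│   ├── .hoodie_partition_metadata
│   └── <fileId>_<writeToken>_<commitTime>.parquet
└── creation_date=2015-01-02/
    ├── .hoodie_partition_metadata
    └── <fileId>_<writeToken>_<commitTime>.parquet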

Note: Your update operation actually created a new partition and performed an insert, because you are modifying the partition column (2015-01-01 -> 2014-01-01). You can see this in the output.
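One way to see this (just a sketch, reusing the output_df read above and Hudi's _hoodie_partition_path metadata column) is to filter the read-back data on id 100; it should now appear under both partition paths:

# id 100 should show up twice after the partition-column change:
# once in the original 2015-01-01 partition and once in the new 2014-01-01 one.
(
    output_df.filter("id = '100'")
    .select("_hoodie_partition_path", "id", "creation_date", "last_update_time")
    .show(truncate=False)
)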

I have also added a second update example (the "REAL UPDATE" block) that sets last_update_time to 2016-01-01T13:51:39.340396Z; this really does update id 100 in the 2015-01-01 partition, changing last_update_time from 2015-01-01T13:51:39.340396Z to 2016-01-01T13:51:39.340396Z.
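To confirm that this second write was an in-place update (again a sketch against the same output_df), check the row for id 100 that stayed in its original partition:

# After the "REAL UPDATE" the 2015-01-01 row for id 100 should carry
# last_update_time = 2016-01-01T13:51:39.340396Z instead of the original value.
(
    output_df.filter("id = '100' AND creation_date = '2015-01-01'")
    .select("id", "creation_date", "last_update_time")
    .show(truncate=False)
)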

More examples can be found in the Hudi Quick Start Guide.

Regarding apache-spark - Writing a Spark DataFrame to an Apache Hudi table, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/66705961/
