hadoop - Writing delta files to S3 (MinIO) - PySpark 2.4.3

Tags: hadoop amazon-s3 pyspark minio delta-lake

I am currently trying to write delta-lake parquet files to S3, which I am replacing locally with MinIO.

I can read/write standard parquet files to S3 without any problem.

However, when I follow the delta lake example and configure delta to S3, it seems I cannot write the delta_log/ to my MinIO.

So I tried setting: fs.AbstractFileSystem.s3a.impl and fs.s3a.impl.

I am using a venv with pyspark[sql]==2.4.3.
src/.env:

# pyspark packages
DELTA = io.delta:delta-core_2.11:0.3.0
HADOOP_COMMON = org.apache.hadoop:hadoop-common:2.7.3
HADOOP_AWS = org.apache.hadoop:hadoop-aws:2.7.3
PYSPARK_SUBMIT_ARGS = ${HADOOP_AWS},${HADOOP_COMMON},${DELTA}
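For reference, the same Maven coordinates can also be supplied programmatically instead of through spark-submit; a minimal, untested sketch assuming the versions from the .env above (the app name is a placeholder):

from pyspark.sql import SparkSession

# Same coordinates as in src/.env, resolved by Spark via spark.jars.packages;
# this has to be set before the session/context is created.
packages = ",".join([
    "org.apache.hadoop:hadoop-aws:2.7.3",
    "org.apache.hadoop:hadoop-common:2.7.3",
    "io.delta:delta-core_2.11:0.3.0",
])

spark = SparkSession \
    .builder \
    .appName("delta-minio-sketch") \
    .config("spark.jars.packages", packages) \
    .getOrCreate()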
src/spark_session.py:

# configure s3 connection for read/write operation (native spark)
hadoop_conf = sc.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", self.aws_endpoint_url)
hadoop_conf.set("fs.s3a.access.key", self.aws_access_key_id)
hadoop_conf.set("fs.s3a.secret.key", self.aws_secret_access_key)
# hadoop_conf.set("fs.AbstractFileSystem.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")  #  when using hadoop 2.8.5
# hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")  #  alternative to above hadoop 2.8.5
hadoop_conf.set("fs.s3a.path.style.access", "true")
hadoop_conf.set("spark.history.fs.logDirectory", 's3a://spark-logs-test/')
src/apps/raw_to_parquet.py:
# Trying to write pyspark dataframe to MinIO (S3)

raw_df.coalesce(1).write.format("delta").save(s3_url)

bash:

# RUN CODE
spark-submit --packages $(PYSPARK_SUBMIT_ARGS) src/run_onlineretailer.py

With hadoop-common: 2.7.3 and hadoop-aws: 2.7.3 the error is:

java.lang.RuntimeException: java.lang.NoSuchMethodException: org.apache.hadoop.fs.s3a.S3AFileSystem.<init>(java.net.URI, org.apache.hadoop.conf.Configuration)
Because of this error I then updated to hadoop-common: 2.8.5 and hadoop-aws: 2.8.5 to fix the NoSuchMethodException, since delta needs S3AFileSystem. That instead gives:

py4j.protocol.Py4JJavaError: An error occurred while calling o89.save. : java.lang.NoSuchMethodError: org.apache.hadoop.security.ProviderUtils.excludeIncompatibleCredentialProviders(Lorg/apache/hadoop/conf/Configuration;Ljava/lang/Class;)Lorg/apache/hadoop/conf/Configuration
So to me it looks like the parquet files themselves can be written without problems, but delta creates these delta_log folders that are not handled correctly (I think?).

Current source code

I have read through several similar questions, but none of them seem to be using delta lake files.

UPDATE

This currently works with the following setup:

#pyspark packages
DELTA_LOGSTORE = spark.delta.logStore.class=org.apache.spark.sql.delta.storage.S3SingleDriverLogStore
DELTA = io.delta:delta-core_2.11:0.3.0
HADOOP_COMMON = org.apache.hadoop:hadoop-common:2.7.7
HADOOP_AWS = org.apache.hadoop:hadoop-aws:2.7.7
PYSPARK_SUBMIT_ARGS = ${HADOOP_AWS},${HADOOP_COMMON},${DELTA}
PYSPARK_CONF_ARGS = ${DELTA_LOGSTORE}

# configure s3 connection for read/write operation (native spark)
hadoop_conf = sc.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", self.aws_endpoint_url)
hadoop_conf.set("fs.s3a.access.key", self.aws_access_key_id)
hadoop_conf.set("fs.s3a.secret.key", self.aws_secret_access_key)

spark-submit --packages $(PYSPARK_SUBMIT_ARGS) --conf $(PYSPARK_CONF_ARGS) src/run_onlineretailer.py

The strange thing is that it only works like this.

If I try to set it with sc.conf or hadoop_conf it does not work, see the commented-out code:

def spark_init(self) -> SparkSession:

    sc: SparkSession = SparkSession \
        .builder \
        .appName(self.app_name) \
        .config("spark.sql.warehouse.dir", self.warehouse_location) \
        .getOrCreate()

    # set log level
    sc.sparkContext.setLogLevel("WARN")

    # Enable Arrow-based columnar data transfers
    sc.conf.set("spark.sql.execution.arrow.enabled", "true")

    # sc.conf.set("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") # does not work

    # configure s3 connection for read/write operation (native spark)
    hadoop_conf = sc.sparkContext._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.s3a.endpoint", self.aws_endpoint_url)
    hadoop_conf.set("fs.s3a.access.key", self.aws_access_key_id)
    hadoop_conf.set("fs.s3a.secret.key", self.aws_secret_access_key)
    #hadoop_conf.set("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") # does not work

    return sc

It would be great if someone could explain this. Is it because of .getOrCreate()? It seems like it is not possible to set the conf without this call? Except from the command line when running the application.
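If the log store class is only read from the SparkConf that exists when the context is created, then the in-code equivalent of --conf would be to pass it on the builder before getOrCreate(), not via sc.conf.set() afterwards; an untested sketch (the app name is a placeholder):

from pyspark.sql import SparkSession

# Untested sketch: pass spark.delta.logStore.class on the builder, i.e. before the
# context exists, instead of via sc.conf.set() after getOrCreate().
sc: SparkSession = SparkSession \
    .builder \
    .appName("onlineretailer") \
    .config("spark.delta.logStore.class",
            "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
    .getOrCreate()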

Best answer

You are mixing hadoop-* JARs; just like with Spark, they only work when they all come from the same release.
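One way to verify this from PySpark is to ask the JVM which Hadoop build actually ended up on the classpath; a minimal sketch, run against the existing session (here called sc):

# Print the Hadoop version Spark is actually running against; if it does not match
# the hadoop-aws/hadoop-common versions passed via --packages, NoSuchMethod errors
# like the ones above are the typical symptom.
hadoop_version = sc.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion()
print("Spark:", sc.version, "Hadoop:", hadoop_version)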

A similar question about hadoop - Writing delta files to S3 (MinIO) - PySpark 2.4.3 can be found on Stack Overflow: https://stackoverflow.com/questions/57845157/
