python - 将 delta Lake 写入 AWS S3(无 Databricks)

标签 python amazon-s3 pyspark delta-lake

# Creating PySpark Object
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("XMLParser").getOrCreate()
sc=spark.sparkContext
hadoop_conf=sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoop_conf.set("fs.s3n.awsAccessKeyId", aws_key)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", aws_secret)

然后我可以使用我的 s3 存储桶中的以下代码读取文件
df = spark.read.format("xml").options(rootTag='returnResult', rowTag="query").load("s3n://bucketName/folder/file.xml")

但是当我尝试使用此代码使用 delta Lake( Parquet 文件)写回 s3 时
df.write.format("delta").mode('overwrite').save("s3n://bucket/folder/file")

我收到此错误
    Py4JJavaError: An error occurred while calling o778.save.
: java.io.IOException: The error typically occurs when the default LogStore implementation, that
 is, HDFSLogStore, is used to write into a Delta table on a non-HDFS storage system.
 In order to get the transactional ACID guarantees on table updates, you have to use the
 correct implementation of LogStore that is appropriate for your storage system.
 See https://docs.delta.io/latest/delta-storage.html " for details.

    at org.apache.spark.sql.delta.DeltaErrors$.incorrectLogStoreImplementationException(DeltaErrors.scala:157)
    at org.apache.spark.sql.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.scala:73)
    at org.apache.spark.sql.delta.storage.HDFSLogStore.write(HDFSLogStore.scala:64)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$doCommit$1.apply$mcJ$sp(OptimisticTransaction.scala:434)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$doCommit$1.apply(OptimisticTransaction.scala:416)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$doCommit$1.apply(OptimisticTransaction.scala:416)
    at org.apache.spark.sql.delta.DeltaLog.lockInterruptibly(DeltaLog.scala:152)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$class.doCommit(OptimisticTransaction.scala:415)
    at org.apache.spark.sql.delta.OptimisticTransaction.doCommit(OptimisticTransaction.scala:80)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply$mcJ$sp(OptimisticTransaction.scala:326)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply(OptimisticTransaction.scala:284)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$$anonfun$commit$1.apply(OptimisticTransaction.scala:284)
    at com.databricks.spark.util.DatabricksLogging$class.recordOperation(DatabricksLogging.scala:77)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:80)
    at org.apache.spark.sql.delta.metering.DeltaLogging$class.recordDeltaOperation(DeltaLogging.scala:103)
    at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:80)
    at org.apache.spark.sql.delta.OptimisticTransactionImpl$class.commit(OptimisticTransaction.scala:284)
    at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:80)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:67)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta$$anonfun$run$1.apply(WriteIntoDelta.scala:64)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:188)
    at org.apache.spark.sql.delta.commands.WriteIntoDelta.run(WriteIntoDelta.scala:64)
    at org.apache.spark.sql.delta.sources.DeltaDataSource.createRelation(DeltaDataSource.scala:134)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hadoop.fs.UnsupportedFileSystemException: fs.AbstractFileSystem.s3n.impl=null: No AbstractFileSystem configured for scheme: s3n
    at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:160)
    at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:249)
    at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:334)
    at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:331)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:331)
    at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:448)
    at org.apache.spark.sql.delta.storage.HDFSLogStore.getFileContext(HDFSLogStore.scala:47)
    at org.apache.spark.sql.delta.storage.HDFSLogStore.writeInternal(HDFSLogStore.scala:70)
    ... 53 more

我试图按照堆栈跟踪中给出的链接进行操作,但无法弄清楚如何解决此问题。任何帮助都会受到欢迎

最佳答案

创建 spark session 后,您需要添加 databricks 提供的配置以启用 s3 作为增量存储,例如:

conf = spark.sparkContext._conf.setAll([('spark.delta.logStore.class','org.apache.spark.sql.delta.storage.S3SingleDriverLogStore')])
spark.sparkContext._conf.getAll()

As the name suggests, the S3SingleDriverLogStore implementation only works properly when all concurrent writes originate from a single Spark driver. This is an application property, must be set before starting SparkContext, and cannot change during the lifetime of the context.



来自数据块
访问 here 以配置 s3a 路径访问 key 和 key 。

关于python - 将 delta Lake 写入 AWS S3(无 Databricks),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61769528/

相关文章:

dataframe - pyspark:计算连续的 1/0 的数量,并在条纹太短/太长时更改它们

python - 从带有文本列的 Spark 数据帧创建 TF_IDF 向量

python - Beam Python SDK : pd. 合并左连接错误(valueError:尝试对不可为空的字段编码 null)

python - 连接多个数据帧

python - Pandas:检查B列中包含的A列中的值

python - riffleshuffling 出现问题,函数找不到列表

ruby - 存储桶中的亚马逊 s3 和载波随机图像名称在数据库中不匹配

amazon-s3 - AMAZON AWS S3 递归地更改/添加整个存储桶的元数据

pyspark - GroupBy 和 concat 数组列 pyspark

amazon-web-services - 除非检查 AES256 加密,否则拒绝 S3 中的 CreateBucket