azure - 无法从 Azure Spark 群集写入 Blob 存储

标签 azure pyspark azure-synapse

我正在 azure synapse 中使用 pyspark 创建笔记本来创建 parquet 文件。

运行时,我收到以下错误原因:org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: 在非空目录上不允许此操作。我的目录是空了,我去检查文件夹,里面写了一些文件,但几分钟后出现错误。

感谢您的关注。

我使用以下代码

df.write.mode('overwrite').parquet(path)

最佳答案

我尝试了相同的代码df.write.mode('overwrite').parquet(path)并且我遇到了与您收到的相同的错误。 enter image description here

为了解决这个问题,我尝试了从位于 source 容器的 "dir1/CSV" 目录中的 CSV 文件读取数据的方法>Azure Data Lake Storage Gen2,将其转换为DataFrame,然后将其写回到“destination”容器的“dir1/”目录中 Parquet 格式。

下面是代码:

sourceaccountname ="synapsestorageaug02"
Sourcecontainer= "source"
SourceLinked Service = "AzureDataLakeStorage_ls"
Sourcefilelocation = " dir1/CSV"
Destinationaccountname="synapsestorageaug02"
destinationcontainer="destination"
destinationfile="dir1/"
spark.conf.set('spark.storage.synapse.linkedServiceName','AzureDataLakeStorage_ls')
spark.conf.set('fs.azure.account.oauth.provider.type','com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider')
df= f'abfss://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="e3908c96918086a3909a8d8293908690978c91828486829684d3d1cd878590cd808c9186cd948a8d878c9490cd8d8697" rel="noreferrer noopener nofollow">[email protected]</a>/'
print(df)
mssparkutils.fs.ls(df)
File= f'abfss://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="63100c1611000623101a0d0213100610170c1102040602160453514d0705104d000c11064d140a0d070c14104d0d0617" rel="noreferrer noopener nofollow">[email protected]</a>/dir1/CSV'
print(File)
source_df = spark.read.format('csv').option('header','true').option('inferschema','true').load(File)
source_df.show()
destpath = f'abfss://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="67030214130e0906130e080927141e090617140214130815060002061200575549030114490408150249100e090308101449090213" rel="noreferrer noopener nofollow">[email protected]</a>/dir1/'
source_df.write.mode('overwrite').format('Parquet').option('header','true').save(destpath)

在上面的代码中,我为 ADLS GEN 2 创建了链接服务并提供了所需的访问权限

了解更多关于 Secure credentials with linked services using the mssparkutils

注意: 确保 synapse 系统具有该存储帐户的 Blob 存储数据提供者访问权限。 您尝试访问的用户和服务 (ASA) 需要 Blob 存储数据贡献者访问权限。

要使用链接服务设置存储帐户和 Synapse 之间的连接,您可以使用以下代码:

spark.conf.set('spark.storage.synapse.linkedServiceName','AzureDataLakeStorage_ls')
spark.conf.set('fs.azure.account.oauth.provider.type','com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider')
df= f'abfss://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="483b273d3a2b2d083b312629383b2d3b3c273a292f2d293d2f787a662c2e3b662b273a2d663f21262c273f3b66262d3c" rel="noreferrer noopener nofollow">[email protected]</a>/'
print(df)
mssparkutils.fs.ls(df)

上面的代码显示了我们必须使用的容器中的目录 enter image description here

destpath = f'abfss://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="87e3e2f4f3eee9e6f3eee8e9c7f4fee9e6f7f4e2f4f3e8f5e6e0e2e6f2e0b7b5a9e3e1f4a9e4e8f5e2a9f0eee9e3e8f0f4a9e9e2f3" rel="noreferrer noopener nofollow">[email protected]</a>/dir1/'  source_df.write.mode('overwrite').format('Parquet').option('header','true').save(destpath)

上面的代码以Parquet格式将source_df DataFrame写入目标路径。它使用 write.mode('overwrite') 覆盖任何现有数据,使用 format('Parquet') 将输出格式指定为 Parquet,并使用 option('header ','true') 将 header 包含在 Parquet 文件中。最后,使用save(destpath)将数据保存到目标路径。

输出: enter image description here

关于azure - 无法从 Azure Spark 群集写入 Blob 存储,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76823799/

相关文章:

wcf - 确定来自针对 Azure 的 FirstOrDefault 的 WCF 数据服务 LINQ 查询的请求 Uri,而不执行它?

python - 数据框上的多条件过滤器

pandas - Spark 与 Scala 和 Pandas

azure - Synapse 管道从 git 导入资源

Azure DF - 将日期时间从数据库提取到 CSV 时,有时会被解释为 datetime2

azure - VS Team Services 和 Azure 持续交付子目录

azure - 无法使用 Azure 混合连接连接到 SQL Server Analysis Services

azure - OAUTH/Azure 函数 : Method to auth AAD user for endpoints that don't support service principals

pyspark - AWS EMR 集群中的 session 不处于事件状态 Pyspark

azure - Azure Synapse 工作区中列出了重复的用户