我正在 azure synapse 中使用 pyspark 创建笔记本来创建 parquet 文件。
运行时,我收到以下错误原因:org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: 在非空目录上不允许此操作。我的目录是空了,我去检查文件夹,里面写了一些文件,但几分钟后出现错误。
感谢您的关注。
我使用以下代码
df.write.mode('overwrite').parquet(path)
最佳答案
我尝试了相同的代码df.write.mode('overwrite').parquet(path)
并且我遇到了与您收到的相同的错误。
为了解决这个问题,我尝试了从位于 source 容器的 "dir1/CSV" 目录中的 CSV 文件读取数据的方法>Azure Data Lake Storage Gen2,将其转换为DataFrame,然后将其写回到“destination”容器的“dir1/”目录中 Parquet 格式。
下面是代码:
sourceaccountname ="synapsestorageaug02"
Sourcecontainer= "source"
SourceLinked Service = "AzureDataLakeStorage_ls"
Sourcefilelocation = " dir1/CSV"
Destinationaccountname="synapsestorageaug02"
destinationcontainer="destination"
destinationfile="dir1/"
spark.conf.set('spark.storage.synapse.linkedServiceName','AzureDataLakeStorage_ls')
spark.conf.set('fs.azure.account.oauth.provider.type','com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider')
df= f'abfss://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="e3908c96918086a3909a8d8293908690978c91828486829684d3d1cd878590cd808c9186cd948a8d878c9490cd8d8697" rel="noreferrer noopener nofollow">[email protected]</a>/'
print(df)
mssparkutils.fs.ls(df)
File= f'abfss://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="63100c1611000623101a0d0213100610170c1102040602160453514d0705104d000c11064d140a0d070c14104d0d0617" rel="noreferrer noopener nofollow">[email protected]</a>/dir1/CSV'
print(File)
source_df = spark.read.format('csv').option('header','true').option('inferschema','true').load(File)
source_df.show()
destpath = f'abfss://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="67030214130e0906130e080927141e090617140214130815060002061200575549030114490408150249100e090308101449090213" rel="noreferrer noopener nofollow">[email protected]</a>/dir1/'
source_df.write.mode('overwrite').format('Parquet').option('header','true').save(destpath)
在上面的代码中,我为 ADLS GEN 2 创建了链接服务并提供了所需的访问权限
了解更多关于 Secure credentials with linked services using the mssparkutils
注意: 确保 synapse 系统具有该存储帐户的 Blob 存储数据提供者访问权限。 您尝试访问的用户和服务 (ASA) 需要 Blob 存储数据贡献者访问权限。
要使用链接服务设置存储帐户和 Synapse 之间的连接,您可以使用以下代码:
spark.conf.set('spark.storage.synapse.linkedServiceName','AzureDataLakeStorage_ls')
spark.conf.set('fs.azure.account.oauth.provider.type','com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedTokenProvider')
df= f'abfss://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="483b273d3a2b2d083b312629383b2d3b3c273a292f2d293d2f787a662c2e3b662b273a2d663f21262c273f3b66262d3c" rel="noreferrer noopener nofollow">[email protected]</a>/'
print(df)
mssparkutils.fs.ls(df)
上面的代码显示了我们必须使用的容器中的目录
destpath = f'abfss://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="87e3e2f4f3eee9e6f3eee8e9c7f4fee9e6f7f4e2f4f3e8f5e6e0e2e6f2e0b7b5a9e3e1f4a9e4e8f5e2a9f0eee9e3e8f0f4a9e9e2f3" rel="noreferrer noopener nofollow">[email protected]</a>/dir1/' source_df.write.mode('overwrite').format('Parquet').option('header','true').save(destpath)
上面的代码以Parquet格式将source_df DataFrame写入目标路径。它使用 write.mode('overwrite') 覆盖任何现有数据,使用 format('Parquet') 将输出格式指定为 Parquet,并使用 option('header ','true') 将 header 包含在 Parquet 文件中。最后,使用save(destpath)将数据保存到目标路径。
输出:
关于azure - 无法从 Azure Spark 群集写入 Blob 存储,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76823799/