azure - 使用 pyspark 从 azure 文件共享读取数据

标签 azure pyspark azure-data-factory azure-file-share

嘿,大家有谁知道如何从 azure 文件共享读取 qvd 数据

我想读取这些 qvd 并将其转换为 parquet,然后使用 ADF 将数据加载到容器

但是我在使用突触从指定文件共享读取数据时遇到问题,这就是路径的定义方式

base_path = f'abfss://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="3c5a5550594f545d4e597c4f48534e5d5b59525d5159125a555059125f534e59124b555258534b4f12525948" rel="noreferrer noopener nofollow">[email protected]</a>'
adf_path= f'{base_path}/WMOS/WMOS'

我明白了

Server failed to authenticate the request. Make sure the value of Authorization header is formed correctly including the signature

但是相同的代码但对于同一容器中的 blob 是有效的

最佳答案

首先,确保您拥有这些版本之间的 python [3.6 - 3.9]

在创建 apache Spark 池时,选择 Spark 版本,Python 版本应介于上述之间。

接下来,在突触工作区中创建一个笔记本,添加以下代码。 要从Azure文件共享获取数据,您需要将其下载到本地并读入pandas,然后读取spark dataframe。

将以下代码块添加到您的笔记本中。

pip install azure-storage-file-share==12.1.0 qvd

安装所需的软件包。

from qvd import qvd_reader
localpath="tmp.qvd"
connection_string = "Your_conn_string_to_storage_account"
share_name = "Your_file_share_name"
directory_name = "dir_name_in_fileshare"
file_name = "Airlines.qvd"

def  download_from_file_storage():
    share_client = ShareClient.from_connection_string(connection_string, share_name)
    file_client = share_client.get_file_client(directory_name + '/' + file_name)
    with  open(localpath, "wb") as  file:
        download_stream = file_client.download_file()
        file.write(download_stream.readall())
        
download_from_file_storage()

将文件下载到本地文件系统的函数。

from pyspark.sql.functions import col
df = qvd_reader.read(localpath)
s_df = spark.createDataFrame(df)
s_df = s_df.withColumn("AirlineId",col("%Airline ID")).drop(col("%Airline ID"))
display(s_df)

这里,从本地读取 qvd 文件并将其转换为 Spark 数据帧。

enter image description here

接下来,使用链接服务将该数据作为 parquet 写入 adls2 存储。

enter image description here

linkedServiceName_var = "adls2link"
spark.conf.set("fs.azure.account.auth.type", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type", "com.microsoft.azure.synapse.tokenlibrary.LinkedServiceBasedSASProvider")
spark.conf.set("spark.storage.synapse.linkedServiceName", linkedServiceName_var)

raw_container_name = "data"
raw_storageaccount_name = "jgsadls2"
relative_path = "qvd_parquet"
path = f"abfss://{raw_container_name}@{raw_storageaccount_name}.dfs.core.windows.net/qvd_parquet"
s_df.write.parquet(path)

在执行此操作之前,您需要创建指向 adls 存储的链接服务。

输出:

enter image description here

enter image description here

如果您想在管道中使用它,请将此笔记本添加到管道,退出值为 path 并运行它。 然后获取管道输出中的路径并进一步使用它。

enter image description here

关于azure - 使用 pyspark 从 azure 文件共享读取数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/76707097/

相关文章:

GitLab 持续部署到 Azure Web 应用程序

c# - Azure 事件目录域

python - 如何将 PySpark 函数的返回类型指定为数据框?

pyspark - 使用 pyspark 对列上的值求和

python - Spark Dataframe 在性能上比 Pandas Dataframe 有何优势?

azure - ADF 映射数据流 - 重用单个运行的 Spark 集群来并行执行映射数据流

azure - 无法使用 "Connect-ServiceFabricCluster"连接到 ServiceFabricCluster

windows - 如何使用 Azure 防火墙处理入口网站流量?

Azure 数据工厂 IP 地址问题

c# - 如何处理 U-SQL EXTRACT 语句中丢失的文件?