python - 使用 Databricks PySpark 从 Azure Blob 存储读取多个 CSV 文件

标签 python azure apache-spark pyspark databricks

如何读取具有不同列和文件路径名的多个 CSV 文件并创建单个数据框。

这里是一个只有 2 条路径的示例。但是,有许多基于 frnfilename 的路径。

"wasbs://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="0665696872676f68637468676b63466765656973687268676b6328646a6964286569746328716f686269717528686372" rel="noreferrer noopener nofollow">[email protected]</a>/blobname/frn=xyz123/filename=file1/myfile34.csv"

"wasbs://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="d8bbb7b6acb9b1b6bdaab6b9b5bd98b9bbbbb7adb6acb6b9b5bdf6bab4b7baf6bbb7aabdf6afb1b6bcb7afabf6b6bdac" rel="noreferrer noopener nofollow">[email protected]</a>/blobname/frn=abc567/filename=file2/myfile84.csv"


  1. 如果不存在值,则可以连接列并用 NA 填充。

  2. 根据 CSV 文件的相应数据帧的路径中提到的 frn 和文件名添加新列。

  3. 在每个 filename 文件夹中,仅存在一个 CSV,但每个 frn 文件夹下有多个 filename 文件夹.

例如:

myfile34.csv
   a  b frn      filename
0  3  4 xyz123   file1   
1  4  5 xyz123   file1
2  5  6 xyz123   file1

myfile84.csv    
   a  c frn      filename
0  1  3 abc567   file2
1  2  4 abc567   file2
2  3  5 abc567   file2

final df
  a b  c    frn      filename
0 3 4  NA   xyz123   file1  
1 4 5  NA   xyz123   file1  
2 5 6  NA   xyz123   file1  
3 1 NA 3    abc567   file2
4 2 NA 4    abc567   file2
5 3 NA 5.   abc567   file2


我尝试过这个:

import databricks.koalas as ks

path = "wasbs://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="2744484953464e49425549464a42674644444852495349464a4209454b4845094448554209504e494348505409494253" rel="noreferrer noopener nofollow">[email protected]</a>/blobname/*/*/"
df = ks.read_csv(path, sep="','",header='infer')

但是我如何连接这些不同的 csv 文件并创建新列(例如 frnfilename)?

最佳答案

我的建议/解决方法是使用 parquet 文件格式,因为它是架构演变情况的最佳格式之一。

当然,我不否认这一事实,我们可以在 pyspark 本身中执行更多步骤,但这不会是直接的。

https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#schema-merging

enter image description here

# Write both dataframes in same location, utilizing append, in parquet format.
df_1.write.mode('append').parquet('<URI_HERE>/out')
df_2.write.mode('append').parquet('<URI_HERE>/out')

# Note: mergeSchema will mege files with different structures, and merge common columns and fill Null values for non-intersecting columns.
spark.read.option('mergeSchema', True).parquet('<URI_HERE>/out').show()

enter image description here

祝你学习愉快!

关于python - 使用 Databricks PySpark 从 Azure Blob 存储读取多个 CSV 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70380974/

相关文章:

azure - Azure AD B2C 页面 UI 自定义不支持 Bootstrap 吗?

apache-spark - 将数据湖与已删除的记录同步

python - 在 Spark 中调用外部 matlab 函数

python - 为什么在 VS Code 中单击调试测试时会收到 "TypeError: message must be set"?

python - 绘制 numpy 数组 : setting axis scale

python - 在 View 中使用的设置中添加变量 - Django

python - 如何在 Microsoft Azure 云服务上部署 django Web 应用程序

azure - 如何停止 WebRole/WorkerRole 的单个实例/VM

python - 遍历 Spark RDD

解析元组的 Pythonic 方式