如何读取具有不同列和文件路径名的多个 CSV 文件并创建单个数据框。
这里是一个只有 2 条路径的示例。但是,有许多基于 frn
和 filename
的路径。
"wasbs://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="0665696872676f68637468676b63466765656973687268676b6328646a6964286569746328716f686269717528686372" rel="noreferrer noopener nofollow">[email protected]</a>/blobname/frn=xyz123/filename=file1/myfile34.csv"
"wasbs://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="d8bbb7b6acb9b1b6bdaab6b9b5bd98b9bbbbb7adb6acb6b9b5bdf6bab4b7baf6bbb7aabdf6afb1b6bcb7afabf6b6bdac" rel="noreferrer noopener nofollow">[email protected]</a>/blobname/frn=abc567/filename=file2/myfile84.csv"
如果不存在值,则可以连接列并用 NA 填充。
根据 CSV 文件的相应数据帧的路径中提到的
frn
和文件名添加新列。在每个
filename
文件夹中,仅存在一个 CSV,但每个frn
文件夹下有多个filename
文件夹.
例如:
myfile34.csv
a b frn filename
0 3 4 xyz123 file1
1 4 5 xyz123 file1
2 5 6 xyz123 file1
myfile84.csv
a c frn filename
0 1 3 abc567 file2
1 2 4 abc567 file2
2 3 5 abc567 file2
final df
a b c frn filename
0 3 4 NA xyz123 file1
1 4 5 NA xyz123 file1
2 5 6 NA xyz123 file1
3 1 NA 3 abc567 file2
4 2 NA 4 abc567 file2
5 3 NA 5. abc567 file2
我尝试过这个:
import databricks.koalas as ks
path = "wasbs://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="2744484953464e49425549464a42674644444852495349464a4209454b4845094448554209504e494348505409494253" rel="noreferrer noopener nofollow">[email protected]</a>/blobname/*/*/"
df = ks.read_csv(path, sep="','",header='infer')
但是我如何连接这些不同的 csv
文件并创建新列(例如 frn
和 filename
)?
最佳答案
我的建议/解决方法是使用 parquet
文件格式,因为它是架构演变情况的最佳格式之一。
当然,我不否认这一事实,我们可以在 pyspark 本身中执行更多步骤,但这不会是直接的。
https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#schema-merging
# Write both dataframes in same location, utilizing append, in parquet format.
df_1.write.mode('append').parquet('<URI_HERE>/out')
df_2.write.mode('append').parquet('<URI_HERE>/out')
# Note: mergeSchema will mege files with different structures, and merge common columns and fill Null values for non-intersecting columns.
spark.read.option('mergeSchema', True).parquet('<URI_HERE>/out').show()
祝你学习愉快!
关于python - 使用 Databricks PySpark 从 Azure Blob 存储读取多个 CSV 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70380974/