python - 使用 Dask 根据列值保存多个 csv 文件

标签 python csv dask

我有一个很大的 csv 文件,假设它看起来像这样

ID,PostCode,Value
H1A0A1-00,H1A0A1,0
H1A0A1-01,H1A0A1,0
H1A0A1-02,H1A0A1,0
H1A0A1-03,H1A0A1,0
H1A0A1-04,H1A0A1,1
H1A0A1-05,H1A0A1,0
H1A1G7-0,H1A1G7,0
H1A1G7-1,H1A1G7,0
H1A1G7-2,H1A1G7,0
H1A1N6-00,H1A1N6,0
H1A1N6-01,H1A1N6,0
H1A1N6-02,H1A1N6,0
H1A1N6-03,H1A1N6,0
H1A1N6-04,H1A1N6,0
H1A1N6-05,H1A1N6,0
...

我想按邮政编码值将其拆分,并将具有相同邮政编码的所有行保存为 CSV。我已经尝试过了

postals = data['PostCode'].unique()
for p in postals:
    df = data[data['PostCode'] == p]
    df.to_csv(directory + '/output/demographics/' + p + '.csv', header=False, index=False)

有没有办法使用 Dask 来利用多处理来做到这一点? 谢谢

最佳答案

如果您想保存到 Parquet ,这非常简单

Parquet

import dask.dataframe as dd
import pandas as pd
import os 

fldr = 'data_pq'
data.to_parquet(fldr, partition_on="PostCode")

这会将每个邮政编码的数据保存在名为 PostCode=xxxxxxx 的文件夹中,该文件夹包含与 dask.dataframe 的分区数量一样多的文件。

CSV

这里我建议你使用自定义函数write_file

import dask.dataframe as dd
import pandas as pd
import os 

fldr = "data_csv"
os.makedirs(fldr, exist_ok=True)


def write_file(grp):
    pc = grp["PostCode"].unique()[0]
    grp.to_csv(f"{fldr}/{pc}.csv",
               header=False,
               index=False)
    return None


data.groupby("PostCode")\
    .apply(write_file, meta=('x', 'f8'))\
    .compute()

# the same function works for pandas df too
# data.groupby("PostCode").apply(write_file)

您应该检查它的性能如何工作并最终使用 scheduler .

关于python - 使用 Dask 根据列值保存多个 csv 文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60032055/

相关文章:

python - 按数据框中的日期过滤数据

python - Pandas 解析 csv 错误 - 预期 1 个字段找到 9

kubernetes - 为什么在 Pod 中启动 daskdev/dask 失败?

python - 如何调试 Kubernetes 中部署的 Dask Gateway 中的 CommClosedError

python - 如果请求的数据有时被 gzip 压缩,有时不被压缩,如何使用 pycurl?

python - 使用Python将PDF表单设为只读

python以相同的顺序将字典保存到csv

python - 在虚拟环境中安装python站 pip 包

python - 访问 pandas 数据框中的单元格

python - dask.linalg.solve(A, b) 上求解 Ax = b 时出现 MemoryError