python - 了解 Dask 中分区的工作原理

标签 python dask

我有一个包含 17,850,209 行的 CSV,对于 Pandas 来说太大了,无法处理我的整个代码,因此我尝试使用 Dask 对其进行操作。我的所有代码都“有效”,但是当我将 CSV 写入磁盘时,我没有获得所有 17,850,209 条记录。相反,我得到了 N 个 CSV(其中 N = npartitions),每个 CSV 仅包含 50,000 条记录,在本例中总共有 900,000 条记录。

首先,我读取原始 CSV 并使用前 2 行和时间戳创建干净的数据框:

import pandas as pd 
import numpy as np
import time as t 
import dask.dataframe as dd


my_dtypes = {
    'uid': object, 
    'state': object, 
    'var01': np.float64, 
    'var02': np.float64
    }

df_raw = pd.read_csv('/Users/me/input_data/stackoverflow_raw.csv', dtype = my_dtypes, sep=',') 

df_clean = pd.DataFrame(df_raw['uid'].str.strip().str.replace('{','').str.replace('}',''))

df_clean['state'] = pd.DataFrame(df_raw['state'].str.strip())

df_clean['rowcreatetimestamp'] = t.strftime("%Y-%m-%d %H:%M:%S")

这给了我以下(正确的)计数:

df_clean.count()
# uid                   17850209
# state                 17850209
# rowcreatetimestamp    17850209
# dtype: int64

然后,我将其移动到卡盘大小为 1,000,000 的 Dask(我团队的大多数机器都可以处理)。

df_clean = dd.from_pandas(df_clean, chunksize=1000000) 

df_clean
# dd.DataFrame<from_pa..., npartitions=18, divisions=(0, 1000000, 2000000, ..., 17000000, 17850208)>

df_clean.compute()
# [17850209 rows x 3 columns]

df_clean.count().compute()
# uid                   17850209
# state                 17850209
# rowcreatetimestamp    17850209
# dtype: int64

然而,当我执行第一个 Dask 操作时,它仅“保留”数据帧的 900,000 行并创建 50,000 行新列:

df_clean['var01'] = dd.from_array(np.where((df_raw['var01'] > 0), 1, 0))

df_clean.compute()
# [900000 rows x 4 columns]

df_clean.count().compute()
uid                   900000
state                 900000
rowcreatetimestamp    900000
var01                  50000
dtype: int64

当我将 Dask 数据帧写入磁盘时,我得到 18 个 CSV,每个 50,000 条记录。我使用了compute=True参数并省略了它并得到了相同的结果:

df_clean.to_csv('/Users/me/input_data/stackoverflow_clean_*.csv', header=True, sep=',', index=False, compute=True)

df_clean.to_csv('/Users/me/input_data/stackoverflow_clean_*.csv', header=True, sep=',', index=False)

当我写入单个文件时,我得到 900,000 条记录加上标题:

df_clean.compute().to_csv('/Users/me/input_data/stackoverflow_clean_one_file.csv', header=True, sep=',', index=False)

(在 bash 中)

wc -l '/Users/me/input_data/stackoverflow_clean_one_file.csv' 
900001

虽然 900,000 条记录是错误的,但当我打开 CSV 时,只有前 50,000 行包含 var01 的数据。

我搜索了latest documentation但还没有看到我在输出包含所有数据的 block 文件或具有正确行数的单个文件方面缺少什么。

TIA。

最佳答案

这句话有点奇怪

df_clean['var01'] = dd.from_array(np.where((df_raw['var01'] > 0), 1, 0))

您将 dask.dataframe、dask.array 和 numpy 混合在一起。即使支持这种行为(这是不确定的),像这样混合惰性操作和具体操作可能会非常非常慢。

我建议使用dd.Series.where

df_clean['var01'] = df_raw.var01.where(df_raw.var01 > 0, 1)
df_clean['var01'] = df_raw.var01.where(df_raw.var01 < 0, 0)

关于python - 了解 Dask 中分区的工作原理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42787984/

相关文章:

Python Qt : How to catch "return" in qtablewidget

python - 如何在 Kivy Listview 中格式化滚动条

python - Dask Dataframe 将列表的列拆分为多列

python - Dask Future 的状态为 "cancelled"且尚未取消

dask - 为什么在 dask 中计算索引 Parquet 文件的形状如此缓慢?

python - 在 python 2 和 3 的第一行打印

javascript - 我将字符串发布到后端,但后端返回状态代码 500 : View function did not return a response

python - Selenium Firefox webdriver 导致错误 : Service geckodriver unexpectedly exited. 状态代码为:2

python - 如何超时提交给 Dask 的作业?

python - 将 Matplotlib 与 Dask 结合使用