python - dask DataFrame 查询然后示例错误

标签 python parquet dask

我试图从 read_parquet 连接 DaskDataFrame,然后应用查询过滤器,然后对其进行采样以将最终数据帧大小限制为小于或等于 10000。这是伪代码:

import dask.dataframe as dd

df = dd.concat([ dd.read_parquet(path, index='date').query("(col0 < 4) & (date < '20170201')")  
                 for path in files ], 
               interleave_partitions=True)
df = df.sample(float(10000) / max(10000, len(df)))
df = df.compute()

但是,它失败了:

ValueError: a must be greater than 0

Traceback
---------
  File "/opt/anaconda2/lib/python2.7/site-packages/dask/async.py", line 266, in execute_task
    result = _execute_task(task, data)  
  File "/opt/anaconda2/lib/python2.7/site-packages/dask/async.py", line 247, in _execute_task
    return func(*args2)  
  File "/opt/anaconda2/lib/python2.7/site-packages/dask/dataframe/methods.py", line 143, in sample
    return df.sample(random_state=rs, frac=frac, replace=replace)  
  File "/opt/anaconda2/lib/python2.7/site-packages/pandas/core/generic.py", line 2644, in sample
    locs = rs.choice(axis_length, size=n, replace=replace, p=weights)  
  File "mtrand.pyx", line 1391, in mtrand.RandomState.choice (numpy/random/mtrand/mtrand.c:16430)  

如果我不执行 .query(...) 部分,那么它就可以正常工作。如果我在示例之后应用查询,也可以,但是我无法控制最终的 DataFrame 大小。我在这里尝试做的事情有什么问题吗?

我正在运行 OS X 10.10.5、fastparquet 0.0.5、dask 0.14.1、python 2.7.12。

最佳答案

由于某些 pandas DataFrame 为空,因此引发“ValueError:a 必须大于 0”错误。这个 ValueError 是由 pandas.DataFrame.sample 方法抛出的。因为我们在 dask 查询之后进行采样,并且并非所有查询子任务都会产生非空的 pandas.DataFrame,所以这个 ValueError 几乎肯定会发生。

正确的修复应该在 dask.dataframe 代码中:如果 df 为空,则返回它本身,否则调用 df.sample:

> /opt/anaconda2/lib/python2.7/site-packages/dask/dataframe/methods.py(166)sample()
164 def sample(df, state, frac, replace):
165     rs = np.random.RandomState(state)
--> 166     return df.sample(random_state=rs, frac=frac, replace=replace)

i.e. return df.sample(random_state=rs, frac=frac, replace=replace) \
            if len(df) > 0 else df

关于python - dask DataFrame 查询然后示例错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43578601/

相关文章:

parquet - 从 Python 增量写入 Parquet 数据集

java - 在java中创建 Parquet 文件

python - 计算按另一列值分组的列值在 Pandas 数据框中的共现

python - Dask 2.1.0, key 错误 : 'Column not found: 0'

python - 使用字典将重复行映射到原始行 - Python 3.6

Python:广告列,每行具有平均值

pyspark - 为什么在胶水 pyspark ETL 作业中无法添加到 Parquet 表中的新列?

python - dask 数据帧 head() 返回空 df

python - numpy:通过引用传递对自身不起作用

python - 在 sqlalchemy 实体上全局添加一个 order_by?