使用当前版本的 时黎明 ('0.7.5', github: [a1]) 由于数据量大,我可以通过 进行分区计算dask.dataframe api。但是对于作为记录存储在 bcolz ('0.12.1', github: [a2]) 中的大型 DataFrame,我在执行此操作时遇到了 IndexError:
import dask.dataframe as dd
import bcolz
ctable = bcolz.open('test.bcolz', mode='r')
df_dd = dd.from_bcolz(ctable, chunksize=int(1E6))
# some calculations
z1 = (df_dd[['c0', 'c1']]*np.r_[1, 1j]).sum(axis=1)
z2 = (df_dd[['c2', 'c3']]*np.r_[1, 1j]).sum(axis=1)
df_dd_out = dd.concat([z1.to_frame('z1'), z2.to_frame('z2')], axis=1)
# actual computation
df_dd_out.compute()
错误是(缩写的回溯输出):
# ...
File "/usr/local/lib/python3.5/dist-packages/dask/async.py", line 481, in get_async
raise(remote_exception(res, tb))
dask.async.IndexError: index out of bounds
实际上,只有在执行 dd.concat 操作时才会出现错误。就像是
out = (z1.to_frame('z1') + z2.to_frame('z2')).compute()
正在工作。
但是,当在内存中读取部分数据时,在某些情况下也会出现此错误,至少对于分区长度 (npartition) >1 和特定数据大小而言。
ctable_mem_b = ctable[:int(1E7)] # larger in-memory copy
df_dd_mem_b = dd.from_pandas(pd.DataFrame.from_records(ctable_mem_b),
npartitions=10)
查看完整的测试代码 _test_dask_error.py ,以及带有回溯的完整输出 _test_out.txt .
实际上,在那一步我停止了调查,因为我不知道如何在 async.py 中将这个错误调试为根本原因。当然,我会将其报告为错误(如果没有提示用户/使用错误)。但是:如何进行调试以找到根本原因?
_[a1]:_https://github.com/blaze/dask/tree/077b1b82ad03f855a960d252df2aaaa72b5b1cc5
_[a2]:_https://github.com/Blosc/bcolz/tree/562fd3092d1fee17372c11cadca54d1dab10cf9a
最佳答案
取自 FAQ的dask documentation
问:使用 dask 时如何调试我的程序?
如果您想使用 Python 调试器深入了解
令人沮丧的是异步调度程序,因为它们运行您的
不同工作人员的代码,无法提供对 Python 的访问
调试器。幸运的是,您可以更改为同步调度程序,例如dask.get
或 dask.async.get_sync
通过提供 get=
关键词
到compute
方法::
my_array.compute(get=dask.async.get_sync)
两个
dask.async.get_sync
和 dask.get
将提供回溯遍历。
dask.async.get_sync
使用相同的异步机制调度程序,但只有一名工作人员。
dask.get
非常简单,但确实不缓存数据,因此对于某些工作负载可能会很慢。
评论
我很想知道问题是什么。如果使用上述方法后原因不是很明显,那么我建议在 dask issue tracker 上提出问题。 .
关于pandas - 为什么 dask.dataframe compute() 结果在特定情况下会给出 IndexError?如何找到异步错误的原因?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34426672/