我有一个分布式 dask 集群设置,我用它来加载和转换一堆数据。就像魅力一样。
我想用它并行进行一些处理。这是我的功能
el = 5000
n_using = 26
n_across= 6
mat = np.random.random((el,n_using,n_across))
idx = np.tril_indices(n_across*2, -n_across)
def get_vals(c1, m, el, idx):
m1 = m[c1,:,:]
corr_vals = np.zeros((el, (n_across//2)*(n_across+1)))
for c2 in range(c1+1, el):
corr = np.corrcoef(m1.T, m[c2,:,:].T)
corr_vals[c2] = corr[idx]
return corr_vals
lazy_get_val = dask.delayed(get_vals, pure=True)
这是我正在尝试做的单处理器版本:
arrays = [get_vals(c1, mat, el, idx) for c1 in range(el)]
all_corr = np.stack(arrays, axis=0)
工作正常,但需要几个小时。 这是我在 dask 中执行此操作的方法:
lazy_list = [lazy_get_val(c1, mat, el, idx) for c1 in range(el)]
arrays = [da.from_delayed(lazy_item, dtype=float, shape=(el, 21)) for lazy_item in lazy_list]
all_corr = da.stack(arrays, axis=0)
即使它运行 all_corr[1].compute()
,它也只是坐在那里,不响应。当我中断内核时,它似乎卡在/distributed/utils.py:
~/.../lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
249 else: 250 while not e.is_set(): --> 251 e.wait(10) 252 if error[0]: 253 six.reraise(*error[0])
关于调试这个有什么建议吗?
其他事情:
- 如果我用较小的
mat
(el=1000) 运行它,它运行得很好。 - 如果我设置
el = 5000
,它就会挂起。 - 如果我中断内核并使用
el = 1000
再次运行它,它就会挂起。
最佳答案
将导入添加到示例后,我运行了一些东西,但构建图表时速度非常慢。这可以通过避免将 numpy 数组直接放置在延迟调用中来改进,如下所示:
# mat = np.random.random((el,n_using,n_across))
# idx = np.tril_indices(n_across*2, -n_across)
mat = dask.delayed(np.random.random)((el,n_using,n_across))
idx = dask.delayed(np.tril_indices)(n_across*2, -n_across)
或者通过删除 dask.delayed 的 pure=True
关键字(当您设置 pure=True 时,它必须对所有输入的内容进行哈希处理以获得唯一的 key ,您正在做这 5000 次)。我通过使用 IPython 中的 %snakeviz
魔法分析您的代码发现了这一点。
然后我运行了 all_corr[1].compute()
,结果很好。然后我运行了 all_corr.compute()
,看起来它会完成,但速度不是很快。我怀疑要么是你的任务太小,导致开销太大,要么是你的代码在 Python for 循环中花费了太多时间,因此遇到了 GIL 问题。不确定是哪个。
我建议尝试的下一件事是使用 dask.distributed 调度程序,它可以更好地处理 GIL 问题并加剧开销问题。查看其执行情况可能有助于隔离问题。
关于python - Dask 延迟/Dask 阵列无响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51053513/