python - 使用 Dask 在大型集合上映射可变执行时间的函数

标签 python pandas dataframe dask

我有大量条目 E 和函数 f: E --> pd.DataFrame。对于不同的输入,函数 f 的执行时间可能会有很大差异。最后,所有 DataFrame 都应该连接成一个 DataFrame。

我想避免的情况是分区(为了示例使用 2 个分区),其中意外地所有快速函数执行都发生在分区 1 上,而所有慢速执行都发生在分区 2 上,因此没有最佳地使用工作人员。

partition 1:
[==][==][==]

partition 2:
[============][=============][===============]

--------------------time--------------------->

我目前的解决方案是迭代条目集合并使用 delayed 创建一个 Dask 图形,聚合延迟的部分 DataFrame 导致最终结果 DataFrame 与 dd.from_delayed.

delayed_dfs = []  

for e in collection:
    delayed_partial_df = delayed(f)(e, arg2, ...)

    delayed_dfs.append(delayed_partial_df)

result_df = from_delayed(delayed_dfs, meta=make_meta({..}))

我推断 Dask 调度程序会负责将工作最佳分配给可用的工作人员。

  1. 这是一个正确的假设吗?
  2. 您认为整体方法合理吗?

最佳答案

正如上面的评论所说,是的,你这样做是明智的。

任务最初会分配给 worker ,但如果一些 worker 比其他 worker 先完成分配的任务,那么他们会动态地从那些工作量过大的 worker 那里窃取任务。

另外如评论中所述,您可以考虑使用诊断仪表板来很好地了解调度程序正在做什么。所有关于工作负载、工作窃取等的信息都很容易查看。

http://distributed.readthedocs.io/en/latest/web.html

关于python - 使用 Dask 在大型集合上映射可变执行时间的函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47242819/

相关文章:

python - 如何从打印的数据框字符串加载数据框?

python - 如何用单个参数进行多重替换

python - pandas如何根据逗号将单行拆分为多行并删除第三个括号和单引号?

python - dataframe 将每一列保存在单独的 CSV 文件中

python - Pandas:按列分组时获取滚动总和

r - 如何将单词列表 (chr) 与数据帧中多列中的值进行比较,并在 R 中存在匹配时输出二进制响应

python - Popen 参数中的十六进制字符

python - 在 numpy 中转换为数组时列表元素的违反直觉的截断?

python - 从特定日期的 pandas DataFrame 中选择行

python - 尝试将值分配给 groupby 对象的新列时出现 NotImplementedError