python - 自定义 Dask 可遍历对象

标签 python dask dask-delayed

我使用了类似对象的自定义字典来轻松存储 Dask 图的结果,但使用结果对象来计算 Dask 图,不会计算其子项。

是否可以更改自定义对象,以便 Dask 能够遍历并计算其子对象?

一个例子:

import dask
import dask.delayed as delayed
from collections import defaultdict
print('Dask version', dask.__version__)

Dictionary1 = {}
Dictionary1['a'] = delayed(sum)([2,3])

print('Native Dict', dask.compute(Dictionary1) )

Dictionary2 = defaultdict(defaultdict)
Dictionary2['a'] = delayed(sum)([2,3])

print('Custom Dict', dask.compute(Dictionary2) )

结果输出:

Dask version 0.19.2
Native Dict ({'a': 5},)
Custom Dict (defaultdict(<class 'collections.defaultdict'>, {'a': Delayed('sum-212db0df-1c14-4314-9a56-2eb87ef58abe')}),)

编辑:基于 MRocklin 答案的解决方案

import dask
import dask.delayed as delayed
from collections import defaultdict
from dask.base import DaskMethodsMixin

class DefaultDictDict(defaultdict, DaskMethodsMixin):
  def __init__(self, *args ): ## Define an infinite nested dict.
    return defaultdict.__init__(self, DefaultDictDict, *args)

  def __dask_graph__(self):
    ## NOTE: Errors in this functions are silent, and disable collections interface
    ## The dask attributes are already a graph with key to itself.
    a = dict()
    self._keys = []
    for x in self.values():
      if not hasattr(x,'dask'): ## Use dummy delayed to convert objects to graphs.
        x = delayed(lambda data:data)(x)
      a.update(x.dask) 
      self._keys.append(x.key)
    return a

  def __dask_keys__(self):
    return self._keys

  __dask_scheduler__ = staticmethod(dask.threaded.get)
  def __dask_postcompute__(self):
      def Reconstruct(results):
        return DefaultDictDict(zip(self.keys(), results))
      return Reconstruct, ()  

Dictionary3 = DefaultDictDict()
Dictionary3['b']['c'] = delayed(sum)([2,3])
print('Collections Dict', dask.compute(Dictionary3)[0] )

结果:

Collections Dict defaultdict(<class '__main__.DefaultDictDict'>, {'b': defaultdict(<class '__main__.DefaultDictDict'>, {'c': 5})})

(它仍然显示 defaultdict,因为 __repr__ 没有被正确覆盖)

最佳答案

目前 Dask 仅遍历标准核心 Python 集合(字典、列表等)。自 2018 年 10 月 7 日起,此行为不可扩展。

但是,您可以创建自己的 Dask 集合,它本质上只是传递其成员的图表和键。请参阅http://docs.dask.org/en/latest/custom-collections.html

关于python - 自定义 Dask 可遍历对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52687884/

相关文章:

python - 从一个来源计算多个 dask.dataframe.from_delayed()

amazon-web-services - `dask-kubernetes` 调度程序 - AWS 上的工作人员

parquet - 在单个多核机器上索引大型 dask 数据帧时的内存使用情况

python - Dask groupby 索引列

python - 组合字典时未指定长度的 Dask 延迟对象不可迭代错误

python - 如何在包导入时动态加载模块?

python - 将一个 pdb 命令转换为另一个命令,但它不起作用

python - 我如何在Python网络中检查IP地址范围

python - 找到 reCAPTCHA 元素并单击它 - Python + Selenium

dask - 如何在文件中捕获 dask-worker 控制台日志。?