我有一个 Dask 工作集群,我想用它们来使用复杂模型并行化预测操作。模型文件很大并且需要时间来加载,因此我使用 client.run 让所有工作人员运行初始化函数来加载该模型。
如何从 client.run
函数中保留 Python 变量状态,以便我可以在将来的任务操作中引用并使用它?
我找到了 dask.distributed.get_worker
和 worker.data
字典,并使用它来设置任意值,然后我可以在 map_partition
函数中访问,但不确定这是否是最好或最安全的选项。
如果某个工作线程死亡并重新启动,或者其他工作线程加入集群,是否有办法让这些工作线程自动调用我最初传递给 client.run
的相同函数?
最佳答案
只需使用 future
如果你的模型/状态不变,那么我可能只会使用 client.scatter
将其发送出去,并让 Dask 根据需要复制它。这是最简单的方法,也是最稳健的方法。如果有新的工作人员到达,那么它将根据需要复制它。
是的,使用 get_worker 很有意义
但是,如果您想自己管理状态,那么是的,运行一个函数,获取一些状态,并将其附加到工作线程是一个好主意:
get_worker().my_special_state = x
我不建议将数据放入 get_worker().data
中,因为这是 Dask 管理其自身内存的地方。当它看到它不知道的异物时,它可能会感到困惑。事情应该会好起来,但你永远不知道。
工作插件
If a worker dies and is re-started, or if other workers join the cluster, is there a way to have those workers automatically call the same function I originally passed to client.run?
是的,这里最简单的方法是使用预加载脚本或工作插件。请参阅https://docs.dask.org/en/latest/setup/custom-startup.html
关于python - 如何在执行 Dask 任务之间保留一些 Python 对象状态?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58126830/