python - 如何在执行 Dask 任务之间保留一些 Python 对象状态?

标签 python dask dask-distributed

我有一个 Dask 工作集群,我想用它们来使用复杂模型并行化预测操作。模型文件很大并且需要时间来加载,因此我使用 client.run 让所有工作人员运行初始化函数来加载该模型。

如何从 client.run 函数中保留 Python 变量状态,以便我可以在将来的任务操作中引用并使用它?

我找到了 dask.distributed.get_workerworker.data字典,并使用它来设置任意值,然后我可以在 map_partition 函数中访问,但不确定这是否是最好或最安全的选项。

如果某个工作线程死亡并重新启动,或者其他工作线程加入集群,是否有办法让这些工作线程自动调用我最初传递给 client.run 的相同函数?

最佳答案

只需使用 future

如果你的模型/状态不变,那么我可能只会使用 client.scatter 将其发送出去,并让 Dask 根据需要复制它。这是最简单的方法,也是最稳健的方法。如果有新的工作人员到达,那么它将根据需要复制它。

是的,使用 get_worker 很有意义

但是,如果您想自己管理状态,那么是的,运行一个函数,获取一些状态,并将其附加到工作线程是一个好主意:

get_worker().my_special_state = x

我不建议将数据放入 get_worker().data 中,因为这是 Dask 管理其自身内存的地方。当它看到它不知道的异物时,它可能会感到困惑。事情应该会好起来,但你永远不知道。

工作插件

If a worker dies and is re-started, or if other workers join the cluster, is there a way to have those workers automatically call the same function I originally passed to client.run?

是的,这里最简单的方法是使用预加载脚本或工作插件。请参阅https://docs.dask.org/en/latest/setup/custom-startup.html

关于python - 如何在执行 Dask 任务之间保留一些 Python 对象状态?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58126830/

相关文章:

python - 将 dask 数据框中的列转换为 Doc2Vec 的 TaggedDocument

python - 有效日期范围单热编码 groupby

dask - 按顺序迭代一个 dask 包

dask - 在群集上运行的 Dask 程序中找不到文件错误

python - 我如何在 Dask 分布式工作人员之间共享一个大型只读对象

Python:将日期字符串转换为UTC

Python 套接字、数据和字典

python - 指定 das 的仪表板端口

python - 编写一个 pytest 函数来检查输出到 python 中的文件?

Python文本提取