我正在 Dask 调度程序上运行许多缓慢的任务,并且我想要每个任务的进度报告。任务将从处理进度报告的同一台机器提交,因此可以保留在同一进程中,但现在我们假设任务提交和进度报告在单独的进程中处理。
Dask 提供 Coordination Primitives其预期用例包括能够监控进度:
These can be used to control access to external resources, track progress of ongoing computations, or share data in side-channels between many workers, clients, and tasks sensibly.
我能想到的最简单的例子如下:
任务提交者:
from dask.distributed import Client, Pub
import time
c = Client('tcp://127.0.0.1:8786')
def slow_func():
q = Pub('progress')
for i in range(10):
q.put(f'{i}/10')
time.sleep(1)
c.submit(slow_func)
任务记者:
from dask.distributed import Client, Sub
c = Client('tcp://127.0.0.1:8786')
q = Sub('progress')
while True:
print(q.get())
这适用于 Pub
/Sub
,但也同样适用于 Queue
。现在,尽管它确实有效,但似乎并不是作者的初衷:
- 我最终隐式地依赖于在进行报告时提交任务的同一个
客户端
;即Client
最终位于工作节点上。这感觉很奇怪。 - 我无法区分不同的任务;理想情况下,我能够在报告中使用诸如 future 之钥之类的内容。
所以,我的问题(诚然有点模糊)是:就创建 Dask future 提供进度报告的“Hello world”风格示例而言,我如何将上述内容修改为可以被认为是惯用的 Dask 的内容,以及是否存在有什么需要注意的陷阱吗?
我可以通过为每个任务创建一个新客户端(下面的示例)来部分解决我的第一个问题,但由于我最终得到的东西看起来工作原理相同,也许这样做是不必要的。
import time
from dask.distributed import Client, Pub
c_submit = Client('tcp://127.0.0.1:8786')
def slow_func():
c_report = Client('tcp://127.0.0.1:8786')
q = Pub('progress', client=c_report)
for i in range(10):
q.put(f'{i}/10')
time.sleep(1)
c_submit.submit(slow_func)
最佳答案
问题的第一部分由 dask.distributed.worker_client
的存在来回答。这正是我们所需要的:提供一个与当前工作线程的调度程序对话的客户端。这样,任务提交者就变成了:
import time
from dask.distributed import Client, Pub, worker_client
c_submit = Client('tcp://127.0.0.1:8786')
def slow_func():
with worker_client() as c_report:
q = Pub('progress', client=c_report)
for i in range(10):
q.put(f'{i}/10')
time.sleep(1)
c_submit.submit(slow_func)
对于第二部分,一种不太糟糕的方法是在每次提交任务时简单地生成一个 ID。也就是说,做这样的事情:
import time
import uuid
from dask.distributed import Client, Pub, worker_client
c_submit = Client('tcp://127.0.0.1:8786')
def slow_func(task_id):
with worker_client() as c_report:
q = Pub('progress', client=c_report)
for i in range(10):
q.put(f'{task_id}: {i}/10')
time.sleep(1)
c_submit.submit(slow_func, uuid.uuid4())
这可以解决我的问题,但是当 future 的 key 中已经有了一个完全可用的 ID 时,使用新的 ID 仍然感觉有点奇怪。
关于python - 报告 Dask 任务的进度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59468124/