python - 报告 Dask 任务的进度

标签 python parallel-processing distributed-computing dask dask-distributed

我正在 Dask 调度程序上运行许多缓慢的任务,并且我想要每个任务的进度报告。任务将从处理进度报告的同一台机器提交,因此可以保留在同一进程中,但现在我们假设任务提交和进度报告在单独的进程中处理。

Dask 提供 Coordination Primitives其预期用例包括能够监控进度:

These can be used to control access to external resources, track progress of ongoing computations, or share data in side-channels between many workers, clients, and tasks sensibly.

我能想到的最简单的例子如下:

任务提交者:

from dask.distributed import Client, Pub
import time

c = Client('tcp://127.0.0.1:8786')

def slow_func():
    q = Pub('progress')
    for i in range(10):
        q.put(f'{i}/10')
        time.sleep(1)

c.submit(slow_func)

任务记者:

from dask.distributed import Client, Sub

c = Client('tcp://127.0.0.1:8786')
q = Sub('progress')
while True:
    print(q.get())

这适用于 Pub/Sub,但也同样适用于 Queue。现在,尽管它确实有效,但似乎并不是作者的初衷:

  • 我最终隐式地依赖于在进行报告时提交任务的同一个客户端;即 Client 最终位于工作节点上。这感觉很奇怪。
  • 我无法区分不同的任务;理想情况下,我能够在报告中使用诸如 future 之钥之类的内容。

所以,我的问题(诚然有点模糊)是:就创建 Dask future 提供进度报告的“Hello world”风格示例而言,我如何将上述内容修改为可以被认为是惯用的 Dask 的内容,以及是否存在有什么需要注意的陷阱吗?

我可以通过为每个任务创建一个新客户端(下面的示例)来部分解决我的第一个问题,但由于我最终得到的东西看起来工作原理相同,也许这样做是不必要的。

import time
from dask.distributed import Client, Pub

c_submit = Client('tcp://127.0.0.1:8786')

def slow_func():
    c_report = Client('tcp://127.0.0.1:8786')
    q = Pub('progress', client=c_report)
    for i in range(10):
        q.put(f'{i}/10')
        time.sleep(1)

c_submit.submit(slow_func)

最佳答案

问题的第一部分由 dask.distributed.worker_client 的存在来回答。这正是我们所需要的:提供一个与当前工作线程的调度程序对话的客户端。这样,任务提交者就变成了:

import time
from dask.distributed import Client, Pub, worker_client

c_submit = Client('tcp://127.0.0.1:8786')

def slow_func():
    with worker_client() as c_report:
        q = Pub('progress', client=c_report)
        for i in range(10):
            q.put(f'{i}/10')
            time.sleep(1)
c_submit.submit(slow_func)

对于第二部分,一种不太糟糕的方法是在每次提交任务时简单地生成一个 ID。也就是说,做这样的事情:

import time
import uuid

from dask.distributed import Client, Pub, worker_client

c_submit = Client('tcp://127.0.0.1:8786')

def slow_func(task_id):
    with worker_client() as c_report:
        q = Pub('progress', client=c_report)
        for i in range(10):
            q.put(f'{task_id}: {i}/10')
            time.sleep(1)

c_submit.submit(slow_func, uuid.uuid4())

这可以解决我的问题,但是当 future 的 key 中已经有了一个完全可用的 ID 时,使用新的 ID 仍然感觉有点奇怪。

关于python - 报告 Dask 任务的进度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59468124/

相关文章:

python - 产品目录 : filter by parameters

python - 在 python 中并行化这个嵌套的 for 循环

algorithm - 并行化线性时间算法

java - 在 Java 中并行化阻塞调用

python - 从哪里开始分布式计算/并行处理? ( python /C)

c++ - 在多个节点上运行时 MPI_Reduce() 中的死锁

Hadoop shuffle 使用哪种协议(protocol)?

python - 如何在 Python 中使用 flickrapi 解析 Flickr

python - Xgboost DMatrix 初始化减少特征数量

python - 如何使用 Python 自动填写在线表单中的文本区域?