python - 如何超时提交给 Dask 的作业?

标签 python dask

我正在使用 Dask 运行一个任务池,通过 as_completed 方法按照它们完成的顺序检索结果,并可能在每次返回时向任务池提交新任务:

# Initial set of jobs
futures = [client.submit(job.run_simulation) for job in jobs]
pool = as_completed(futures, with_results=True)

while True:
    # Wait for a job to finish
    f, result = next(pool)

    # Exit condition
    if result == 'STOP':
        break

    # Do processing and maybe submit more jobs
    more_jobs = process_result(f, result)
    more_futures = [client.submit(job.run_simulation) for job in more_jobs]
    pool.update(more_futures)

这是我的问题:我正在提交的函数 job.run_simulation 有时会挂起很长时间,我想让这个函数超时 - 如果运行时间结束,则终止任务并继续超过一定的时间限制。

理想情况下,我想做类似client.submit(job.run_simulation, timeout=10)的事情,然后让next(pool)返回 如果任务运行时间超过超时时间。

Dask 有什么方法可以帮助我暂停这样的工作吗?

到目前为止我尝试了什么

我的第一直觉是在 job.run_simulation 函数本身中独立于 Dask 处理超时。我见过两种类型的通用 Python 超时建议(例如 here)。

1) 使用两个线程,一个用于函数本身,一个用于定时器。我的印象是这实际上不起作用,因为你不能杀死线程。即使计时器用完,两个线程都必须在任务完成之前完成。

2) 使用两个单独的进程(使用 multiprocessing 模块),一个用于函数,一个用于计时器。这可行,但由于我已经在 Dask 生成的守护进程子进程中,因此不允许我创建新的子进程。

第三种可能性是将代码块移动到我使用 subprocess.run 运行的单独脚本并使用 subprocess.run 内置超时。我可以这样做,但这感觉像是最坏的回退方案,因为它需要大量繁琐的数据传入和传出子流程。

所以感觉要在Dask这个层面完成超时。我在这里的一个想法是在我将任务提交给 Dask 的同时创建一个计时器作为子进程。然后,如果计时器用完,使用 Client.cancel() 停止任务。这个计划的问题是 Dask 可能会在开始任务之前等待工作人员释放,我不希望在任务实际运行之前计时器运行。

最佳答案

你对问题的评估对我来说似乎是正确的,你所经历的解决方案与我考虑的相同。一些注意事项:

  1. Client.cancel 无法阻止已启动的函数运行。这些函数在线程池中运行,因此您遇到了“无法停止线程”的限制。 Dask worker 只是 Python 进程,具有相同的能力和局限性。
  2. 您说您不能使用守护进程中的进程。一种解决方案是通过以下方式之一更改您使用流程的方式:

    • 如果您在单台机器上使用 dask.distributed,那么就不要使用进程

      client = Client(processes=False)
      
    • 不要使用 Dask 的默认保姆进程,那么你的 dask worker 将是一个能够使用多进程的普通进程
    • 将 dask 的 multiprocessing-context 配置设置为 "spawn" 而不是 fork 或 forkserver

不过,解决此问题的简洁方法是在您的函数 job.run_simulation 中解决它。理想情况下,您可以将此超时逻辑下推到该代码并让它干净利落地引发。

关于python - 如何超时提交给 Dask 的作业?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49925130/

相关文章:

pandas - 如何在 blaze 中读取制表符分隔的 CSV?

python - 如何使用dask有效地计算许多简单统计数据

dask - 属性错误 : module 'dask' has no attribute 'delayed'

python - lxml ElementMaker 属性格式化

python - 在 Pandas 中选择带有startswith的列

python - 音频处理

python - 使用 Python xarray 屏蔽 'where' 不再起作用

python - Jinja2 html按钮: Catch POST on different pages

python - Groupby 使用列和索引,然后求和以创建新列

python - 达斯克不安装graphviz依赖