我正在使用 Dask 运行一个任务池,通过 as_completed
方法按照它们完成的顺序检索结果,并可能在每次返回时向任务池提交新任务:
# Initial set of jobs
futures = [client.submit(job.run_simulation) for job in jobs]
pool = as_completed(futures, with_results=True)
while True:
# Wait for a job to finish
f, result = next(pool)
# Exit condition
if result == 'STOP':
break
# Do processing and maybe submit more jobs
more_jobs = process_result(f, result)
more_futures = [client.submit(job.run_simulation) for job in more_jobs]
pool.update(more_futures)
这是我的问题:我正在提交的函数 job.run_simulation
有时会挂起很长时间,我想让这个函数超时 - 如果运行时间结束,则终止任务并继续超过一定的时间限制。
理想情况下,我想做类似client.submit(job.run_simulation, timeout=10)
的事情,然后让next(pool)
返回无
如果任务运行时间超过超时时间。
Dask 有什么方法可以帮助我暂停这样的工作吗?
到目前为止我尝试了什么
我的第一直觉是在 job.run_simulation
函数本身中独立于 Dask 处理超时。我见过两种类型的通用 Python 超时建议(例如 here)。
1) 使用两个线程,一个用于函数本身,一个用于定时器。我的印象是这实际上不起作用,因为你不能杀死线程。即使计时器用完,两个线程都必须在任务完成之前完成。
2) 使用两个单独的进程(使用 multiprocessing
模块),一个用于函数,一个用于计时器。这可行,但由于我已经在 Dask 生成的守护进程子进程中,因此不允许我创建新的子进程。
第三种可能性是将代码块移动到我使用 subprocess.run
运行的单独脚本并使用 subprocess.run
内置超时。我可以这样做,但这感觉像是最坏的回退方案,因为它需要大量繁琐的数据传入和传出子流程。
所以感觉要在Dask这个层面完成超时。我在这里的一个想法是在我将任务提交给 Dask 的同时创建一个计时器作为子进程。然后,如果计时器用完,使用 Client.cancel()
停止任务。这个计划的问题是 Dask 可能会在开始任务之前等待工作人员释放,我不希望在任务实际运行之前计时器运行。
最佳答案
你对问题的评估对我来说似乎是正确的,你所经历的解决方案与我考虑的相同。一些注意事项:
Client.cancel
无法阻止已启动的函数运行。这些函数在线程池中运行,因此您遇到了“无法停止线程”的限制。 Dask worker 只是 Python 进程,具有相同的能力和局限性。您说您不能使用守护进程中的进程。一种解决方案是通过以下方式之一更改您使用流程的方式:
如果您在单台机器上使用 dask.distributed,那么就不要使用进程
client = Client(processes=False)
- 不要使用 Dask 的默认保姆进程,那么你的 dask worker 将是一个能够使用多进程的普通进程
- 将 dask 的
multiprocessing-context
配置设置为"spawn"
而不是 fork 或 forkserver
不过,解决此问题的简洁方法是在您的函数 job.run_simulation
中解决它。理想情况下,您可以将此超时逻辑下推到该代码并让它干净利落地引发。
关于python - 如何超时提交给 Dask 的作业?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49925130/