我在 Dask 中有一个简单(但很大)的任务图。这是一个代码示例
results = []
for params in SomeIterable:
a = dask.delayed(my_function)(**params)
b = dask.delayed(my_other_function)(a)
results.append(b)
dask.compute(**results)
在这里SomeIterable
是 dict
的列表,其中每个都是 my_function
的参数.在每次迭代中 b
取决于 a
,所以如果产生 a
的任务失败,b
无法计算。但是,results
的每个元素是独立的,所以我希望如果一个失败,另一个可以继续运行。这在实践中不会发生,如果 results
的元素失败,则脚本执行结束。
编辑:
这也在使用 submit
时发生(或 map
)客户端类的方法 dask.distributed.Client
, 例如
futures = [client.submit(my_other_function_2, **params) for params in MyOtherIterable]
results = [ft.result() for ft in futures]
在上面的代码中,如果当我尝试收集结果时一个任务失败,则所有代码都会失败,如 docs
最佳答案
解决这个问题的一个简单方法是将您的函数包装在 try/except
中,所以像这样:
def try_f(params):
try:
a = my_function(**params)
b = my_other_function(a)
except:
b = f"Failed for: {params}"
return b
results = [dask.delayed(try_f)(params) for params in SomeIterable]
computed = dask.compute(results)
但是,根据您的情况,您可能希望使用 client.submit
API,因为它会给您带来更多的灵 active ,例如指定一些条件重试。
关于python - Dask:如果失败则继续执行其他任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68119868/