python - Dask:如果失败则继续执行其他任务

标签 python dask dask-distributed dask-delayed

我在 Dask 中有一个简单(但很大)的任务图。这是一个代码示例

results = []

for params in SomeIterable:
    a = dask.delayed(my_function)(**params)
    b = dask.delayed(my_other_function)(a)
    results.append(b)

dask.compute(**results)


在这里SomeIterabledict 的列表,其中每个都是 my_function 的参数.在每次迭代中 b取决于 a ,所以如果产生 a 的任务失败,b无法计算。但是,results 的每个元素是独立的,所以我希望如果一个失败,另一个可以继续运行。这在实践中不会发生,如果 results 的元素失败,则脚本执行结束。

编辑:

在使用 submit 时发生(或 map )客户端类的方法 dask.distributed.Client , 例如

futures = [client.submit(my_other_function_2, **params) for params in MyOtherIterable]
results = [ft.result() for ft in futures]

在上面的代码中,如果当我尝试收集结果时一个任务失败,则所有代码都会失败,如 docs

最佳答案

解决这个问题的一个简单方法是将您的函数包装在 try/except 中,所以像这样:

def try_f(params):
    try:
        a = my_function(**params)
        b = my_other_function(a)
    except:
        b = f"Failed for: {params}"
    return b

results = [dask.delayed(try_f)(params) for params in SomeIterable]
computed = dask.compute(results)

但是,根据您的情况,您可能希望使用 client.submit API,因为它会给您带来更多的灵 active ,例如指定一些条件重试。

关于python - Dask:如果失败则继续执行其他任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68119868/

相关文章:

python - 如何从函数中更改全局变量?

python - 在 Dask 中序列化大于 2GB 的数据时出错

python - 可以在 SageMaker (Labs 1.2.*) 上使用 dask 仪表板吗?

python - 使用 dask 提交任务时出现 Pickle 错误

dask - 分布式 Dask CPP 工作人员

ssh - 有没有办法通过 ssh 使用 dask jobqueue

python - PyQt - 如何打开/关闭拼写检查

python - Apache Airflow 如何将 xcom_pull() 值转换为 DAG?

python - Dask 数据帧 : Can `set_index` put a single index into multiple partitions?

python - Opencv - 灰度模式与灰度颜色转换