python - 如何在回调中获取 future 的结果?

标签 python distributed dask

add_done_callback 方法最近被添加到分布式 Future 对象中,它允许您在 future 完成后采取一些操作,无论它是否成功。

http://distributed.readthedocs.io/en/latest/api.html?highlight=add_done_callback#distributed.client.Future.add_done_callback

如果您尝试直接调用传递的 future 对象上的任何方法 resultexceptiontraceback,回调函数将挂起。

但是可以在回调中访问异常和回溯,如下所示: fut._exception().result() fut._traceback().result()

对结果尝试相同的模式 - 即 fut._result().result() 引发异常:

  File "C:\Python\lib\site-packages\tornado\concurrent.py", line 316, in _check_done
    raise Exception("DummyFuture does not support blocking for results")
Exception: DummyFuture does not support blocking for results

如果无法在回调中访问 future 的结果,添加回调对我来说作用有限。

我错过了什么 - 有办法在回调中获取 future 的结果吗?

在 asyncio 文档中,它似乎给出了一个可以直接访问 result 方法的示例:

https://docs.python.org/3/library/asyncio-task.html#example-future-with-run-forever

...我不确定这与 Tornado /分布式有什么关系,但能够做同样的事情非常有用。

from distributed import Client


client = Client("127.0.0.1:8786")

def f(delay):
    from time import sleep
    from numpy.random import randn
    sleep(delay)
    if randn() > 1:
        1/0
    return delay

def callback(fut):
    import logging
    logger = logging.getLogger('distributed')
    if fut.status == 'finished':
        res = future._result().result()  # <-------------- Doesn't work!
        logger.info("{!r} - {!s}".format(fut, res))
    else:
        logger.info("{!r} - {!s}".format(fut, fut.status))


args = rand(10)
futs = client.map(f, args)
for fut in futs:
    fut.add_done_callback(callback)

最佳答案

目前,您的回调在 Tornado 事件循环中被调用。如果您想获得 future 的结果,您必须使用 Tornado API。

这是一个最小的例子:

In [1]: from distributed import Client
In [2]: client = Client()
In [3]: def inc(x):
   ...:     return x + 1
   ...: 
In [4]: from tornado import gen

In [5]: @gen.coroutine
   ...: def callback(future):
   ...:     result = yield future._result()
   ...:     print(result * 10)
   ...:     
In [6]: future = client.submit(inc, 1)

In [7]: future.add_done_callback(callback)

20

但是,您的问题强调,这也许不是用户与 add_done_callback 交互的最直观方式,因此如果我们为后续版本引入重大更改,我不会感到惊讶。

In [8]: import distributed

In [8]: distributed.__version__
Out[8]: '1.14.0'

关于python - 如何在回调中获取 future 的结果?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40477518/

相关文章:

python - 有 SimpleHTTPServer 的 Tornado 等效项吗?

python - 如何使用 for 循环和通过使用追加从预先存在的列表中删除项目?

asp.net - 我什么时候应该在 ASP.NET 网站中使用服务总线?

ssh - 使用 MPI 后端时,分布式 PyTorch 代码在多个节点上停止

pandas - dask read_sql_table 在具有数字日期时间的 sqlite 表上失败

Python 查询(字符串)

python - 多处理范围 : list not updating using 'multiprocessing.Process' , 使用 'threading.Thread' 工作

sockets - 乔卡姆 : problems with remote connection

dask - 将 dask.compute 与延迟项目数组一起使用

python - 解包元组列表的 dask 延迟对象