Python asyncio 任务的 yield 很差

标签 python asynchronous python-asyncio

我对如何在 Python 3.4 中使用 asyncio 模块感到困惑。我有一个用于搜索引擎的 searching API,并希望每个搜索请求并行或异步运行,这样我就不必等待一个搜索完成再开始另一个搜索。

这是我的高级搜索 API,用于使用原始搜索结果构建一些对象。搜索引擎本身正在使用某种异步机制,因此我不会为此烦恼。

# No asyncio module used here now
class search(object):
  ...
  self.s = some_search_engine()
  ...
  def searching(self, *args, **kwargs):
    ret = {}
    # do some raw searching according to args and kwargs and build the wrapped results
    ...
    return ret

为了尝试异步请求,我编写了以下测试用例来测试我如何与 asyncio 模块进行交互。

# Here is my testing script
@asyncio.coroutine
def handle(f, *args, **kwargs):
  r = yield from f(*args, **kwargs)
  return r

s = search()
loop = asyncio.get_event_loop()
loop.run_until_complete(handle(s.searching, arg1, arg2, ...))
loop.close()

通过使用 pytest 运行,它将返回 RuntimeError: Task got bad yield : {results from searching...},当它到达行 r = yield from ...

我也试过另一种方式。

# same handle as above
def handle(..):
  ....
s = search()
loop = asyncio.get_event_loop()
tasks = [
        asyncio.async(handle(s.searching, arg11, arg12, ...)),
        asyncio.async(handle(s.searching, arg21, arg22, ...)),
        ...
        ]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

通过 pytest 运行这个测试用例,它通过了,但是搜索引擎会引发一些奇怪的异常。它说 Future/Task exception was never retrieved

我想问的事情:

  1. 对于我的第一次尝试,通过返回函数调用的实际结果来使用 yield from 的方法是否正确?
  2. 我想我需要为我的第二个测试用例添加一些 sleep 以等待任务完成,但我应该怎么做?我怎样才能让我的函数调用在我的第二个测试用例中返回?
  3. 通过创建异步处理程序来处理请求,这是使用现有模块实现异步的好方法吗?
  4. 如果问题 2 的答案是否定的,是否每个客户端调用 search 类都需要包含 loop = get_event_loop() 这种东西来异步请求?

最佳答案

问题是您不能像调用 asyncio.coroutine 一样调用现有的同步代码并获得异步行为。当您调用 yield from searching(...) 时,如果 searching 本身实际上是一个 asyncio.coroutine,您只会获得异步行为>,或者至少返回一个 asyncio.Future。现在,searching 只是一个常规的同步函数,所以调用 yield from searching(...) 只会抛出错误,因为它不会返回Future 或协程。

要获得您想要的行为,除了 synchronous 版本之外,您还需要 searching 的异步版本(或者干脆放弃同步版本,如果你不需要它)。您有几个选项可以同时支持这两者:

  1. searching 重写为 asyncio.coroutine,它使用 asyncio 兼容的调用来执行其 I/O,而不是阻塞 I/欧。这将使它在 asyncio 上下文中工作,但这意味着您将无法再在同步上下文中直接调用它。相反,您还需要提供另一种同步 searching 方法,该方法启动 asyncio 事件循环并调用 return loop.run_until_complete(self.searching(.. .))。参见 this question了解更多详情。
  2. 保持searching 的同步实现,并提供一个使用BaseEventLoop.run_in_executor 的替代异步API。在后台线程中运行 searching 方法:

    class search(object):
      ...
      self.s = some_search_engine()
      ...
      def searching(self, *args, **kwargs):
        ret = {}
        ...
        return ret
    
       @asyncio.coroutine
       def searching_async(self, *args, **kwargs):
          loop = kwargs.get('loop', asyncio.get_event_loop())
          try:
              del kwargs['loop']  # assuming searching doesn't take loop as an arg
          except KeyError:
              pass
          r = yield from loop.run_in_executor(None, self.searching, *args)  # Passing None tells asyncio to use the default ThreadPoolExecutor
          return r
    

    测试脚本:

    s = search()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(s.searching_async(arg1, arg2, ...))
    loop.close()
    

    这样,您可以保持同步代码不变,并且至少提供可以在 asyncio 代码中使用的方法,而不会阻塞事件循环。如果您在代码中实际使用异步 I/O,这不是一个干净的解决方案,但总比没有好。

  3. 提供两个完全独立的searching 版本,一个使用阻塞式 I/O,另一个兼容 asyncio。这为两种上下文提供了理想的实现,但需要两倍的工作。

关于Python asyncio 任务的 yield 很差,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30172821/

相关文章:

c++ - Boost::Python 中的智能指针转换

python - 将 PNG 格式的 networkx 图形绘制到 stdout/http 响应

javascript - 如何多次顺序执行多个异步函数?

python - 在 Python 中异步编写 CSV 文件

OS X 与 Ubuntu 上的 Python asyncio 性能

python - Matplotlib "ValueError: x and y must be the same size"

python - 文档脚本不是狮身人面像中模块的一部分

linux - 脚本从上次运行中获取参数

java - 理解 Java Completablefuture 的行为

python - 跨多进程共享基于异步等待协程的复杂对象