python - 有没有办法访问传递给 asyncio.as_completed 的原始任务?

标签 python python-asyncio

我正在尝试从异步队列中提取任务,并在发生异常时调用给定的错误处理程序。排队项目以字典形式给出(由 enqueue_task 排队),其中包含任务、可能的错误处理程序以及错误处理程序可能需要的任何 args/kwargs。由于我想在任务完成时处理任何错误,因此我将每个任务映射到出队字典,并在发生异常时尝试访问它。

async def _check_tasks(self):
    try:
        while self._check_tasks_task or not self._check_task_queue.empty():
            tasks = []
            details = {}
            try:
                while len(tasks) < self._CHECK_TASKS_MAX_COUNT:
                    detail = self._check_task_queue.get_nowait()
                    task = detail['task']
                    tasks.append(task)
                    details[task] = detail
            except asyncio.QueueEmpty:
                pass

            if tasks:
                for task in asyncio.as_completed(tasks):
                    try:
                        await task
                    except Exception as e:
                        logger.exception('')
                        detail = details[task]
                        error_handler = detail.get('error_handler')
                        error_handler_args = detail.get('error_handler_args', [])
                        error_handler_kwargs = detail.get('error_handler_kwargs', {})

                        if error_handler:
                            logger.info('calling error handler')
                            if inspect.iscoroutinefunction(error_handler):
                                self.enqueue_task(
                                    task=error_handler(
                                        e,
                                        *error_handler_args,
                                        **error_handler_kwargs
                                    )
                                )
                            else:
                                error_handler(e, *error_handler_args, **error_handler_kwargs)
                        else:
                            logger.exception(f'Exception encountered while handling task: {str(e)}')
            else:
                await asyncio.sleep(self._QUEUE_EMPTY_SLEEP_TIME)
    except:
        logger.exception('')


def enqueue_task(self, task, error_handler=None, error_handler_args=[],
                 error_handler_kwargs={}):
    if not asyncio.isfuture(task):
        task = asyncio.ensure_future(task)

    self._app.gateway._check_task_queue.put_nowait({
        'task': task,
        'error_handler': error_handler,
        'error_handler_args': error_handler_args,
        'error_handler_kwargs': error_handler_kwargs,
    })

但是,当发生异常时,似乎在 details 字典中找不到用作键的任务,并且我收到以下错误:

KeyError: <generator object as_completed.<locals>._wait_for_one at 0x7fc2d1cea308>
Exception encountered while handling task: <generator object as_completed.<locals>._wait_for_one at 0x7fc2d1cea308>
Traceback (most recent call last):
  File "/app/app/gateway/gateway.py", line 64, in _check_tasks
    detail = details[task]
KeyError: <generator object as_completed.<locals>._wait_for_one at 0x7fc2d1cea308>

taskasyncio.as_completed产生时,它似乎是一个生成器

<generator object as_completed.<locals>._wait_for_one at 0x7fc2d1cea308>

当我期望这是一项任务时

<Task pending coro=<GatewayL1Component._save_tick_to_stream() running at /app/app/gateway/l1.py:320> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fc2d4380d98>()]>>

为什么taskasyncio.as_complete产生后是一个生成器而不是原始任务?有没有办法访问原始任务?

最佳答案

Why is task a generator instead of the original task after being yielded by asyncio.as_complete?

问题是 as_completed 不是 async iterator (你可以用async for来耗尽它),但是一个普通的迭代器。异步迭代器的 __aiter__ 可以在等待异步事件时挂起,而普通迭代器的 __iter__ 必须立即提供结果。它显然无法生成已完成的任务,因为没有任务还没有时间完成,因此它生成了一个实际上等待任务完成的可等待对象。这是看起来像发电机的对象。

作为实现的另一个结果,等待此任务会为您提供原始任务的结果,而不是对任务对象的引用 - 与原始 concurrent.futures.as_completed 不同。这使得asyncio.as_completed不太直观并且更难使用,并且there is a bug report认为 as_completed 也应该可以用作异步迭代器,提供正确的语义。 (这可以通过向后兼容的方式来完成。)

Is there a way to access the original task?

作为解决方法,您可以通过将原始任务包装到协程中来创建 as_completed 的异步版本,该协程在任务完成时完成,并将任务作为其结果:

async def as_completed_async(futures):
    loop = asyncio.get_event_loop()
    wrappers = []
    for fut in futures:
        assert isinstance(fut, asyncio.Future)  # we need Future or Task
        # Wrap the future in one that completes when the original does,
        # and whose result is the original future object.
        wrapper = loop.create_future()
        fut.add_done_callback(wrapper.set_result)
        wrappers.append(wrapper)

    for next_completed in asyncio.as_completed(wrappers):
        # awaiting next_completed will dereference the wrapper and get
        # the original future (which we know has completed), so we can
        # just yield that
        yield await next_completed

这应该允许您获得原始任务 - 这是一个简单的测试用例:

async def main():
    loop = asyncio.get_event_loop()
    fut1 = loop.create_task(asyncio.sleep(.2))
    fut1.t = .2
    fut2 = loop.create_task(asyncio.sleep(.3))
    fut2.t = .3
    fut3 = loop.create_task(asyncio.sleep(.1))
    fut3.t = .1
    async for fut in as_completed_async([fut1, fut2, fut3]):
        # using the `.t` attribute shows that we've got the original tasks
        print('completed', fut.t)

asyncio.get_event_loop().run_until_complete(main())

关于python - 有没有办法访问传递给 asyncio.as_completed 的原始任务?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55521285/

相关文章:

python - 执行器何时(以及如何)将控制权交还给事件循环?

python - django 中的动态表单与 ajax/dajax

python - 排队任务的数量;如何避免全局变量?

python3 asyncio start_unix_server 权限

python - asyncio 和 trio 之间的核心区别是什么?

python - 使用 sklearn 的因子加载

python - 为什么 QWebEngineUrlRequestInterceptor 在 app.quit() 之后仍然存在

python - 双向重复测量方差分析python函数

python - PySNMP 查询接口(interface)的选择列表

python - CSV 中的 URL 未传递给函数