django - 使用 gevent 执行池的 celery 任务的 SynchronousOnlyOperation

标签 django celery python-asyncio gevent

鉴于 celery 使用这些选项运行:

celery -A openwisp2 worker -l info --pool=gevent --concurrency=15 -Ofair
鉴于此 celery task from openwisp-monitoring :
@shared_task
def perform_check(uuid):
    """
    Retrieves check according to the passed UUID
    and calls ``check.perform_check()``
    """
    try:
        check = get_check_model().objects.get(pk=uuid)
    except ObjectDoesNotExist:
        logger.warning(f'The check with uuid {uuid} has been deleted')
        return
    result = check.perform_check()
    if settings.DEBUG:  # pragma: nocover
        print(json.dumps(result, indent=4, sort_keys=True))
大部分时间任务都可以工作,但有时(通常是突发),会生成以下异常:
SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.
完整的堆栈跟踪:
SynchronousOnlyOperation: You cannot call this from an async context - use a thread or sync_to_async.
  File "celery/app/trace.py", line 412, in trace_task
    R = retval = fun(*args, **kwargs)
  File "celery/app/trace.py", line 704, in __protected_call__
    return self.run(*args, **kwargs)
  File "openwisp_monitoring/check/tasks.py", line 44, in perform_check
    check = get_check_model().objects.get(pk=uuid)
  File "django/db/models/manager.py", line 85, in manager_method
    return getattr(self.get_queryset(), name)(*args, **kwargs)
  File "django/db/models/query.py", line 425, in get
    num = len(clone)
  File "django/db/models/query.py", line 269, in __len__
    self._fetch_all()
  File "django/db/models/query.py", line 1308, in _fetch_all
    self._result_cache = list(self._iterable_class(self))
  File "django/db/models/query.py", line 53, in __iter__
    results = compiler.execute_sql(chunked_fetch=self.chunked_fetch, chunk_size=self.chunk_size)
  File "django/db/models/sql/compiler.py", line 1154, in execute_sql
    cursor = self.connection.cursor()
  File "django/utils/asyncio.py", line 24, in inner
    raise SynchronousOnlyOperation(message)
我不完全理解为什么会发生这种情况。
让我们回顾一下,如果我错了,请纠正我:
  • 使用此配置,celery 可以并行执行任务,此并行化由 gevent 使用 asyncio 事件循环执行
  • 然后 gevent 调用每个任务,使用相同的线程
  • 这些任务不是设计为异步的,它们使用简单的同步代码,这些任务执行数据库查询和网络请求
  • django 有一个 async_unsafe装饰数据库操作的装饰器,此装饰器检查事件循环是否正在运行,在这种情况下,它会引发 SynchronousOnlyOperation

  • 但是为什么这个异常(exception)不是在 100% 的情况下出现,而是在少数情况下出现?
    这些任务确实有效,我可以看到它,因为它们产生了正常显示的图表数据集合,或者它们产生了设备模型中的状态变化(例如:ok to critical)。
    它是 OpenWISP 监控中的错误、配置错误还是 Django 中的错误?
    看起来事件循环没有在 Django 中使用,但 Django 正在引发这个异常,即使它不关心它。这可能是一个错误,但希望在提交错误报告之前听取专家对该主题的意见。
    我认为可能的快速解决方案可能是设置 env 变量 DJANGO_ALLOW_ASYNC_UNSAFE但仅限于 celery 过程。
    提前致谢。

    最佳答案

    事实证明我的假设是错误的,我使用的代码在 gevent 中不是 greenlet 安全的。
    由于现在无法将代码重写为 greenlet 安全,因此我切换回 prefork。

    关于django - 使用 gevent 执行池的 celery 任务的 SynchronousOnlyOperation,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65326908/

    相关文章:

    python - python 3.8的异步编程如何使用 'yield'语句?

    python - Django 中的年份字段

    python - 在 1.4 中重写 Model __getattr__ 的适当方法

    python - 变量在周期性任务中不更新

    python-3.x - 在生产中收到 KeyError 类型的未注册任务

    python - 将异步任务发送到在其他线程中循环运行

    python-3.x - asyncio 以不同的间隔定期运行两个不同的函数

    javascript - 从外部js文件获取模板变量的最佳方式

    python - 带空格的django查询参数

    python - celery 任务在 Django 框架中不起作用