我的 Celery 任务引发自定义异常 NonTransientProcessingError
,然后被 AsyncResult.get()
捕获.任务.py:
class NonTransientProcessingError(Exception):
pass
@shared_task()
def throw_exception():
raise NonTransientProcessingError('Error raised by POC model for test purposes')
在 Python 控制台中:
from my_app.tasks import *
r = throw_exception.apply_async()
try:
r.get()
except NonTransientProcessingError as e:
print('caught NonTrans in type specific except clause')
但我的自定义异常是 my_app.tasks.
NonTransientProcessingError
,而 AsyncResult.get()
引发的异常是 celery.backends.base.
NonTransientProcessingError
,所以我的except
子句失败。
Traceback (most recent call last):
File "<input>", line 4, in <module>
File "/...venv/lib/python3.5/site-packages/celery/result.py", line 175, in get
raise meta['result']
celery.backends.base.NonTransientProcessingError: Error raised by POC model for test purposes
如果我在任务中捕获到异常,它就可以正常工作。只有当异常被引发到 .get()
调用它已重命名。
如何引发自定义异常并正确捕获它?
我已经确认在定义 Task
时会发生同样的情况类并在其 on_failure
中引发自定义异常方法。以下确实有效:
try:
r.get()
except Exception as e:
if type(e).__name__ == 'NonTransientProcessingError':
print('specific exception identified')
else:
print('caught generic but not identified')
输出:
specific exception identified
但这不是最好的方法吗?理想情况下,我想捕获行为类别的异常父类(super class)。
我正在使用 Django 1.8.6、Python 3.5 和 Celery 3.1.18,以及 Redis 3.1.18、Python redis lib 2.10.3 后端。
最佳答案
import celery
from celery import shared_task
class NonTransientProcessingError(Exception):
pass
class CeleryTask(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
if isinstance(exc, NonTransientProcessingError):
"""
deal with NonTransientProcessingError
"""
pass
def run(self, *args, **kwargs):
pass
@shared_task(base=CeleryTask)
def add(x, y):
raise NonTransientProcessingError
使用带有 on_failure 回调的基本任务来捕获自定义异常。
关于python - 如何从 Celery worker 捕获自定义异常,或阻止它以 `celery.backends.base` 为前缀?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35114144/