python - Correctly handling Celery exceptions with autoretry_for

Tags: python rabbitmq celery message-queue

I have a Celery task decorated with autoretry_for, so that the task is retried when a known exception occurs. Here is a dummy version:

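import logging

# `app` is the project's Celery application instance (not shown here).


class ExpectedException(Exception):
    pass


@app.task(autoretry_for=(ExpectedException,), retry_kwargs={'max_retries': 2, 'countdown': 1})
def decorated_autoretry():
    # request.retries is 0 on the first run, so add 1 for a 1-based attempt number.
    logging.info(
        "Attempt: {attempt} of {attempts}".format(
            attempt=decorated_autoretry.request.retries + 1, attempts=decorated_autoretry.max_retries
        )
    )
    raise ExpectedException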

Running it gives the following output:

[2018-01-30 12:17:31,899: INFO/MainProcess] Received task: interpretationAPI.tasks.util_tasks.decorated_autoretry[9e00f56d-fb90-46db-a735-678fd0b4cb5a]  
[2018-01-30 12:17:31,900: INFO/ForkPoolWorker-1] Attempt: 1 of 3
[2018-01-30 12:17:31,915: INFO/ForkPoolWorker-1] Task interpretationAPI.tasks.util_tasks.decorated_autoretry[9e00f56d-fb90-46db-a735-678fd0b4cb5a] retry: Retry in 1s: ExpectedException()
[2018-01-30 12:17:31,915: INFO/MainProcess] Received task: interpretationAPI.tasks.util_tasks.decorated_autoretry[9e00f56d-fb90-46db-a735-678fd0b4cb5a]  ETA:[2018-01-30 12:17:32.901955+00:00] 
[2018-01-30 12:17:33,024: INFO/ForkPoolWorker-2] Attempt: 2 of 3
[2018-01-30 12:17:33,072: INFO/MainProcess] Received task: interpretationAPI.tasks.util_tasks.decorated_autoretry[9e00f56d-fb90-46db-a735-678fd0b4cb5a]  ETA:[2018-01-30 12:17:34.029462+00:00] 
[2018-01-30 12:17:33,072: INFO/ForkPoolWorker-2] Task interpretationAPI.tasks.util_tasks.decorated_autoretry[9e00f56d-fb90-46db-a735-678fd0b4cb5a] retry: Retry in 1s: ExpectedException()
[2018-01-30 12:17:34,033: INFO/ForkPoolWorker-1] Attempt: 3 of 3
[2018-01-30 12:17:34,037: ERROR/ForkPoolWorker-1] Task interpretationAPI.tasks.util_tasks.decorated_autoretry[9e00f56d-fb90-46db-a735-678fd0b4cb5a] raised unexpected: ExpectedException()
Traceback (most recent call last):
  File "/home/greg/gel/interpretation/.env/local/lib/python2.7/site-packages/celery/app/trace.py", line 374, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/greg/gel/interpretation/.env/local/lib/python2.7/site-packages/celery/app/trace.py", line 629, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/greg/gel/interpretation/.env/local/lib/python2.7/site-packages/celery/app/base.py", line 474, in run
    raise task.retry(exc=exc, **retry_kwargs)
  File "/home/greg/gel/interpretation/.env/local/lib/python2.7/site-packages/celery/app/task.py", line 669, in retry
    raise_with_context(exc)
  File "/home/greg/gel/interpretation/.env/local/lib/python2.7/site-packages/celery/app/base.py", line 472, in run
    return task._orig_run(*args, **kwargs)
  File "/home/greg/gel/interpretation/interpretationAPI/tasks/util_tasks.py", line 25, in decorated_autoretry
    raise ExpectedException
ExpectedException

On the final attempt ExpectedException is raised, but Celery treats it as unexpected.

I can handle the exception explicitly instead:

from celery.exceptions import MaxRetriesExceededError


@app.task
def explicit_autoretry():
    logging.info(
        "Attempt: {attempt} of {attempts}".format(
            attempt=explicit_autoretry.request.retries+1, attempts=explicit_autoretry.max_retries
        )
    )
    try:
        raise ExpectedException
    except ExpectedException as e:
        logging.info(msg="Received exception of type: {e_type}".format(e_type=type(e)))
        try:
            # Schedule another run; raises MaxRetriesExceededError once the limit is hit.
            explicit_autoretry.retry(countdown=1)
        except MaxRetriesExceededError as e:
            # Swallowing the error here lets the task finish as "succeeded".
            logging.info(msg="Received exception of type: {e_type}".format(e_type=type(e)))

Running it gives the following output:

[2018-01-30 12:19:45,284: INFO/MainProcess] Received task: interpretationAPI.tasks.util_tasks.explicit_autoretry[b1f29107-ac73-401d-b0c3-636e91661ee9]  
[2018-01-30 12:19:45,287: INFO/ForkPoolWorker-3] Attempt: 1 of 3
[2018-01-30 12:19:45,288: INFO/ForkPoolWorker-3] Received exception of type: <class 'interpretationAPI.tasks.util_tasks.ExpectedException'>
[2018-01-30 12:19:45,301: INFO/ForkPoolWorker-3] Task interpretationAPI.tasks.util_tasks.explicit_autoretry[b1f29107-ac73-401d-b0c3-636e91661ee9] retry: Retry in 1s
[2018-01-30 12:19:45,301: INFO/MainProcess] Received task: interpretationAPI.tasks.util_tasks.explicit_autoretry[b1f29107-ac73-401d-b0c3-636e91661ee9]  ETA:[2018-01-30 12:19:46.288992+00:00] 
[2018-01-30 12:19:47,790: INFO/ForkPoolWorker-1] Attempt: 2 of 3
[2018-01-30 12:19:47,792: INFO/ForkPoolWorker-1] Received exception of type: <class 'interpretationAPI.tasks.util_tasks.ExpectedException'>
[2018-01-30 12:19:47,839: INFO/ForkPoolWorker-1] Task interpretationAPI.tasks.util_tasks.explicit_autoretry[b1f29107-ac73-401d-b0c3-636e91661ee9] retry: Retry in 1s
[2018-01-30 12:19:47,839: INFO/MainProcess] Received task: interpretationAPI.tasks.util_tasks.explicit_autoretry[b1f29107-ac73-401d-b0c3-636e91661ee9]  ETA:[2018-01-30 12:19:48.796492+00:00] 
[2018-01-30 12:19:49,789: INFO/ForkPoolWorker-3] Attempt: 3 of 3
[2018-01-30 12:19:49,789: INFO/ForkPoolWorker-3] Received exception of type: <class 'interpretationAPI.tasks.util_tasks.ExpectedException'>
[2018-01-30 12:19:49,791: INFO/ForkPoolWorker-3] Task interpretationAPI.tasks.util_tasks.explicit_autoretry[b1f29107-ac73-401d-b0c3-636e91661ee9] retry: Retry in 1s
[2018-01-30 12:19:49,791: INFO/MainProcess] Received task: interpretationAPI.tasks.util_tasks.explicit_autoretry[b1f29107-ac73-401d-b0c3-636e91661ee9]  ETA:[2018-01-30 12:19:50.790244+00:00] 
[2018-01-30 12:19:51,791: INFO/ForkPoolWorker-1] Attempt: 4 of 3
[2018-01-30 12:19:51,791: INFO/ForkPoolWorker-1] Received exception of type: <class 'interpretationAPI.tasks.util_tasks.ExpectedException'>
[2018-01-30 12:19:51,791: INFO/ForkPoolWorker-1] Received exception of type: <class 'celery.exceptions.MaxRetriesExceededError'>
[2018-01-30 12:19:51,791: INFO/ForkPoolWorker-1] Task interpretationAPI.tasks.util_tasks.explicit_autoretry[b1f29107-ac73-401d-b0c3-636e91661ee9] succeeded in 0.00055760199939s: None

Each instance of ExpectedException is handled correctly, followed finally by MaxRetriesExceededError, which allows for graceful error handling.
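The "Attempt: 4 of 3" line in the log above follows from how retries are counted: the first run is not a retry, so a limit of max_retries retries allows max_retries + 1 executions in total. The following is a minimal pure-Python sketch of that counting only. It does not use Celery, and count_attempts and always_fails are hypothetical names introduced for illustration.

```python
class ExpectedException(Exception):
    pass


def count_attempts(func, max_retries=3):
    """Run func until it succeeds or the retry budget is spent.

    Returns (number of executions, outcome). This is a simplified model
    of retry counting, not Celery's implementation.
    """
    retries = 0  # plays the role of request.retries: 0 on the first run
    while True:
        try:
            func()
            return retries + 1, "succeeded"
        except ExpectedException:
            if retries >= max_retries:
                # No retries left: this execution was the last one.
                return retries + 1, "max retries exceeded"
            retries += 1


def always_fails():
    raise ExpectedException


print(count_attempts(always_fails, max_retries=3))  # (4, 'max retries exceeded')
print(count_attempts(always_fails, max_retries=2))  # (3, 'max retries exceeded')
```

With the default limit of 3 the explicit task therefore runs four times, while retry_kwargs={'max_retries': 2} in the decorated task yields three runs. The "of 3" in the decorated task's log comes from the task's max_retries attribute, which happens to equal the number of runs there.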

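Is there a way to handle this kind of error when using the autoretry_for decorator that keeps explicit error handling to a minimum? I have also tried using an on_failure handler, to no avail.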

Best answer

Add throws=(ExpectedException,) to the decorator. Modifying the example above:

# throws marks ExpectedException as an expected error for this task.
@app.task(throws=(ExpectedException,), autoretry_for=(ExpectedException,), retry_kwargs={'max_retries': 2, 'countdown': 1})
def decorated_autoretry():
    # request.retries is 0 on the first run, so add 1 for a 1-based attempt number.
    logging.info(
        "Attempt: {attempt} of {attempts}".format(
            attempt=decorated_autoretry.request.retries + 1, attempts=decorated_autoretry.max_retries
        )
    )
    raise ExpectedException

Running this gives:

[2018-01-30 12:43:48,684: INFO/MainProcess] Received task: interpretationAPI.tasks.util_tasks.decorated_autoretry[34de3cb0-9fec-4bc3-a900-c0eb5d8eb5b0]  
[2018-01-30 12:43:48,685: INFO/ForkPoolWorker-1] Attempt: 1 of 3
[2018-01-30 12:43:48,687: INFO/ForkPoolWorker-1] Task interpretationAPI.tasks.util_tasks.decorated_autoretry[34de3cb0-9fec-4bc3-a900-c0eb5d8eb5b0] retry: Retry in 1s: ExpectedException()
[2018-01-30 12:43:48,688: INFO/MainProcess] Received task: interpretationAPI.tasks.util_tasks.decorated_autoretry[34de3cb0-9fec-4bc3-a900-c0eb5d8eb5b0]  ETA:[2018-01-30 12:43:49.685804+00:00] 
[2018-01-30 12:43:51,641: INFO/ForkPoolWorker-2] Attempt: 2 of 3
[2018-01-30 12:43:51,643: INFO/ForkPoolWorker-2] Task interpretationAPI.tasks.util_tasks.decorated_autoretry[34de3cb0-9fec-4bc3-a900-c0eb5d8eb5b0] retry: Retry in 1s: ExpectedException()
[2018-01-30 12:43:51,643: INFO/MainProcess] Received task: interpretationAPI.tasks.util_tasks.decorated_autoretry[34de3cb0-9fec-4bc3-a900-c0eb5d8eb5b0]  ETA:[2018-01-30 12:43:52.642046+00:00] 
[2018-01-30 12:43:53,459: INFO/ForkPoolWorker-1] Attempt: 3 of 3
[2018-01-30 12:43:53,461: INFO/ForkPoolWorker-1] Task interpretationAPI.tasks.util_tasks.decorated_autoretry[34de3cb0-9fec-4bc3-a900-c0eb5d8eb5b0] raised expected: ExpectedException()

The task now exits gracefully, as the raised expected: ExpectedException() part of the final message shows.
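According to the Celery documentation for Task.throws, errors listed there are still reported to the result backend as a failure, but the worker does not log them as an error and omits the traceback. So "gracefully" here refers to the logging, not to the task state. The following is a rough pure-Python sketch of that distinction. It is a simplified model, not Celery's code, and classify_failure is a hypothetical name used only for illustration.

```python
import logging


class ExpectedException(Exception):
    pass


def classify_failure(exc, throws=()):
    """Return (log level, description) for the exception that ended a task.

    Simplified model: exceptions listed in `throws` are logged at INFO as
    "raised expected"; anything else is logged at ERROR as "raised unexpected".
    """
    if isinstance(exc, throws):
        return logging.INFO, "raised expected"
    return logging.ERROR, "raised unexpected"


# Without throws, the final ExpectedException counts as unexpected.
print(classify_failure(ExpectedException()))
# (40, 'raised unexpected')

# With throws=(ExpectedException,), the same exception counts as expected.
print(classify_failure(ExpectedException(), throws=(ExpectedException,)))
# (20, 'raised expected')
```

If the caller needs the task to finish in a success state rather than only to log more quietly, the explicit try/except around retry() from the question is still required.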

Regarding python - Correctly handling Celery exceptions with autoretry_for, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/48521576/
