我正在尝试为我的 celery 作业设置最长运行时间。
我目前正在使用上下文管理器从异常中恢复。我最终得到了与这个片段非常相似的代码:
from celery.exceptions import SoftTimeLimitExceeded
class Manager:
def __enter__(self):
return self
def __exit__(self, error_type, error, tb):
if error_type == SoftTimeLimitExceeded:
logger.info('job killed.')
# swallow the exception
return True
@task
def do_foo():
with Manager():
run_task1()
run_task2()
run_task3()
我的预期:
如果 do_foo
在 run_task1
中超时,记录器记录,SoftTimeLimitExceeded 异常被吞噬,跳过管理器的主体,作业结束而不运行 run_task2
和 run_task3
。
我观察到的:
do_foo
在 run_task1
中超时,引发 SoftTimeLimitExceeded,记录器记录,SoftTimeLimitExceeded 异常被吞没但是 run_task2
和尽管如此,run_task3
仍在运行。
我正在寻找以下两个问题的答案:
为什么在此设置中
run_task1
中引发 SoftTimeLimitExceeded 时run_task2
仍然执行?是否有一种简单的方法来转换我的代码,使其能够按预期执行?
最佳答案
清理代码
这段代码很不错;没有太多清理工作要做。
- 你不应该返回
self
来自__enter__
如果上下文管理器未设计为与as
一起使用关键字。 -
is
应该在检查类时使用,因为它们是单例... - 但你应该更喜欢
issubclass
正确模拟异常处理。
实现这些改变会得到:
from celery.exceptions import SoftTimeLimitExceeded
class Manager:
def __enter__(self):
pass
def __exit__(self, error_type, error, tb):
if issubclass(error_type, SoftTimeLimitExceeded):
logger.info('job killed.')
# swallow the exception
return True
@task
def do_foo():
with Manager():
run_task1()
run_task2()
run_task3()
调试
我创建了一个用于调试的模拟环境:
class SoftTimeLimitExceeded(Exception):
pass
class Logger:
info = print
logger = Logger()
del Logger
def task(f):
return f
def run_task1():
print("running task 1")
raise SoftTimeLimitExceeded
def run_task2():
print("running task 2")
def run_task_3():
print("running task 3")
执行这个然后你的程序给出:
>>> do_foo()
running task 1
job killed.
这是预期的行为。
假设
我可以想到两种可能性:
- 链中的某物,可能是
run_task1
, 是异步的。 -
celery
正在做一些奇怪的事情。
我将采用第二个假设,因为我无法检验前者。
我以前曾被上下文管理器、异常和协同程序之间组合的模糊行为所困扰,所以我知道它会导致什么样的问题。这似乎是其中之一,但我必须看看 celery
在我继续之前的代码。
编辑:我无法生成 celery
的头部或尾部的代码,搜索没有找到引发 SoftTimeLimitExceeded
的代码让我向后追溯。我会将其传递给对 celery
更有经验的人看看他们是否能弄清楚它是如何工作的。
关于python - 使用上下文管理器从 celery 的 SoftTimeLimitExceeded 中恢复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49688481/