django - 在数据库中保存 celery 任务(用于重新运行)

标签 django celery celery-task

我们的工作流程目前是围绕旧版本的 celery 构建的,因此请记住,事情已经不是最佳的。我们需要运行一个任务并将该任务运行的记录保存在数据库中。如果该任务失败或挂起(这种情况经常发生),我们希望重新运行,就像第一次运行一样。但这不应该自动发生。它需要根据故障的性质手动触发,并且需要将结果记录在数据库中以做出决定(通过前端)。

我们如何在数据库中保存任务的完整记录,以便后续进程可以获取该记录并运行新的相同任务?当前的实现将 @task 修饰函数的路径保存在数据库中作为 TaskInfo 模型的一部分。当任务需要重新运行时,我们在 TaskInfo 模型上有一个 get_task() 方法,用于从数据库获取路径,并使用 getattr 导入它code>,以及另一个使用 *args、**kwargs(也保存在数据库中)再次运行任务的 rerun() 方法。

就像这样(这些是 TaskInfo 模型实例上的方法):

def get_task(self):
    """Returns the task's decorated function, which can be delayed."""
    module_name, object_name = self.path.rsplit('.', 1)
    module = import_module(module_name)
    task = getattr(module, object_name)
    if inspect.isclass(task):
        task = task()
    # task = current_app.tasks[self.path]
    return task

 def rerun(self):
    """Re-run the task, and replace this one.

    - A new task is scheduled to run.
    - The new task's TaskInfo has the same parent as this TaskInfo.
    - This TaskInfo is deleted.
    """
    args, kwargs = self.get_arguments()
    celery_task = self.get_task()
    celery_task.delay(*args, **kwargs)
    defaults = {
        'path': self.path,
        'status': Status.PENDING,
        'timestamp': timezone.now(),
        'args': args,
        'kwargs': kwargs,
        'parent': self.parent,
    }
    TaskInfo.objects.update_or_create(task_id=celery_task.id, defaults=defaults)
    self.delete()

必须有一个更清晰的解决方案来将任务保存在数据库中以便稍后重新运行,对吧?

最佳答案

最新版本的 Celery (4.4.0) 包含一个参数 extended_result。您可以将其设置为True,那么结果后端数据库中的表(默认名为celery_taskmeta)将存储args和kwargs任务的内容。

这是一个演示:

app = Celery('test_result_backend')

app.conf.update(
    broker_url='redis://localhost:6379/10',
    result_backend='db+mysql://root:passwd@localhost/celery_toys',
    result_extended=True
)


@app.task(bind=True, name='add')
def add(self, x, y): 
    self.request.task_name = 'add'  # For saving the task name.
    time.sleep(5)
    return x + y 

通过MySQL中记录的任务信息,您可以轻松地重新运行您的任务。

关于django - 在数据库中保存 celery 任务(用于重新运行),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57813881/

相关文章:

python - 在 django rest 框架中对 pandas DataFrame 重复操作

celery - 在 celery 中,当任务排队时,将上下文元数据从发送方进程传递给工作人员的适当方法是什么?

python - Celery - 如何在工作人员关闭后更新任务状态?

python - celery worker 没有填满并发槽

python - celery - "WorkerLostError: Worker exited prematurely: signal 11 (SIGSEGV)"

Django celery 页面给出 404

Django根据外键对字段求和

django - 多表继承模型和相同的两个模型之间的简单一对一关系有什么区别?

python - 如果上一个任务成功, celery 运行任务

Python/Django : How to Prepend a # on to all URLS