我正在编写一个小型 Django 应用程序,我应该能够创建 对于每个模型对象,其定期任务将被执行 一定的间隔。我使用 Celery 应用程序,但我无法理解一件事:
class ProcessQueryTask(PeriodicTask):
run_every = timedelta(minutes=1)
def run(self, query_task_pk, **kwargs):
logging.info('Process celery task for QueryTask %d' %
query_task_pk)
task = QueryTask.objects.get(pk=query_task_pk)
task.exec_task()
return True
然后我将执行以下操作:
>>> from tasks.tasks import ProcessQueryTask
>>> result1 = ProcessQueryTask.delay(query_task_pk=1)
>>> result2 = ProcessQueryTask.delay(query_task_pk=2)
第一次调用成功,但其他定期调用返回错误
- TypeError: run() 恰好需要 2 个非关键字参数(给定 1 个)
celery 服务器。
我可以将自己的参数传递给PeriodicTask run()
吗?
最佳答案
Ask Solem 在 his response to your question 中对此做出了精彩的回答。上celery-users Google group .
定期任务不使用参数,因此您需要创建多个 类或创建一个处理多个“模型”的周期性任务。
例如:
from celery.task import PeriodicTask
from celery.decorators import periodic_task
# base class
class BaseProcessQueryTask(PeriodicTask):
abstract = True
run_every = timedelta(minutes=1)
query_task_pk = None
def run(self):
task = QueryTask.objects.get(pk=self.query_task_pk)
task.exec_task()
class ProcessQueryTask1(BaseProcessQueryTask):
query_task_pk = 1
class ProcessQueryTask2(BaseProcessQueryTask):
query_task_pk = 2
但您更有可能想要这样的东西:
@task(ignore_result=True)
def execute_query_task(task):
task.exec_task()
@periodic_task(run_every=timedelta(minutes=1))
def process_query_tasks():
for task in QueryTask.objects.all():
ExecuteQueryTask.delay(task)
关于python - Celery 中的PeriodicTask run() 方法拥有自己的参数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/2850017/