python - Django Celery AbortableTask 使用

标签 python django celery celery-task

我正在尝试使用 AbortableTask Celery 的功能,但文档示例似乎不适合我。给出的例子是:

from celery.contrib.abortable import AbortableTask

def MyLongRunningTask(AbortableTask):

    def run(self, **kwargs):
        logger = self.get_logger(**kwargs)
        results = []
        for x in xrange(100):
            # Check after every 5 loops..
            if x % 5 == 0:  # alternatively, check when some timer is due
                if self.is_aborted(**kwargs):
                    # Respect the aborted status and terminate
                    # gracefully
                    logger.warning("Task aborted.")
                    return None
            y = do_something_expensive(x)
            results.append(y)
        logger.info("Task finished.")
        return results

from myproject.tasks import MyLongRunningTask

def myview(request):

    async_result = MyLongRunningTask.delay()
    # async_result is of type AbortableAsyncResult

    # After 10 seconds, abort the task
    time.sleep(10)
    async_result.abort()

    ...

但是,我收到错误:

TypeError: MyLongRunningTask() takes exactly 1 argument (0 given)

我做错了什么?

最佳答案

只是一个猜测,但我认为应该是

class MyLongRunningTask(AbortableTask)

而不是

def MyLongRunningTask(AbortableTask)

关于python - Django Celery AbortableTask 使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/3659561/

相关文章:

python - 如何用单个斜杠替换目录中的多个正斜杠?

python - 通过 clean 方法传递时,django modelform 字段上出现错误的验证错误

django - 如何在 django 中删除 celerybeat 任务

python - 如何在 Celery Flower Monitor 选项卡中查看所有图表

python - 从 Python 中的字典值绘制直方图

python - (Python/TFTP-Server)如何监听(尚未)现有的IP地址(RNDIS)?

python - 类型错误 : worker() takes 0 positional arguments but 1 was given

python - Django 应用程序在迁移到 heroku MySQL ClearDB 时抛出 MigrationSchemaMissing

django - ValueError : not enough values to unpack (expected 2, 得到 1)

python - 具有长时间运行计算的 Django 应用程序