django - Celery Task 下面这两个任务的区别

标签 django django-celery

下面这两个任务有什么区别?

第一个给出错误,第二个运行得很好。两者都是相同的,它们接受额外的参数,并且都以相同的方式调用。

ProcessRequests.delay(batch) **error object.__new__() takes no parameters**


SendMessage.delay(message.pk, self.pk) **works!!!!**   

现在,我已经知道错误意味着什么,但我的困惑是为什么一个有效而不是另一个。

任务...

1)
class ProcessRequests(Task):
    name = "Request to Process"
    max_retries = 1
    default_retry_delay = 3

    def run(self, batch):
       #do something

2)
class SendMessage(Task):
    name = "Sending SMS"
    max_retries = 10
    default_retry_delay = 3

    def run(self, message_id, gateway_id=None, **kwargs):
        #do something

完整的任务代码....
from celery.task import Task
from celery.decorators import task

import logging

from sms.models import Message, Gateway, Batch
from contacts.models import Contact
from accounts.models import Transaction, Account


class SendMessage(Task):
    name = "Sending SMS"
    max_retries = 10
    default_retry_delay = 3

    def run(self, message_id, gateway_id=None, **kwargs):
        logging.debug("About to send a message.")

        # Because we don't always have control over transactions
        # in our calling code, we will retry up to 10 times, every 3
        # seconds, in order to try to allow for the commit to the database
        # to finish. That gives the server 30 seconds to write all of
        # the data to the database, and finish the view.
        try:
            message = Message.objects.get(pk=message_id)
        except Exception as exc:
            raise SendMessage.retry(exc=exc)


        if not gateway_id:
            if hasattr(message.billee, 'sms_gateway'):
                gateway = message.billee.sms_gateway
            else:
                gateway = Gateway.objects.all()[0]
        else:
            gateway = Gateway.objects.get(pk=gateway_id)

        # Check we have a credits to sent me message
        account = Account.objects.get(user=message.sender)
        # I'm getting the non-cathed version here, check performance!!!!!
        if account._balance() >= message.length:
            response = gateway._send(message)

            if response.status == 'Sent':
                # Take credit from users account.
                transaction = Transaction(
                    account=account,
                    amount=- message.charge,
                    description="Debit: SMS Sent",

                    )
                transaction.save()
                message.billed = True
                message.save()
        else:
            pass


        logging.debug("Done sending message.")


class ProcessRequests(Task):
    name = "Request to Process"
    max_retries = 1
    default_retry_delay = 3

    def run(self, message_batch):
        for e in Contact.objects.filter(contact_owner=message_batch.user, group=message_batch.group):
            msg = Message.objects.create(
                recipient_number=e.mobile,
                content=message_batch.content,
                sender=e.contact_owner,
                billee=message_batch.user,
                sender_name=message_batch.sender_name
            )
            gateway = Gateway.objects.get(pk=2)
            msg.send(gateway)
            #replace('[FIRSTNAME]', e.first_name)

试过:
 ProcessRequests.delay(batch) should work gives error error object.__new__() takes no parameters     
 ProcessRequests().delay(batch) also gives error error object.__new__() takes no parameters

最佳答案

我能够重现您的问题:

import celery
from celery.task import Task

@celery.task
class Foo(celery.Task):
    name = "foo"
    def run(self, batch):
       print 'Foo'

class Bar(celery.Task):
    name = "bar"
    def run(self, batch):
       print 'Bar'

# subclass deprecated base Task class
class Bar2(Task):
    name = "bar2"
    def run(self, batch):
       print 'Bar2'

@celery.task(name='def-foo')
def foo(batch):
    print 'foo'

输出:
In [2]: foo.delay('x')
[WARNING/PoolWorker-4] foo

In [3]: Foo().delay('x')
[WARNING/PoolWorker-2] Foo

In [4]: Bar().delay('x')
[WARNING/PoolWorker-3] Bar

In [5]: Foo.delay('x')
TypeError: object.__new__() takes no parameters

In [6]: Bar.delay('x')
TypeError: unbound method delay() must be called with Bar instance as first argument (got str instance instead)

In [7]: Bar2.delay('x')
[WARNING/PoolWorker-1] Bar2

我看到您使用已弃用的 celery.task.Task基类,这就是为什么你没有得到 unbound method错误:
Definition: Task(self, *args, **kwargs)
Docstring:
Deprecated Task base class.

Modern applications should use :class:`celery.Task` instead.

不知道为什么ProcessRequests虽然不起作用。也许是一些缓存问题,您之前可能尝试将装饰器应用于您的类,但它被缓存了,这正是您尝试将此装饰器应用于 Task 类时遇到的错误。

删除所有 .pyc 文件,重新启动 celery worker 并重试。

不要直接使用类
  • Tasks are instantiated only once per (worker) process ,所以每次创建任务类的对象(在客户端)没有意义,即 Bar()是错的。
  • Foo.delay()Foo().delay()可能有效也可能无效,取决于装饰者的组合 name参数和类 name属性。

  • celery.registry.tasks 获取任务对象字典或只是使用 @celery.task函数上的装饰器(在我的示例中为 foo)。

    关于django - Celery Task 下面这两个任务的区别,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16217297/

    相关文章:

    python - 我可以使用 python os.nice 来降低 celery 任务的友好度吗?

    Django 和 celery : admin task list does not display values

    python - Celery:为什么从接受任务到开始执行有几秒的时间间隔?

    python - Django 覆盖表单

    python - 重用 Django 模型

    python - 似乎无法丢失此错误 : "You are trying to add a non-nullable field"

    python - Django 调试错误页面不适用于 AJAX 调用

    Django REST 框架 : Dynamic serializer relation field - POST pk but GET hyperlink

    linux - 如何检测失败并自动重启 celery worker

    daemon - 使用 celeryd 作为多个 django 应用程序的守护进程?