python - 为什么 Celery 只识别来自单个 Django 应用程序的任务

标签 python django celery django-celery

所以我们有这个包含多个应用程序的 Django 项目,我们使用 celery 来处理任务。我们遇到的问题是只有单个应用程序的 tasks.py 中的任务会运行,其他应用程序中的其他 tasks.py 任务返回以下错误:

celery_1     | [2018-10-22 08:27:59,563: ERROR/MainProcess] Received unregistered task of type 'biko.supplier.tasks.test_task'.
celery_1     | The message has been ignored and discarded.
celery_1     |
celery_1     | Did you remember to import the module containing this task?
celery_1     | Or maybe you're using relative imports?
celery_1     |
celery_1     | Please see
celery_1     | http://docs.celeryq.org/en/latest/internals/protocol.html
celery_1     | for more information.
celery_1     |
celery_1     | The full contents of the message body was:
celery_1     | b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
celery_1     | Traceback (most recent call last):
celery_1     |   File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 557, in on_task_received
celery_1     |     strategy = strategies[type_]
celery_1     | KeyError: 'biko.supplier.tasks.test_task'

当我运行 test_task.delay() 时会发生这种情况

这里是供应商tasks.py:

from config.celery import app

@app.task(shared=True)
def test_task():
    print("Runnign this task correctly")

这是 shop tasks.py 的一部分,任务在其中正常工作:

from django.contrib.contenttypes.models import ContentType

from config.celery import app
from celery_once import QueueOnce
from django.core.management import call_command
from django.utils import timezone
from raven.contrib.django.raven_compat.models import client

from biko.shop.models import Shop
from config.settings import MAX_INCOMING_BUFFER_RETRIES
from biko.buffer.models import IncomingBuffer, OutgoingBuffer

@app.task(shared=True)
def buffer_products(shop_id):
    shop = Shop.objects.get(id=shop_id)
    shop.get_manager().buffer_products()

这是 celery 配置:

import os

from celery import Celery
from celery.schedules import crontab
from django.conf import settings

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")

app = Celery('biko')

app.config_from_object('django.conf:settings')

app.autodiscover_tasks()
app.conf.ONCE = {
  'backend': 'celery_once.backends.Redis',
  'settings': {
    'url': 'redis://redis',
    'blocking': True,
    'default_timeout': 60 * 60,
    'blocking_timeout': 86400
  }
}

任何不属于 shop/tasks.py 的任务都不会显示为正在加载。我不知道为什么它会从 shop/tasks.py 而不是从另一个应用程序加载任务。

最佳答案

在 celery 配置中;您可以执行以下操作:

# Where app_module represents where tasks exists.
app = Celery('biko', include=['app_module.tasks'])

# Your line should also work, but sometimes it needs the apps configs
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

关于python - 为什么 Celery 只识别来自单个 Django 应用程序的任务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52925242/

相关文章:

python - 如何在 Xcode 中将 Python 函数导入 C?

django - 如何使用 Eclipse 和 Pydev 正确调试?

python - 如何在 ubuntu 服务器上守护 django celery 周期性任务?

django - 由于errno 104, celery 操作不佳

python - celery celerybeat 可以在没有 Django 的情况下使用数据库调度程序吗?

python 使用 __init__ 与仅在类中定义变量-有什么区别吗?

Python: turtle 大小(以像素为单位)

python - 如何在涉及写入文件或 .gzip 文件的多生产者-消费者流程中正确使用 asyncio?

django - 基于 Django 类的 View 的反向 url

python - 为什么 Amazon S3 签名 URL 在本地有效,但在生产服务器上无效?