python - 如何使用 Flask 应用程序工厂模式实现 Celery

标签 python flask celery blueprint

我在使用 python flask application factory app 实现 celery 时遇到问题

我打算从应用程序初始化文件中创建一个 Celery 应用程序的实例,如下所示:

from celery import Celery
celery = Celery('myapp', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

调用时我不能使用其他蓝图中的 Celery。

最佳答案

def init_celery(app):
    celery = Celery()
    celery.conf.broker_url = app.config['CELERY_BROKER_URL']
    celery.conf.result_backend = app.config['CELERY_RESULT_BACKEND']
    celery.conf.update(app.config)

    class ContextTask(celery.Task):
        """Make celery tasks work with Flask app context"""
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask
    return celery

create_app时初始化celery:

init_celery(app)

this Flask cookiecutter 中了解 celery 是如何实现的

关于python - 如何使用 Flask 应用程序工厂模式实现 Celery,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58540194/

相关文章:

python - 自动加载 Django Fixture

python - SleekXMPP: "Certificate has expired."

python - 类型错误 : no salt specified in flask(about sha256)

python - 使用 python3 而不是 python 运行 Flask

python - 如何确保 Celery 任务在 Pytest 中排队?

python - 在 Django 中中止 Celery 中正在运行的任务

带有硬制表符的 Spacemacs 中的 Python 缩进已关闭

python - 添加到 memcache 时发生 PicklingError

Python 请求 : how to GET and POST a picture without saving to drive?

python - 无法使用 "PicklingError"在 Windows 10 上启动 Celery worker