flask - 带create_app,SQLAlchemy和Celery的 flask

标签 flask celery flask-sqlalchemy

我真的在努力为Flask,SQLAlchemy和Celery设置正确的设置。我进行了广泛的搜索并尝试了不同的方法,但似乎没有任何效果。我错过了应用程序上下文或无法运行工作程序,或者还有其他问题。该结构非常通用,因此我可以构建一个更大的应用程序。

我正在使用:Flask 0.10.1,SQLAlchemy 1.0,Celery 3.1.13,我当前的设置如下:

app / __ init__.py

#Empty

app / config.py
import os
basedir = os.path.abspath(os.path.dirname(__file__))

class Config:

    @staticmethod
    def init_app(app):
        pass

class LocalConfig(Config):
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = r"sqlite:///" + os.path.join(basedir, 
                                 "data-dev.sqlite")
    CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'


config = {
    "local": LocalConfig}

app / exstensions.py
from flask.ext.sqlalchemy import SQLAlchemy
from celery import Celery

db = SQLAlchemy()
celery = Celery()

app / factory.py
from extensions import db, celery
from flask import Flask
from flask import g
from config import config

def create_before_request(app):
    def before_request():
        g.db = db
    return before_request


def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config[config_name])

    db.init_app(app)
    celery.config_from_object(config)

    # Register the blueprints

    # Add the before request handler
    app.before_request(create_before_request(app))
    return app

app / manage.py
from factory import create_app

app = create_app("local")

from flask import render_template
from flask import request

@app.route('/test', methods=['POST'])
def task_simple():
    import tasks
    tasks.do_some_stuff.delay()
    return ""

if __name__ == "__main__":
    app.run()

app / models.py
from extensions import db

class User(db.Model):
    __tablename__ = "user"

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(128), unique=True, nullable=False)

app / tasks.py
from extensions import celery
from celery.signals import task_prerun
from flask import g, current_app


@task_prerun.connect
def close_session(*args, **kwargs):
    with current_app.app_context():
       # use g.db
       print g

@celery.task()
def do_some_stuff():
    with current_app.app_context():
       # use g.db
       print g

在文件夹应用中:
  • 使用以下命令启动开发Web服务器:python.exe manage.py
  • 使用以下命令启动工作人员:celery.exe worker -A tasks

  • 我收到一个导入错误,对我来说没有任何意义。
    我应该以不同的方式构造应用程序吗?最后,我想我想要一个非常基本的设置,例如通过将Flask与工厂模式一起使用,可以使用Flask-SQLAlchmey扩展名,并需要一些工作人员来访问数据库。

    非常感谢您的帮助。

    启动 celery worker 时执行回溯。
    Traceback (most recent call last):
    
      File "[PATH]\scripts\celery-script.py", line 9, in <module>
        load_entry_point('celery==3.1.13', 'console_scripts', 'celery')()
    
      File "[PATH]\lib\site-packages\celery\__main__.py", line 30, in main
        main()
    
      File "[PATH]\lib\site-packages\celery\bin\celery.py", line 81, in main
        cmd.execute_from_commandline(argv)
    
      File "[PATH]\lib\site-packages\celery\bin\celery.py", line 769, in execute_from_commandline
        super(CeleryCommand, self).execute_from_commandline(argv)))
    
      File "[PATH]\lib\site-packages\celery\bin\base.py", line 305, in execute_from_commandline
        argv = self.setup_app_from_commandline(argv)
    
      File "[PATH]\lib\site-packages\celery\bin\base.py", line 473, in setup_app_from_commandline
        user_preload = tuple(self.app.user_options['preload'] or ())
    AttributeError: 'Flask' object has no attribute 'user_options'
    

    更新我根据注释中的建议更改代码。工作程序现在启动,但是在测试时使用http://127.0.0.1:5000/test的get请求。我得到以下回溯:
    Traceback (most recent call last):
      File "[PATH]\lib\site-packages\celery\app\trace.py", line 230, in trace_task
        args=args, kwargs=kwargs)
    
      File "[PATH]\lib\site-packages\celery\utils\dispatch\signal.py", line 166, in send
        response = receiver(signal=self, sender=sender, \**named)
    
      File "[PATH]\app\stackoverflow\tasks.py", line 7, in close_session
        with current_app.app_context():
    
      File "[PATH]\lib\site-packages\werkzeug\local.py", line 338, in __getattr__
        return getattr(self._get_current_object(), name)
    
      File "[PATH]\lib\site-packages\werkzeug\local.py", line 297, in _get_current_object
        return self.__local()
    
      File "[PATH]\lib\site-packages\flask\globals.py", line 34, in _find_app
        raise RuntimeError('working outside of application context')
    RuntimeError: working outside of application context exc, exc_info.traceback)))
    

    更新基于Marteen的评论,我更改了代码。当前的工作版本在https://gist.github.com/anonymous/fa47834db2f4f3b8b257下。
    欢迎任何进一步的改进或建议。

    最佳答案

    我不喜欢current_app的建议。

    您的celery对象需要访问应用程序上下文。我在网上找到了有关使用工厂功能创建Celery对象的信息。以下示例在没有消息代理的情况下进行了测试。

    #factory.py
    from celery import Celery
    from config import config
    
    def create_celery_app(app=None):
        app = app or create_app(config)
        celery = Celery(__name__, broker=app.config['CELERY_BROKER_URL'])
        celery.conf.update(app.config)
        TaskBase = celery.Task
    
        class ContextTask(TaskBase):
            abstract = True
    
            def __call__(self, *args, **kwargs):
                with app.app_context():
                    return TaskBase.__call__(self, *args, **kwargs)
    
        celery.Task = ContextTask
        return celery
    

    在tasks.py中:
    #tasks.py
    from factory import create_celery_app
    from celery.signals import task_prerun
    from flask import g
    
    celery = create_celery_app()
    
    @task_prerun.connect
    def celery_prerun(*args, **kwargs):
        #print g
        with celery.app.app_context():
        #   # use g.db
           print g
    
    @celery.task()
    def do_some_stuff():
        with celery.app.app_context():
            # use g.db
            g.user = "test"
            print g.user
    

    一些链接:

    Flask pattern for creating a Celery instance with factory function

    Application using both application factory and celery

    Source for said application's factory.py

    Source for application tasks.py

    关于flask - 带create_app,SQLAlchemy和Celery的 flask ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25360136/

    相关文章:

    python - 当我运行 Celery worker 时出现 AttributeError

    python - 让 sqlalchemy View 反射在 fl​​ask-sqlalchemy 中工作

    python - sqlalchemy:TypeError:无法散列的类型创建实例,sqlalchemy

    python - celery worker 接收任务,即使我在 RabbitMQ 队列上没有消息

    python - Celery:查询未决任务的标准方法?

    python - 仅复制 SQLAlchemy 对象中的数据列

    flask - 如何登录 flask

    python - Flask-SQLAlchemy 缺少 "caching_sha2_password"身份验证模块

    python - Flask-Bokeh-Gunicorn : Models must be owned by a single document

    python - 使用 Flask 和 SQLAlchemy 的显式主-主数据库设置,希望使用 Flask-SQLAlchemy