python - 为什么 `celery.current_app` 引用 Flask View 函数中的默认实例

标签 python flask thread-safety celery

我并没有尝试在 View 函数中使用 celery.current_app,但是我有一个挂接到 after_task_publish 信号的函数,该信号使用它来更新状态发布任务后,它在 Flask View 函数之外工作并正确更新状态,但是当我从 View 函数内部发送任务时,任务状态不会更新,我检查了一下,问题是 current_app.backendDisabledBackend 的一个实例,它是默认的,而不是我正在使用的 RedisBackend 的一个实例。

发生这种情况是因为在 Flask View 函数中,当前 Celery 实例 celery.current_app 的代理指的是在没有当前 Celery 实例时创建的默认实例。

我尝试重现正在发生的事情,这是一个测试脚本:

from __future__ import absolute_import, print_function, unicode_literals

from flask import Flask, request

from celery import Celery, current_app
from celery.signals import after_task_publish
# internal module for debugging purposes
from celery._state import default_app, _tls


# the flask application
flask_app = Flask(__name__)

# the celery application
celery_app = Celery('tasks', broker='amqp://', backend='redis://')

# debugging info
debug = """
[{location}]
celery_app       = {celery_app}
current_app      = {current_app}
add.app          = {add_app}
default_app      = {default_app}
_tls.current_app = {tls_current_app}
"""

print(debug.format(
    location = 'OUTSIDE VIEW',
    celery_app = celery_app,
    current_app = current_app,
    add_app = add.app,
    default_app = default_app,
    tls_current_app = _tls.current_app
))


# fired after a task is published
@after_task_publish.connect
def after_publish(sender=None, body=None, **kwargs):
    print(debug.format(
        location = 'INSIDE SIGNAL FUNCTION',
        celery_app = celery_app,
        current_app = current_app,
        add_app = add.app,
        default_app = default_app,
        tls_current_app = _tls.current_app
    ))

# a simple task for testing
@celery_app.task(name='add')
def add(a, b):
    return a + b


@flask_app.route('/add')
def add_view():
    print(debug.format(
        location = 'INSIDE VIEW',
        celery_app = celery_app,
        current_app = current_app,
        add_app = add.app,
        default_app = default_app,
        tls_current_app = _tls.current_app
    ))

    a = request.args.get('a')
    b = request.args.get('b')

    task = add.delay(a, b)

    return task.task_id


if __name__ == '__main__':
    flask_app.run(debug=True)

这是输出:

[OUTSIDE VIEW]
celery_app       = <Celery tasks:0xb69ede4c>
current_app      = <Celery tasks:0xb69ede4c>
add.app          = <Celery tasks:0xb69ede4c>
default_app      = None
_tls.current_app = <Celery tasks:0xb69ede4c>


[INSIDE VIEW]
celery_app       = <Celery tasks:0xb69ede4c>
current_app      = <Celery default:0xb6b0546c>
add.app          = <Celery tasks:0xb69ede4c>
default_app      = None
_tls.current_app = None   # making current_app fallback to the default instance


[INSIDE SIGNAL FUNCTION]
celery_app       = <Celery tasks:0xb69ede4c>
current_app      = <Celery default:0xb6a174ec>
add.app          = <Celery tasks:0xb69ede4c>
default_app      = None
_tls.current_app = None

因为 _tls.current_app 在 View 中是 None,这就是 celery.current_app 引用默认实例的原因,来自 celery._state._get_current_app :

return _tls.current_app or default_app

_tlscelery._state._TLS 的实例:

class _TLS(threading.local):
    #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
    #: sets this, so it will always contain the last instantiated app,
    #: and is the default app returned by :func:`app_or_default`.
    current_app = None

问题与线程相关吗?这可能是一个错误吗?还是这是预期的行为?

请注意,我可以在我的钩子(Hook)函数中使用实际的 celery 实例,一切都会正常工作,但我担心在其他地方使用的 celery.current_app 会破坏我的代码。

最佳答案

我在未启用调试的情况下运行 Flask 应用程序时发现了问题,并且它没有任何问题地工作,当 debugTrue 时,使用重新加载程序运行应用程序另一个线程,这发生在 werkzeug._reloader.run_with_reloader 函数中。

并且取决于 Python docs关于 threading.local 类,它被子类化以存储当前应用程序实例:

A class that represents thread-local data. Thread-local data are data whose values are thread specific.

The instance’s values will be different for separate threads.

所以这意味着 celery._state._tls.current_app 不在线程之间共享,我们必须手动将 celery 实例设置为当前应用程序,例如在 View 函数中:

celery_app.set_current()

关于python - 为什么 `celery.current_app` 引用 Flask View 函数中的默认实例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26527214/

相关文章:

python - 警告 :tensorflow:11 out of the last 11 calls to triggered tf. 函数回溯

python - Python 中的函数乘法

python - 将非常大的 n 基数转换为字节

python - 牛津计划的演讲者识别-无效的音频格式

multithreading - 为什么 java.util.HashMap.getEntry 可以阻止我的程序?

Python字符串处理优化

python - Flask 文件上传挂起

python - 您尚未定义默认连接

c# - Windows 窗体线程到底发生了什么?

authentication - 使用Spring Security时,请求之间是否共享SecurityContext?