python - gunicorn 与 gevent worker : Using a shared global list

标签 python gunicorn gevent

我正在尝试按照以下简单方法在我的 Flask 应用程序中实现服务器发送事件:http://flask.pocoo.org/snippets/116/

为了提供应用程序,我将 gunicorn 与 gevent worker 一起使用。

我的代码的最小版本如下所示:

import multiprocessing

from gevent.queue import Queue
from gunicorn.app.base import BaseApplication
from flask import Flask, Response

app = Flask('minimal')
# NOTE: This is the global list of subscribers
subscriptions = []


class ServerSentEvent(object):
    def __init__(self, data):
        self.data = data
        self.event = None
        self.id = None
        self.desc_map = {
            self.data: "data",
            self.event: "event",
            self.id: "id"
        }

    def encode(self):
        if not self.data:
            return ""
        lines = ["%s: %s" % (v, k)
                 for k, v in self.desc_map.iteritems() if k]
        return "%s\n\n" % "\n".join(lines)


@app.route('/api/events')
def subscribe_events():
    def gen():
        q = Queue()
        print "New subscription!"
        subscriptions.append(q)
        print len(subscriptions)
        print id(subscriptions)
        try:
            while True:
                print "Waiting for data"
                result = q.get()
                print "Got data: " + result
                ev = ServerSentEvent(unicode(result))
                yield ev.encode()
        except GeneratorExit:
            print "Removing subscription"
            subscriptions.remove(q)
    return Response(gen(), mimetype="text/event-stream")


@app.route('/api/test')
def push_event():
    print len(subscriptions)
    print id(subscriptions)
    for sub in subscriptions:
        sub.put("test")
    return "OK"


class GunicornApplication(BaseApplication):
    def __init__(self, wsgi_app, port=5000):
        self.options = {
            'bind': "0.0.0.0:{port}".format(port=port),
            'workers': multiprocessing.cpu_count() + 1,
            'worker_class': 'gevent',
            'preload_app': True,
        }
        self.application = wsgi_app
        super(GunicornApplication, self).__init__()

    def load_config(self):
        config = dict([(key, value) for key, value in self.options.iteritems()
                       if key in self.cfg.settings and value is not None])
        for key, value in config.iteritems():
            self.cfg.set(key.lower(), value)

    def load(self):
        return self.application


if __name__ == '__main__':
    gapp = GunicornApplication(app)
    gapp.run()

问题是订户列表似乎对每个工作人员都不同。这意味着如果 worker #1 处理 /api/events 端点并将新订阅者添加到列表中,则客户端将仅接收当 worker #1 也处理 /api/test 端点。

奇怪的是,实际列表对象似乎对每个工作人员都是相同的,因为 id(subscriptions) 在每个工作人员中返回相同的值。

有解决办法吗?我知道我可以只使用 Redis,但应用程序应该尽可能独立,所以我试图避免任何外部服务。

更新: 问题的原因似乎是我嵌入了 gunicorn.app.base.BaseApplication(这是一个 new feature in v0.19) .使用 gunicorn -k gevent minimal:app 从命令行运行应用程序时,一切都按预期运行

更新 2: 之前的怀疑被证明是错误的,它起作用的唯一原因是因为 gunicorn 的默认工作进程数是 1,当将数字调整为通过 -w 参数拟合代码,它表现出相同的行为。

最佳答案

你说:

the actual list object seems to be the same for each worker, since id(subscriptions) returns the same value in every worker.

但我认为这不是真的,每个 worker 上的 subscriptions 不是同一个对象。每个worker都是一个独立的进程,有自己的内存空间。

对于自包含系统,您可以开发一个微型系统,其功能类似于 Redis 的简单版本。例如,使用 SQLite 或 ZeroMQ 在这些工作人员之间进行通信。

关于python - gunicorn 与 gevent worker : Using a shared global list,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30151206/

相关文章:

python - Flask-Login & Flask-Principle 认证用户掉落到 flask_login.AnonymousUserMixin

http-headers - 如何防止 Gunicorn 返回 'Server' http header ?

python - gevent 与其他包的兼容性

python - 将 nginx 和 Gunicorn 连接在一起

python - 在 json 中格式化 Flask 应用程序日志

wsgi - 使用 Gevent 和 WSGI 阻止调用

python - 即使所有队列项目都已用尽,Gevent 线程也不会完成

python - 装饰器会消耗更多内存吗?

python生成xml

python - Django 中消息队列消费者放在哪里?