python - 在另一个消费者的一个 Django Channels 消费者中打破循环

标签 python django websocket django-channels

我正在使用 Django 和使用 websockets 的 Django Channels 构建网络应用。

当用户单击浏览器中的按钮时,websocket 将数据发送到我的服务器,服务器上的消费者开始每秒向客户端发送一次消息(循环)。

我想创建另一个按钮来停止此数据发送过程。当用户点击这个新按钮时,websocket 将另一个数据发送到服务器,服务器上的消费者必须以某种方式停止先前消费者的循环。此外,当客户端断开连接时,我将要求它停止循环。

我很想使用全局变量。但 Django Channels 文档指出,他们强烈不建议使用全局变量,因为他们希望保持应用程序网络透明(不太了解这一点)。

我试过使用 channel session 。我让第二个消费者更新 channel session 中的值,但 channel session 值没有在第一个消费者中更新。

这里是简化的代码。 浏览器:

var socket = new WebSocket("ws://" + window.location.host + "/socket/");
$('#button1').on('click', function() { 
    socket.send(JSON.stringify({action: 'start_getting_values'}))
});
$('#button2').on('click', function() { 
    socket.send(JSON.stringify({action: 'stop_getting_values'}))
});

服务器上的消费者:

@channel_session
def ws_message(message):
    text = json.loads(message.content['text'])

    if text['action'] == 'start_getting_values':
        while True:
            # Getting some data here
            # ...
            message.reply_channel.send({"text": some_data}, immediately=True)
            time.sleep(1)

    if text['action'] == 'stop_getting_values':
        do_something_to_stop_the_loop_above()

最佳答案

好吧,在联系 Django Channels 开发人员后,我设法自己解决了这个任务。

在消费者内部创建循环的方法很糟糕,因为一旦消费者运行的次数等于运行该消费者的所有工作线程的数量,它就会阻塞站点。

所以我的方法如下:一旦消费者收到“start_getting_values”信号,它就会将当前回复 channel 添加到一个组,并在它连接的 Redis 服务器上增加值(我使用 Redis 作为 channel 层后端,但它会在任何其他后端)。

它会增加什么值?在 Redis 上,我有一个键说哈希对象类型的“组”。该键的每个键代表 Channels 中的一个组,值代表该组中回复 channel 的数量。

然后我创建了一个新的 python 文件,我在其中连接到同一个 Redis 服务器。在这个文件中,我运行无限循环,从 Redis 的关键“组”加载字典。然后我遍历这个字典中的每个键(每个键代表 channel 组名称)并将数据广播到每个具有非零值的组。当我运行这个文件时,它作为单独的进程运行,因此不会阻止消费者方面的任何事情。

要停止向用户广播,当我从他那里得到适当的信号时,我只是将他从组中删除并减少适当的 Redis 值。

消费者代码:

import redis

redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)

@channel_session_user
def ws_message(message):

    text = json.loads(message.content['text'])

    if text['stream'] == 'start_getting_values':
        value_id = text['value_id']
        redis_client.hincrby('redis_some_key', value_id, 1)
        Group(value_id).add(message.reply_channel)
        channel_session['value_id'] = value_id
        return 0

    if text['stream'] == 'stop_getting_values':
        if message.channel_session['value_id'] != '':
            value_id = message.channel_session['value_id']
            Group(value_id).discard(message.reply_channel)

            l = redis_client.lock(name='del_lock')
            val = redis_client.hincrby('redis_some_key', value_id, -1)
            if (val <= 0):
                redis_client.hdel('redis_some_key', value_id)
            l.release()
        return 0

单独的 python 文件:

import time
import redis
from threading import Thread
import asgi_redis


redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
cl = asgi_redis.RedisChannelLayer()

def some_action(value_id):

    # getting some data based on value_id
    # ....

    cl.send_group(value_id, {
        "text": some_data,
    })


while True:
    value_ids = redis_client.hgetall('redis_some_key')

    ths = []
    for b_value_id in value_ids.keys():
        value_id = b_value_id.decode("utf-8")
        ths.append(Thread(target=some_action, args=(value_id,)))

    for th in ths:
        th.start()
    for th in ths:
        th.join()


    time.sleep(1)

关于python - 在另一个消费者的一个 Django Channels 消费者中打破循环,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45369448/

相关文章:

python - 如何解决加载模型以获得新预测的问题?

mysql - 外键约束不允许从 django 中的表中删除行

java - 使用 Websocket 连接 stomp 和 ActiveMQ

python - 错误的文件描述符 - Heroku Foreman

python - pandas 从日期时间索引中删除秒

python - XPath:通过当前节点属性选择当前和下一个节点的文本

Django 模板标记将空日期显示为 "still open"

Django模型递归关系

ios - 在 Xcode 中连接到服务器时出现错误 "Timed out waiting for the socket to be ready for a write"

websocket - WebSocket 握手期间出现间歇性错误 : Unexpected response code: 400 on CloudBees