python - Django Channels 2.0 channel_layers 不通信

标签 python django websocket celery django-channels

所以我一直在迁移使用 Django Channels 1.x -> 2.x+ 的服务器

最初的设计将使用 getAFTreeTask.delay(message.reply_channel.name) 向 celery 发送任务并通过访问 channel_name它可能会异步回复

from celery import task
from channels import Channel

@task
def getAFTreeTask(channel_name):
    tree = Request().cache_af_tree()
    Channel(channel_name).send({
        "text": json.dumps({
            "channel": "AF_INIT",
            "payload": tree
        })
    })

出于各种原因,现在我已将我的服务器迁移到 Channels 2.x+。根据文档

class Consumer(JsonWebsocketConsumer):

     def connect(self):
        print("Client Connected: ", self.channel_name)
        self.accept()

    def receive_json(self, content, **kwargs):
        print(content)
        parse_request(self.channel_name, content)

    def disconnect(self, content):
        print(content)

    def chat_message(self, event):
        print("Entered reply channel")
        print(event)

如果我使用正确的 channel_name,这样的消费者应该通过 channel 层接收请求,现在如果响应可以访问 self.send_json(),消费者可以正确地作为发送-接收 websocket 工作或 self.send()对于其他通用消费者,所以我假设我所有的设置都是正确的,我的问题是当我尝试使用 channel 层发送一些东西时,就像这样(根据 https://channels.readthedocs.io/en/latest/topics/channel_layers.html#single-channels )

from channels.layers import get_channel_layer
from asgiref.sync import AsyncToSync

def parse_request(channel_name, content):

    print("parsed ", channel_name, content)
    channel_layer = get_channel_layer()
    AsyncToSync(channel_layer.send)(channel_name, {
        "type": "chat.message",
        "text": "Hello there!",
    })

我明白了

编辑(完整堆栈跟踪):

    2018-02-02 18:28:35,984 ERROR    Exception inside application: There is no current event loop in thread 'Thread-3'.
  File "/usr/lib/python3.5/asyncio/tasks.py", line 241, in _step
    result = coro.throw(exc)
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/channels/consumer.py", line 51, in __call__
    await await_many_dispatch([receive, self.channel_receive], self.dispatch)
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/channels/utils.py", line 48, in await_many_dispatch
    await dispatch(result)
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/asgiref/sync.py", line 81, in inner
    return await async_func(*args, **kwargs)
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/asgiref/sync.py", line 65, in __call__
    return await asyncio.wait_for(future, timeout=None)
  File "/usr/lib/python3.5/asyncio/tasks.py", line 373, in wait_for
    return (yield from fut)
  File "/usr/lib/python3.5/asyncio/futures.py", line 361, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib/python3.5/asyncio/tasks.py", line 296, in _wakeup
    future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/concurrent/futures/thread.py", line 55, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/asgiref/sync.py", line 74, in thread_handler
    raise e
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/asgiref/sync.py", line 72, in thread_handler
    self.func(*args, **kwargs)
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/channels/consumer.py", line 93, in dispatch
    handler(message)
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/channels/generic/websocket.py", line 40, in websocket_receive
    self.receive(text_data=message["text"])
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/channels/generic/websocket.py", line 104, in receive
    self.receive_json(self.decode_json(text_data), **kwargs)
  File "./MYAPP/API/consumers.py", line 13, in receive_json
    parse_api_request(self.channel_name, content)
  File "./MYAPP/API/api_request.py", line 16, in parse_api_request
    AsyncToSync(channel_layer.send)(channel_name, {
  File "/home/chris/Env/myapp/lib/python3.5/site-packages/asgiref/sync.py", line 17, in __init__
    self.main_event_loop = asyncio.get_event_loop()
  File "/usr/lib/python3.5/asyncio/events.py", line 632, in get_event_loop
    return get_event_loop_policy().get_event_loop()
  File "/usr/lib/python3.5/asyncio/events.py", line 578, in get_event_loop
    % threading.current_thread().name)
  There is no current event loop in thread 'Thread-3'.

如果我不使用 AsyncToSync我明白了(根据文档我不应该这样做,只是为了检查)

2018-02-02 18:34:27,965 WARNING  ./MYAPP/API/api_request.py:18: builtins.RuntimeWarning: coroutine 'RedisChannelLayer.send' was never awaited

我不明白,因为我完全按照指南进行了操作,我也尝试从 celery 任务(一个单独的线程)中回复,但没有得到相同的错误,但什么也没发生,celery 日志只是说任务已完成,但我没有收到回复。

此外,尝试直接通过

发送响应
AsyncToSync(channel_layer.send)(channel_name, {
            "type": "websocket.send",
            "text": "Hello there!",
        })

从线程内和线程外得到相同的非结果....

有没有人能够通过 Consumers 之外的 channel_layers 进行发送?对象。

仅供引用我的 settings.py

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'channels',
    'myapp',
]
ASGI_APPLICATION = "myapp.routing.application"

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("localhost", 6379)],
        },
    },
}

最佳答案

经过 reply来自安德烈斯·戈德温:

我发现这是 asgiref < 2.1.3 中的错误,通过升级 SyncToAsync/AsyncToSync 的返回值已修复!

所以我的工作实现,对于任何感兴趣的人:

consumers.py

from channels.consumer import AsyncConsumer

class My_Consumer(AsyncConsumer):

async def websocket_connect(self, event):
    print("Connected")
    print(event)
    print(self.channel_name)
    await self.send({
        "type": "websocket.accept",
    })

async def websocket_receive(self, event):
    print("Received")
    print(event)
    parse_api_request(self.channel_name, json.loads(event['text']))

async def celery_message(self, event):
    print("Service Received")
    print(event)
    await self.send({
        "type": "websocket.send",
        "text": event["text"],
    })


task.py 

from channels.layers import get_channel_layer
from asgiref.sync import AsyncToSync


def async_send(channel_name, text):
    channel_layer = get_channel_layer()
    AsyncToSync(channel_layer.send)(
            channel_name,
            {"type": "celery.message",
             "text": json.dumps(text)
             })


def getAFTree(channel_name, message):
    getAFTreeTask.delay(channel_name, message)



@task
def getAFTreeTask(channel_name, message):
    tree = Request().cache_af_tree()
    async_send(channel_name, {
                "channel": "AF_INIT",
                "payload": tree
             })

关于python - Django Channels 2.0 channel_layers 不通信,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48590263/

相关文章:

python - 在函数中使用IPython.display.audio在jupyter笔记本中播放音频无法正常工作

django - 如何像在 Django Admin 中一样在表单中创建外键链接字段?

Java - Play2 WebSockets 实现需要 Actors 吗?

google-chrome-extension - 从 chrome 扩展访问 Websocket 流量

python - NumPy 数组 : assignment of value fails

python - 快速访问字典中的部分数据

python - Django:如何在模板的 if 语句中使用变量?

Django 模型继承,过滤模型

windows - Windows 8 应用商店应用程序可以通过 UDP/TCP 套接字与 Windows 7 桌面应用程序通信吗?

python - 用 Python 计算 Keras 神经网络的准确性