django - Django Channels 消费者测试 : "No reply_channel sent to consumer"

标签 django django-testing django-channels

我正在使用 Django Channels@channel_session_user 装饰器(用于访问 Django 的 session 数据)。

@channel_session_user_from_http
def ws_connect(message):
    # creates group names like "group-1"
    group_kw = get_group_id_for_user(message.user)
    Group(group_kw).add(message.reply_channel)

@channel_session_user
def ws_receive(message):
    group_kw = get_group_id_for_user(message.user)
    payload = json.loads(message.content['text'])
    Channel(payload['action']).send(message.content)

@channel_session_user
def ws_disconnect(message):
    group_kw = get_group_id_for_user(message.user)
    Group(group_kw).discard(message.reply_channel)

可以正常运行,但是在测试的时候出现了问题。

下面的测试应该在 websocket.receive channel 上放置一条消息,然后 ws_receive 应该接收消息并将其放置在消息的 中定义的 channel 上 Action 值。最后,我测试它是否真的放在那个 channel 上。

def test_send_chat_message_is_used_by_consumer(self):
    # Make sure a user is authenticated
    self.assertTrue(auth.get_user(self.client).is_authenticated())

    payload = {'action': 'chat.receive',
               'msg': 'Test message.',
               'receiver': self.user2.id}
    message = {'text': json.dumps(payload)}

    # Send a chat message
    Channel('websocket.receive').send(message)
    # Receive it and place it on the right channel
    ws_receive(self.get_next_message('websocket.receive', require=True))
    # Fetch it from the channel
    result = self.get_next_message(payload['action'], require=True)
    # That should be the message sent
    self.assertEqual(result, message)

相反,我收到以下错误,指向调用 ws_receive() 的行。

ValueError: No reply_channel sent to consumer; @channel_session can only be used on messages containing it.

错误is raised here channel 源中。

打印 reply_channel 返回 None 而不是包含正确的回复 channel 名称。

tmp = self.get_next_message('websocket.receive', require=True)
print(tmp.reply_channel)  # prints: None

我忽略了一些明显的东西?

最佳答案

我认为你不能简单地调用消费者,因为它有装饰器@channel_session_user。您应该尝试使用 Client channels.tests 提供的。

from channels.tests import ChannelTestCase, Client

然后在测试函数中使用类似这样的东西

client = Client()
client.send('websocket.receive', content=message)

关于django - Django Channels 消费者测试 : "No reply_channel sent to consumer",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38952841/

相关文章:

Python - 错误 - 无法将数据写入流 : <open file '<stdout>' , 模式 'w' 在 0x104c8f150>

python - Django-filter 和 Django-tables2 使用外部属性

django - 如何使用 Python 在 Django 中只有正十进制数?

django - 测试 Jinja2 支持的 Django View 时如何访问 response.context

django - 我需要开始工作吗?

python - 缩短 HTML 内容 python/django

python - 测试中的模型 - Django 1.7 问题

python - 尽管两个查询集在 shell 中打印出相同的内容,但 Django 的 assertQuerysetEqual() 方法还是失败了?

django - 调试 django-channels

django - 达芙妮 channel 2和Redis层的CPU使用率高