python - 如何在 Django channel 中为消费者编写单元测试?

标签 python django django-channels channels django-unittest

我是 django-channels 的初学者。我正在使用 channels 2.3.1。我有以下代码。

主路由

websocket_urlpatterns = [
    url(r'^ws/events/(?P<room_type>[^/]+)/(?P<room_id>[^/]+)/$', consumers.Consumer, name='core-consumer'),
]

应用路由

application = ProtocolTypeRouter({
    'websocket': URLRouter(core.routing.websocket_urlpatterns),
})

消费者

class Consumer(AsyncWebsocketConsumer):
    async def connect(self):
        room_type = self.scope['url_route']['kwargs']['room_type']
        room_id = self.scope['url_route']['kwargs']['room_id']
        self.room_group_name = f'{room_type}_{room_id}'

        await self.channel_layer.group_add(self.room_group_name, self.channel_name)
        await self.accept()

    async def receive(self, text_data = None, bytes_data = None):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        await self.channel_layer.group_send(self.room_group_name,
            {'type': 'send_data', 'message': message}
        )

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(self.room_group_name, self.channel_name)

    async def send_data(self, event):
        message = event['message']
        await self.send(text_data=json.dumps({
            'data': message
        }))

def send_event(channel='', message=''):
    channel_layer = get_channel_layer()
    asyncio.run(channel_layer.group_send(channel,
        {
            'type': 'send_data',
            'message': message
        }
    ))

信号

    def activate_notification(self):
        send_event(f'user_{self.id}',
                   {'message': f"Welcome back to one deeds!"})

如何使用 unittest 测试 consumer 中的每个函数?

我读了这个documentation但什么都不懂(

我想用命令 ./manage.py test 运行测试

我尝试使用 WebsocketCommunicator 进行测试

最佳答案

我会提出一些建议。

1) 使用 channels.layers.InMemoryChannelLayer 层。

2) 使用pytest-asyncio 这样你的测试代码就可以是async 方法

@pytest.mark.django_db(transaction=True)
@pytest.mark.asyncio
async def test_get():
   ....

要运行这些测试,您将使用 pytest

3) 使用 WebsocketCommunicator 检查这些 examples .

关于python - 如何在 Django channel 中为消费者编写单元测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60127144/

相关文章:

python - python flex环境导入报错google.appengine.api

Python 子字符串 - 将第 n 个字符拆分到某个字符串的左侧

django - python-social-auth,无需断开连接即可注销

django - 将请求从一个 uwsgi 分派(dispatch)到另一个运行 Django Channels 的 uwsgi 实例

python - 在 Windows 中使用 Protocol Buffer

python - 在 Python 3 中提供目录

Django:在通用 DetailView 中实现表单

python - Django.db 导入错误

python - VueJS + Django channel

Django channel WebSocket 握手期间出错 : Unexpected response code: 500