在测试我的 Websocket 消费者时,未创建模型对象Django 频道

问题描述

我是 Django Channels 的新手,我正在尝试构建一个简单的聊天应用程序。但是当我尝试测试我的异步 Websocket 消费者时,我遇到了以下异常:chat.models.RoomModel.DoesNotExist:RoomModel 匹配查询不存在。

似乎没有创建测试室。

test.py 文件如下:

class ChatTest(TestCase):

    @sync_to_async
    def set_data(self):
        room = RoomModel.objects.create_room('Room1','room_password_123')
        room_slug = room.slug
        user = User.objects.create_user(username='User1',password='user_password_123')
        print(RoomModel.objects.all()) # querySet here contains the created room
        return room_slug,user

    async def test_my_consumer(self):
        room_slug,user = await self.set_data()
        application = URLRouter([
            re_path(r'ws/chat/(?P<room_name>\w+)/$',ChatConsumer.as_asgi()),])
        communicator = WebsocketCommunicator(application,f'/ws/chat/{room_slug}/')
        communicator.scope['user'] = user
        connected,subprotocol = await communicator.connect()
        self.assertTrue(connected)
        await communicator.send_json_to({'message': 'hello'})
        response = await communicator.receive_json_from()
        self.assertEqual('hello',response)
        await communicator.disconnect()

我的consumer.py文件如下:

class ChatConsumer(AsyncWebsocketConsumer):

    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'
        await self.channel_layer.group_add(self.room_group_name,self.channel_name)
        await self.accept()

    async def disconnect(self,close_code):
        await self.channel_layer.group_discard(self.room_group_name,self.channel_name)

    async def receive(self,text_data):
        self.message = json.loads(text_data).get('message')
        data = {'type': 'chat_message'}
        data.update(await self.create_message())
        await self.channel_layer.group_send(self.room_group_name,data)

    async def chat_message(self,event):
        await self.send(text_data=json.dumps({
            'message': event['message'],'user': event['user'],'timestamp': event['timestamp']
        }))

    @database_sync_to_async
    def create_message(self):
        print(RoomModel.objects.all()) # qyerySet turns out to be empty here
        room = RoomModel.objects.get(slug=self.room_name)
        msg = room.messagemodel_set.create(text=self.message,user_name=self.scope['user'])
        return {
            'message': msg.text,'user': self.scope['user'].username,'timestamp': msg.timestamp.strftime("%d/%m/%Y,%H:%M")
        }

如果能得到任何帮助,我将不胜感激。

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)