Messages don't reach the consumer unless the Heroku dyno count is scaled up

Problem description

We built a frontend with React and a backend with Django REST Framework and Channels. We use Heroku Redis as our Redis provider. Our users connect to the channels through ReconnectingWebSocket.

We're using Python 3.6 and Channels 2.4.
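The post does not show its settings file. For orientation, a Channels 2.x channel layer backed by Heroku Redis is usually configured along the lines below. Reading the URL from the REDIS_URL config var (which Heroku Redis sets) and the localhost fallback are assumptions, not the authors' actual settings.

```python
import os

# Sketch of the CHANNEL_LAYERS entry in settings.py (assumed, not from the post).
# Heroku Redis publishes its connection string in the REDIS_URL config var.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            # channels_redis accepts redis:// URLs here as well as (host, port) tuples
            "hosts": [os.environ.get("REDIS_URL", "redis://localhost:6379")],
        },
    },
}
```

Every dyno running daphne reads the same setting, so all processes share one Redis instance for channel and group delivery.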

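The problem is that our API calls try to pass information to the socket, but it doesn't always reach the consumer. I traced the steps of the call with prints: I printed the channel_name it was about to send to and confirmed it matched the one returned to the user on connect, but the print in the consumer is never called and the message never reaches the user.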
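The print that never fires sits in the handler Channels should call for the event. Channels picks that handler from the event's "type" key, replacing dots with underscores, so 'type': 'send_call_ring' maps to HeaderConsumer.send_call_ring, and an event with no matching method raises a ValueError rather than being dropped silently. The stdlib-only model below mirrors that lookup; it is a simplified sketch, and ToyConsumer and handler_name are hypothetical names, not Channels code. Under that model, a silent miss with no error suggests the event never reached the consumer instance, rather than being routed to the wrong method.

```python
def handler_name(event):
    """Return the consumer method name for an event, the way Channels derives it."""
    if "type" not in event:
        raise ValueError("Incoming message has no 'type' attribute")
    if event["type"].startswith("_"):
        raise ValueError("Malformed type in message (leading underscore)")
    return event["type"].replace(".", "_")


class ToyConsumer:
    """Hypothetical stand-in for HeaderConsumer that records handled events."""

    def __init__(self):
        self.handled = []

    def send_call_ring(self, event):
        # In the real consumer, this is where print("SENDING CALL RING") runs
        self.handled.append(("send_call_ring", event["message"]))

    def dispatch(self, event):
        handler = getattr(self, handler_name(event), None)
        if handler is None:
            # Unknown types fail loudly; they are not silently ignored
            raise ValueError("No handler for message type %s" % event["type"])
        handler(event)
```

For example, dispatching {"type": "send_call_ring", "message": {"id": 1}} records one ("send_call_ring", {"id": 1}) entry, while an unknown type raises.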

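If I scale the number of dynos up to roughly one per user connected to a socket, the problem seems to go away (or at least become more reliable). As I understand it, one dyno should be able to handle many socket connections. Is there a reason my consumer isn't receiving the signals? Is there a reason scaling up the dynos fixes it?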

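On connect, I add the user to a group named "u_{their id}" so that signals can reach every computer logged in as the same user. I've tried sending messages both directly to their channel_name and through that group, and when one doesn't get through, neither does. The prints confirm the channel_names are correct, yet the consumer still doesn't receive the message, and no errors seem to occur. It might not work; then I refresh the recipient and it works; then I refresh the recipient again and it's back to not working.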
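The per-user group pattern can be modeled without Django: each connection gets its own channel name, each of a user's connections joins u_<id>, and a group send fans the event out to every member. ToyChannelLayer below is a hypothetical in-memory stand-in whose method names mirror the channel-layer calls used in the post. It deliberately leaves out what a real backend adds (message expiry, capacity limits, delivery across processes), and in the post that cross-process part is handled by Redis.

```python
import asyncio
from collections import defaultdict


class ToyChannelLayer:
    """Hypothetical in-memory channel layer; NOT the Channels API.

    Every channel name owns a queue, a group is a set of channel names, and
    group_send copies the event into each member's queue.
    """

    def __init__(self):
        self.queues = defaultdict(asyncio.Queue)  # channel name -> pending events
        self.groups = defaultdict(set)            # group name -> member channel names

    async def send(self, channel, event):
        await self.queues[channel].put(event)

    async def group_add(self, group, channel):
        self.groups[group].add(channel)

    async def group_discard(self, group, channel):
        self.groups[group].discard(channel)

    async def group_send(self, group, event):
        # Give each member its own copy so no two receivers share one dict
        for channel in self.groups[group]:
            await self.send(channel, dict(event))

    async def receive(self, channel):
        return await self.queues[channel].get()


async def demo():
    layer = ToyChannelLayer()
    # User 68 is logged in on two machines, so two connections join "u_68"
    await layer.group_add("u_68", "laptop-channel")
    await layer.group_add("u_68", "phone-channel")
    await layer.group_send("u_68", {"type": "send_call_ring", "message": "ring"})
    return [await layer.receive(name) for name in ("laptop-channel", "phone-channel")]


results = asyncio.run(demo())
```

Both connections receive the same send_call_ring event, which is the behavior the "u_{id}" group is meant to provide.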

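The socket connection is definitely alive: I wrote a simple frontend function that pings the socket, and when I use it the socket responds, even while the consumer isn't getting the signal from the API call.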
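A server-side counterpart to that ping can live in receive. The sketch below factors the decision into a plain function so it can be tested without a socket; the {"type": "ping"} / {"type": "pong"} message shape is a hypothetical choice, since the post does not show its ping format. Inside HeaderConsumer.receive, a non-None result would be passed to await self.send(text_data=...). Note that such a round trip only exercises the WebSocket and the consumer's receive path; it never touches the channel layer, so it can succeed while channel_layer.send or group_send from the API fails.

```python
import json
from typing import Optional


def reply_for(text_data: str) -> Optional[str]:
    """Return the text to send back for a client frame, or None for no reply.

    Hypothetical protocol: {"type": "ping"} is answered with {"type": "pong"};
    frames carrying a "token" key are left to the existing connect logic.
    """
    try:
        data = json.loads(text_data)
    except ValueError:  # json.JSONDecodeError subclasses ValueError
        return None
    if not isinstance(data, dict):
        return None
    if data.get("type") == "ping":
        return json.dumps({"type": "pong"})
    return None
```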

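I've also noticed that if I restart my dynos, once they load and the sockets reconnect, API calls to the first user work for a short time and then stop getting through. Also, if I leave the sockets unused for a while and then refresh, they seem to start working again briefly.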

Procfile

web: daphne doctalk.asgi:application --port $PORT --bind 0.0.0.0

consumers.py

import json
from asgiref.sync import async_to_sync
from channels.db import database_sync_to_async

from channels.generic.websocket import AsyncWebsocketConsumer
from messages.models import Thread
from profile.models import Onlinestatus, DailyOnlineUserActivity
from rest_framework.authtoken.models import Token
from django.contrib.auth.models import AnonymousUser
from .exceptions import ClientError
import datetime
from django.utils import timezone

class HeaderConsumer(AsyncWebsocketConsumer):
    async def connect(self):   
        await self.accept()
        await self.send("request_for_token")


    async def continue_connect(self):
        print(self.channel_name)
        print(self.channel_layer)
        await self.send(json.dumps({'your_channel_name': self.channel_name}))

        await self.get_user_from_token(self.scope['token'])

        await self.channel_layer.group_send(
            "online_users",
            {
                "type": "new_user_online",
                "user": self.user,
                "channel_layer": str(self.channel_layer),
                "channel_name": self.channel_name,
            },
        )

        await self.channel_layer.group_add("online_users", self.channel_name)

        print("adding to personal group u_%d" % self.user['id'])
        # group_add takes the group name and this connection's channel name
        await self.channel_layer.group_add(
            "u_%d" % self.user['id'], self.channel_name)


        self.message_threads = set()

        self.message_threads = await self.get_message_ids()

        for thread in self.message_threads:
            await self.monitor_thread(thread)

        self.doa = await self.check_for_or_establish_dailyonlineactivity()
        self.online_status = await self.establish_onlinestatus()
        await self.add_to_online_status_list()

        self.user_id_list = await self.get_online_user_list()
        await self.send_online_user_list()

    async def disconnect(self, code):
        # Leave all the rooms we are still in
        if hasattr(self, 'user'):
            await self.remove_from_dailyonlineactivity()

            try:
                await self.channel_layer.group_discard(
                    "u_%d" % self.user['id'], self.channel_name)
            except Exception as e:
                print("issue with self channel")
                print(e)

            try:
                await self.channel_layer.group_send(
                    "online_users",
                    {"type": "user_went_offline", "message": self.user['id']},
                )
            except Exception as e:
                print("issue with online_users")
                print(e)

            await self.channel_layer.group_discard(
                "online_users", self.channel_name)
            try:
                for thread_id in list(self.message_threads):
                    print("leaving " + str(thread_id))
                    try:
                        self.message_threads.discard(thread_id)
                        await self.channel_layer.group_discard(
                            "m_%d" % thread_id, self.channel_name)
                    except ClientError:
                        pass
            except Exception as e:
                print("issue with threads")
                print(e)

    async def receive(self, text_data):
        print(text_data)
        text_data_json = json.loads(text_data)
        if 'token' in text_data_json:
            self.scope['token'] = text_data_json['token']
            await self.continue_connect()

        #self.send(text_data=json.dumps({
        #    'message': message
        #}))

    async def _forward_to_client(self, event):
        # Assumption: most handler payloads were cut off in the post, so this
        # forwards the event type plus its optional "message" to the client.
        # The leading underscore keeps Channels from dispatching to it directly.
        await self.send(text_data=json.dumps(
            {"type": event['type'], "message": event.get("message")}))

    async def new_message(self, event):
        # Send a message down to the client
        await self.send(text_data=json.dumps({
            "type": event['type'],
            "thread": event['thread'],
            "message": event["message"],
        }))

    async def user_went_offline(self, event):
        await self._forward_to_client(event)

    async def send_call_ring(self, event):
        print("SENDING CALL RING")
        print(event["message"])
        await self._forward_to_client(event)

    async def rejoin_call(self, event):
        await self._forward_to_client(event)

    async def popup_notification(self, event):
        print("sending popup_notification")
        print(event)
        await self._forward_to_client(event)

    async def new_call_participant(self, event):
        print("new_call_participant received")
        print(event)
        await self._forward_to_client(event)

    async def new_participants_invited(self, event):
        print("new_participants_invited received")
        print(event)
        await self._forward_to_client(event)

    async def share_document_via_videocall(self, event):
        print("share_document received")
        print(event)
        print(self.channel_name)
        print(self.user['id'])
        await self._forward_to_client(event)

    async def event_video_share_link(self, event):
        await self._forward_to_client(event)

    async def event_video_hand_up(self, event):
        await self._forward_to_client(event)

    async def event_video_address_hand_up(self, event):
        await self._forward_to_client(event)

    async def you_are_dominant_speaker(self, event):
        # Send a message down to the client
        print("SENDING DOMINANT SPEAKER")
        print(event)
        await self._forward_to_client(event)

    async def you_are_no_longer_dominant_speaker(self, event):
        print("SENDING NO LONGER DOMINANT SPEAKER")
        print(event)
        await self._forward_to_client(event)

    async def event_video_screenshare(self, event):
        await self._forward_to_client(event)

    async def event_video_reaction(self, event):
        await self._forward_to_client(event)

    async def video_call_thread(self, event):
        print("sending video call thread")
        print(event)
        await self._forward_to_client(event)

    async def video_call_chat_message(self, event):
        print("sending video call chat message")
        print(event)
        await self._forward_to_client(event)

    async def event_chat_message(self, event):
        print("sending event chat message")
        print(event)
        await self._forward_to_client(event)

    async def to_next_agenda_item(self, event):
        await self._forward_to_client(event)

    async def mute_all_event_participants(self, event):
        print("sending mute all participants")
        print(event)
        await self._forward_to_client(event)

    async def event_started(self, event):
        print("event started consumer")
        print(event)
        await self._forward_to_client(event)

    async def event_ended(self, event):
        print(event)
        await self._forward_to_client(event)

    async def video_call_reaction(self, event):
        print("sending video call reaction")
        print(event)
        await self._forward_to_client(event)

    async def new_user_online(self, event):
        print("user_online received")
        print(event)
        await self.send(text_data=json.dumps({
            "type": event['type'],
            "message": event["user"],
            "channel_layer": event["channel_layer"],
            "channel_name": event["channel_name"],
        }))

    @database_sync_to_async
    def get_message_ids(self):
        return set(Thread.objects.filter(
            participants__id=self.user['id'], subject=""
        ).values_list('id', flat=True))

    async def monitor_thread(self, thread_id):
        print("monitoring thread %d" % thread_id)
        print("on channel %s" % self.channel_name)
        await self.channel_layer.group_add(
            "m_%d" % thread_id, self.channel_name)

    @database_sync_to_async
    def get_user_from_token(self, t):
        try:
            print("trying token" + t)
            token = Token.objects.get(key=t)
            self.user = token.user.get_profile.json()
        except Token.DoesNotExist:
            print("Failed")
            self.user = AnonymousUser()

    @database_sync_to_async
    def check_for_or_establish_dailyonlineactivity(self):
        doa, created = DailyOnlineUserActivity.objects.get_or_create(date=datetime.date.today())
        if created:
            print("created DOA %d" % doa.id)
        else:
            print("found existing DOA %d" % doa.id)
        return doa

    @database_sync_to_async
    def establish_onlinestatus(self):
        # Close any statuses left open by earlier connections of this user
        old_os = Onlinestatus.objects.filter(user_id=self.user['id'], online_to=None)
        for status in old_os:
            print("found unclosed OS %d" % status.id)
            status.online_to = timezone.now()
            status.save()
        new_os = Onlinestatus(user_id=self.user['id'], channel_name=self.channel_name)
        new_os.save()
        return new_os

    @database_sync_to_async
    def add_to_online_status_list(self):
        self.doa.currently_active_users.add(self.user['id'])
        self.doa.all_daily_users.add(self.user['id'])
        self.doa.online_log.add(self.online_status)
        self.doa.save()

    @database_sync_to_async
    def remove_from_dailyonlineactivity(self):
        if hasattr(self, 'doa') and self.doa is not None:
            self.doa.currently_active_users.remove(self.user['id'])
        if hasattr(self, 'online_status') and self.online_status is not None:
            self.online_status.online_to = timezone.now()
            self.online_status.save()

    @database_sync_to_async
    def get_online_user_list(self):
        user_id_list = list(self.doa.currently_active_users.all().values_list('id', flat=True))
        user_id_list.remove(self.user['id'])
        return user_id_list

    async def send_online_user_list(self):
        print("sending online_users")
        await self.send(text_data=json.dumps(
            {"type": "online_users", "message": self.user_id_list}))

    async def participant_ignored(self, event):
        print("ignored call")
        await self._forward_to_client(event)

    async def participant_left(self, event):
        print("left call")
        await self._forward_to_client(event)

    async def participant_joined(self, event):
        await self._forward_to_client(event)

    async def video_screenshare(self, event):
        print("sending screenshare")
        await self._forward_to_client(event)

The Django signal triggered by adding a Profile to a VideoRoom's invitees:

from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.conf import settings
from django.db.models.signals import m2m_changed
from django.dispatch import receiver
from twilio.jwt.access_token import AccessToken
from twilio.jwt.access_token.grants import VideoGrant

# VideoRoom, Profile, Onlinestatus and VideoAccesstoken are the project's own
# models; their import paths are not shown in the post.


@receiver(m2m_changed, sender=VideoRoom.invitees.through)
def invitee_added(sender, **kwargs):
    instance = kwargs.pop('instance', None)
    action = kwargs.pop('action', None)
    pk = kwargs.pop('pk_set', None)

    if action == 'post_add':

        if len(pk) > 0:
            user = Profile.objects.get(id=list(pk)[0])
            if instance.initiator.id == user.id:
                return

            identity = "u_%d" % user.id

            # Create access token with credentials
            token = AccessToken(
                settings.TWILIO_ACCOUNT_SID,
                settings.TWILIO_API_KEY,
                settings.TWILIO_API_SECRET,
                identity=identity,
                ttl=86399,
            )

            # Create a Video grant and add to token
            video_grant = VideoGrant(room=instance.room_name)
            token.add_grant(video_grant)

            invitee_access_token = VideoAccesstoken(user=user, token=token.to_jwt())
            invitee_access_token.save()

            instance.invitee_access_tokens.add(invitee_access_token)

            channel_layer = get_channel_layer()
            print(channel_layer)

            profiles = {"u_%d" % instance.initiator.id: instance.initiator.json()}

            for u in instance.current_participants.all():
                profiles["u_%d" % u.id] = u.json()
            print("instance.type")
            print(instance.type)
            if instance.type != 'event':
                print("sending to existing users")
                # Tell every other participant's personal group about the invitee
                for key, value in profiles.items():
                    if value['id'] != user.id:
                        async_to_sync(channel_layer.group_send)(
                            key,
                            {
                                'type': 'new_call_participant',
                                'message': {
                                    'key': "u_%d" % user.id,
                                    'value': user.json(),
                                },
                            },
                        )

                # Ring the invitee directly on the channel_name stored at connect
                ons = Onlinestatus.objects.get(user=user, online_to=None)
                print("in signal,sending to %s on channel %s" % (user.full_name, ons.channel_name))

                async_to_sync(channel_layer.send)(
                    ons.channel_name,
                    {
                        'type': 'send_call_ring',
                        'message': {
                            'id': instance.id,
                            'room_name': instance.room_name,
                            'identity': "u_%d" % user.id,
                            'profiles': profiles,
                            'token': invitee_access_token.token.decode(),
                            'answered': False,
                            'initiated': False,
                            'caller': instance.initiator.json(),
                        },
                    },
                )

Logs during an unsuccessful socket signal:

2021-03-11T15:16:14.489596+00:00 app[web.1]: pk
2021-03-11T15:16:14.489655+00:00 app[web.1]: {113}
2021-03-11T15:16:14.518051+00:00 app[web.1]: pk
2021-03-11T15:16:14.518058+00:00 app[web.1]: {68}
2021-03-11T15:16:14.786357+00:00 app[web.1]: sending to existing users
2021-03-11T15:16:14.786377+00:00 app[web.1]: u_113
2021-03-11T15:16:14.911441+00:00 app[web.1]: u_68
2021-03-11T15:16:14.915900+00:00 app[web.1]: in signal,sending to John Doe on channel u_68
2021-03-11T15:16:15.228644+00:00 app[web.1]: 10.63.249.212:12999 - - [11/Mar/2021:10:16:15] "POST /api/start-video-chat/" 200 3523
2021-03-11T15:16:15.231562+00:00 heroku[router]: at=info method=POST path="/api/start-video-chat/" host=project-name.herokuapp.com request_id=7ec75a21-c6bd-452b-9517-cd500064d7ee fwd="12.34.56.78" dyno=web.1 connect=3ms service=955ms status=200 bytes=3714 protocol=http

Logs during a successful call:

2021-03-11T15:20:50.253243+00:00 app[web.4]: pk
2021-03-11T15:20:50.253248+00:00 app[web.4]: {113}
2021-03-11T15:20:50.280925+00:00 app[web.4]: pk
2021-03-11T15:20:50.280926+00:00 app[web.4]: {68}
2021-03-11T15:20:50.614504+00:00 app[web.4]: sending to existing users
2021-03-11T15:20:50.614527+00:00 app[web.4]: u_113
2021-03-11T15:20:50.713880+00:00 app[web.4]: u_68
2021-03-11T15:20:50.718141+00:00 app[web.4]: in signal,sending to John Doe on channel u_68
2021-03-11T15:20:50.799546+00:00 app[web.2]: CALLING
2021-03-11T15:20:50.801670+00:00 app[web.2]: {'type': 'send_call_ring','message': "some payload data"}
2021-03-11T15:20:50.965602+00:00 app[web.4]: 10.11.225.205:25635 - - [11/Mar/2021:10:20:50] "POST /api/start-video-chat/" 200 3533
2021-03-11T15:20:50.964378+00:00 heroku[router]: at=info method=POST path="/api/start-video-chat/" host=project-name.herokuapp.com request_id=2da9918b-b587-4db9-a3c2-9d6dfd55ef42 fwd="12.34.56.78" dyno=web.4 connect=1ms service=888ms status=200 bytes=3724 protocol=http

Solution

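The problem turned out to be Redis. I switched from channels-redis to channels-rabbitmq and all my problems went away. I don't know whether it was my Redis provider or channels-redis, but simply changing the backend solved everything.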
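Switching backends is a settings change; the consumer and the send/group_send calls stay the same because they only talk to the channel-layer interface. A minimal sketch of the channels-rabbitmq entry follows. The CLOUDAMQP_URL config var and the local fallback URL are assumptions, since the post does not say which RabbitMQ provider was used.

```python
import os

# Sketch of settings.py after moving to channels_rabbitmq (assumed, not from the post).
# CLOUDAMQP_URL is a hypothetical choice of config var for the broker URL.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_rabbitmq.core.RabbitmqChannelLayer",
        "CONFIG": {
            "host": os.environ.get("CLOUDAMQP_URL", "amqp://guest:guest@127.0.0.1/asgi"),
        },
    },
}
```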