Problem description
We built a frontend with React and a backend with Django REST framework and Channels. We use Heroku Redis as our Redis provider. Our users connect to Channels through a ReconnectingWebSocket.
We are on Python 3.6 and Channels 2.4.
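The problem is that our API calls try to pass information to the sockets, and the messages do not always reach the consumer. I logged each step of the call with prints, printed the channel_name it was about to send to, and confirmed that it matched what was returned to the user on connect. Even so, the print in the consumer is never called and the message is never sent to the user.
If I scale the number of dynos up to roughly 1:1 with the users connected to sockets, it seems to solve the problem (or at least make it more reliable). My understanding is that one dyno should be able to handle many socket connections. Is there a reason my consumers aren't receiving the signals? Is there a reason that increasing the number of dynos fixes it?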
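On connect, I have the user join a group named "u_{their id}" so that signals can be sent to multiple computers logged in as the same user. I have tried sending messages both directly via their channel_name and via that group, but when a message doesn't get through, it doesn't get through either way. The prints verify that the channel_names are correct, and the consumer still doesn't receive the message. No errors seem to occur. It may not work, then I refresh the recipient and it works, then I refresh the recipient again and it is back to not working.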
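The socket connection is definitely alive: I wrote a simple function on the frontend that pings the socket, and it responds when I call it (even while the consumer isn't getting signals from API calls).
I've also noticed that if I restart my dynos, once they load and the sockets reconnect, API calls get through to the first user for a short while and then stop getting through. Also, if I leave the sockets unused for a while and then refresh, they seem to briefly work again.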
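One way to narrow down where messages vanish is to probe the channel layer directly, in the spirit of the send/receive check from the Channels tutorial. The sketch below is only an illustration under stated assumptions: `round_trip` and `FakeLayer` are hypothetical names, and `FakeLayer` is an in-memory stand-in so the example runs without Redis. In a real project you would presumably pass the object returned by `channels.layers.get_channel_layer()` instead. Note that a successful round trip inside one process says nothing about delivery between dynos.

```python
import asyncio


class FakeLayer:
    """Hypothetical in-memory stand-in with the same send/receive coroutines."""

    def __init__(self):
        self._queues = {}

    def _queue(self, channel):
        # Create the queue lazily, inside the running event loop.
        if channel not in self._queues:
            self._queues[channel] = asyncio.Queue()
        return self._queues[channel]

    async def send(self, channel, message):
        await self._queue(channel).put(message)

    async def receive(self, channel):
        return await self._queue(channel).get()


async def round_trip(layer, channel, timeout=2.0):
    """Send a probe to `channel` and report whether it comes back in time."""
    probe = {"type": "probe", "text": "ping"}
    await layer.send(channel, probe)
    try:
        received = await asyncio.wait_for(layer.receive(channel), timeout)
    except asyncio.TimeoutError:
        return False
    return received == probe


if __name__ == "__main__":
    # asyncio.run needs Python 3.7+; on 3.6 use loop.run_until_complete.
    print(asyncio.run(round_trip(FakeLayer(), "probe_channel")))
```

If the probe times out against the real layer, the backend itself is a likely suspect; if it succeeds, the loss is more likely between the sending process and the process holding the socket.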
web: daphne doctalk.asgi:application --port $PORT --bind 0.0.0.0
consumers.py
import json
from asgiref.sync import async_to_sync
from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncWebsocketConsumer
from messages.models import Thread
from profile.models import Onlinestatus, DailyOnlineUserActivity
from rest_framework.authtoken.models import Token
from django.contrib.auth.models import AnonymousUser
from .exceptions import ClientError
import datetime
from django.utils import timezone


class HeaderConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        # Accept first, then ask the client to send its auth token.
        await self.accept()
        await self.send("request_for_token")

    async def continue_connect(self):
        print(self.channel_name)
        print(self.channel_layer)
        await self.send(json.dumps({'your_channel_name': self.channel_name}))
        await self.get_user_from_token(self.scope['token'])
        await self.channel_layer.group_send(
            "online_users",
            {
                "type": "new_user_online",
                "user": self.user,
                "channel_layer": str(self.channel_layer),
                "channel_name": self.channel_name,
            }
        )
        await self.channel_layer.group_add(
            "online_users",
            self.channel_name,
        )
        print("adding to personal group u_%d" % self.user['id'])
        await self.channel_layer.group_add(
            "u_%d" % self.user['id'],
            self.channel_name,
        )
        self.message_threads = set()
        self.message_threads = await self.get_message_ids()
        for thread in self.message_threads:
            await self.monitor_thread(thread)
        self.doa = await self.check_for_or_establish_dailyonlineactivity()
        self.online_status = await self.establish_onlinestatus()
        await self.add_to_online_status_list()
        self.user_id_list = await self.get_online_user_list()
        await self.send_online_user_list()

    async def disconnect(self, code):
        # Leave all the rooms we are still in
        if hasattr(self, 'user'):
            await self.remove_from_dailyonlineactivity()
            try:
                await self.channel_layer.group_discard(
                    "u_%d" % self.user['id'],
                    self.channel_name,
                )
            except Exception as e:
                print("issue with self channel")
                print(e)
            try:
                await self.channel_layer.group_send(
                    "online_users",
                    {
                        "type": "user_went_offline",
                        "message": self.user['id'],
                    }
                )
            except Exception as e:
                print("issue with online_users")
                print(e)
            await self.channel_layer.group_discard(
                "online_users",
                self.channel_name,
            )
            try:
                for thread_id in list(self.message_threads):
                    print("leaving " + str(thread_id))
                    try:
                        self.message_threads.discard(thread_id)
                        await self.channel_layer.group_discard(
                            "m_%d" % thread_id,
                            self.channel_name,
                        )
                    except ClientError:
                        pass
            except Exception as e:
                print("issue with threads")
                print(e)

    async def receive(self, text_data):
        print(text_data)
        text_data_json = json.loads(text_data)
        if 'token' in text_data_json:
            self.scope['token'] = text_data_json['token']
            await self.continue_connect()
        # self.send(text_data=json.dumps({
        #     'message': message
        # }))

    async def new_message(self, event):
        # Send a message down to the client
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "thread": event['thread'],
                "message": event["message"],
            },
        ))

    async def user_went_offline(self, event):
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))

    async def send_call_ring(self, event):
        print("SENDING CALL RING")
        print(event["message"])
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))

    async def rejoin_call(self, event):
        ...  # body truncated in the source

    async def popup_notification(self, event):
        print("sending popup_notification")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))

    async def new_call_participant(self, event):
        print("new_call_participant received")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))

    async def new_participants_invited(self, event):
        print("new_participants_invited received")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))

    async def share_document_via_videocall(self, event):
        print("share_document received")
        print(event)
        print(self.channel_name)
        print(self.user['id'])
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))

    async def event_video_share_link(self, event):
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))

    async def event_video_hand_up(self, event):
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))

    async def event_video_address_hand_up(self, event):
        ...  # body truncated in the source

    async def you_are_dominant_speaker(self, event):
        # Send a message down to the client
        print("SENDING DOMINANT SPEAKER")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))

    async def you_are_no_longer_dominant_speaker(self, event):
        print("SENDING NO LONGER DOMINANT SPEAKER")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))

    async def event_video_screenshare(self, event):
        ...  # body truncated in the source

    async def event_video_reaction(self, event):
        ...  # body truncated in the source

    async def video_call_thread(self, event):
        print("sending video call thread")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))

    async def video_call_chat_message(self, event):
        print("sending video call chat message")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))

    async def event_chat_message(self, event):
        print("sending event chat message")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))

    async def to_next_agenda_item(self, event):
        ...  # body truncated in the source

    async def mute_all_event_participants(self, event):
        print("sending mute all participants")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))

    async def event_started(self, event):
        print("event started consumer")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))

    async def event_ended(self, event):
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))

    async def video_call_reaction(self, event):
        print("sending video call reaction")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))

    async def new_user_online(self, event):
        print("user_online received")
        print(event)
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                "message": event["user"],
                "channel_layer": event["channel_layer"],
                "channel_name": event["channel_name"],
            },
        ))

    @database_sync_to_async
    def get_message_ids(self):
        return set(Thread.objects.filter(participants__id=self.user['id'], subject="").values_list('id', flat=True))

    async def monitor_thread(self, thread_id):
        print("monitoring thread %d" % thread_id)
        print("on channel %s" % self.channel_name)
        await self.channel_layer.group_add(
            "m_%d" % thread_id,
            self.channel_name,
        )

    @database_sync_to_async
    def get_user_from_token(self, t):
        try:
            print("trying token" + t)
            token = Token.objects.get(key=t)
            self.user = token.user.get_profile.json()
        except Token.DoesNotExist:
            print("Failed")
            self.user = AnonymousUser()

    @database_sync_to_async
    def check_for_or_establish_dailyonlineactivity(self):
        doa, created = DailyOnlineUserActivity.objects.get_or_create(date=datetime.date.today())
        if created:
            print("created DOA %d" % doa.id)
        else:
            print("found existing DOA %d" % doa.id)
        return doa

    @database_sync_to_async
    def establish_onlinestatus(self):
        # Close any status rows left open by earlier connections.
        old_os = Onlinestatus.objects.filter(user_id=self.user['id'], online_to=None)
        if old_os.exists():
            for os in old_os:
                print("found unclosed OS %d" % os.id)
                os.online_to = timezone.now()
                os.save()
        new_os = Onlinestatus(
            user_id=self.user['id'], channel_name=self.channel_name
        )
        new_os.save()
        return new_os

    @database_sync_to_async
    def add_to_online_status_list(self):
        self.doa.currently_active_users.add(self.user['id'])
        self.doa.all_daily_users.add(self.user['id'])
        self.doa.online_log.add(self.online_status)
        self.doa.save()

    @database_sync_to_async
    def remove_from_dailyonlineactivity(self):
        if hasattr(self, 'doa') and self.doa is not None:
            self.doa.currently_active_users.remove(self.user['id'])
        if hasattr(self, 'online_status') and self.online_status is not None:
            self.online_status.online_to = timezone.now()
            self.online_status.save()

    @database_sync_to_async
    def get_online_user_list(self):
        user_id_list = list(self.doa.currently_active_users.all().values_list('id', flat=True))
        user_id_list.remove(self.user['id'])
        return user_id_list

    async def send_online_user_list(self):
        print("sending online_users")
        await self.send(text_data=json.dumps(
            {
                "type": "online_users",
                "message": self.user_id_list,
            },
        ))

    async def participant_ignored(self, event):
        print("ignored call")
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))

    async def participant_left(self, event):
        print("left call")
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))

    async def participant_joined(self, event):
        ...  # body truncated in the source

    async def video_screenshare(self, event):
        print("sending screenshare")
        await self.send(text_data=json.dumps(
            {
                "type": event['type'],
                # remaining keys truncated in the source
            },
        ))
Django signal triggered by adding a profile to a VideoRoom:
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.conf import settings
from django.db.models.signals import m2m_changed
from django.dispatch import receiver
from twilio.jwt.access_token import AccessToken
from twilio.jwt.access_token.grants import VideoGrant

# VideoRoom, Profile, VideoAccesstoken and Onlinestatus are the project's own models.


@receiver(m2m_changed, sender=VideoRoom.invitees.through)
def invitee_added(sender, **kwargs):
    instance = kwargs.pop('instance', None)
    action = kwargs.pop('action', None)
    pk = kwargs.pop('pk_set', None)
    if action == 'post_add':
        if len(pk) > 0:
            user = Profile.objects.get(id=list(pk)[0])
            if instance.initiator.id == user.id:
                return
            identity = "u_%d" % user.id
            # Create access token with credentials
            token = AccessToken(settings.TWILIO_ACCOUNT_SID, settings.TWILIO_API_KEY, settings.TWILIO_API_SECRET, identity=identity, ttl=86399)
            # Create a Video grant and add to token
            video_grant = VideoGrant(room=instance.room_name)
            token.add_grant(video_grant)
            invitee_access_token = VideoAccesstoken(user=user, token=token.to_jwt())
            invitee_access_token.save()
            instance.invitee_access_tokens.add(invitee_access_token)
            channel_layer = get_channel_layer()
            print(channel_layer)
            profiles = {"u_%d" % instance.initiator.id: instance.initiator.json()}
            for u in instance.current_participants.all():
                profiles["u_%d" % u.id] = u.json()
            print("instance.type")
            print(instance.type)
            if instance.type != 'event':
                print("sending to existing users")
                # Tell everyone already in the call about the new participant.
                for key, value in profiles.items():
                    if value['id'] != user.id:
                        async_to_sync(channel_layer.group_send)(
                            key,
                            {'type': 'new_call_participant', 'message': {
                                'key': "u_%d" % user.id, 'value': user.json()
                            }}
                        )
                # Ring the invitee on the channel recorded when they connected.
                ons = Onlinestatus.objects.get(user=user, online_to=None)
                print("in signal, sending to %s on channel %s" % (user.full_name, ons.channel_name))
                async_to_sync(channel_layer.send)(
                    ons.channel_name,
                    {'type': 'send_call_ring', 'message': {
                        'id': instance.id,
                        'room_name': instance.room_name,
                        'identity': "u_%d" % user.id,
                        'profiles': profiles,
                        'token': invitee_access_token.token.decode(),
                        'answered': False,
                        'initiated': False,
                        'caller': instance.initiator.json()
                    }}
                )
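For context on how the 'type' values sent by the signal reach the consumer: as far as I know, Channels picks the handler by taking the message's "type", replacing dots with underscores, and calling the consumer method with that name. The sketch below mirrors that rule in plain Python so it can run on its own. `handler_name` and `SketchConsumer` are hypothetical names used for illustration only, not Channels APIs.

```python
def handler_name(message):
    """Derive a consumer method name from a message's "type" key."""
    if "type" not in message:
        raise ValueError("Incoming message has no 'type' attribute")
    name = message["type"].replace(".", "_")
    if name.startswith("_"):
        raise ValueError("Malformed type in message (leading underscore)")
    return name


class SketchConsumer:
    """Hypothetical stand-in for a consumer; records what it handled."""

    def __init__(self):
        self.handled = []

    def send_call_ring(self, event):
        self.handled.append(event["message"])

    def dispatch(self, message):
        # Look up the method named after the message type and call it.
        handler = getattr(self, handler_name(message), None)
        if handler is None:
            raise ValueError("No handler for message type %s" % message["type"])
        handler(message)


consumer = SketchConsumer()
consumer.dispatch({"type": "send_call_ring", "message": {"id": 1}})
print(consumer.handled)
```

Since 'send_call_ring' and 'new_call_participant' both match method names on HeaderConsumer, a type mismatch looks unlikely to explain the missing messages; it would also normally surface as an error rather than silence.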
Log during an unsuccessful socket signal:
2021-03-11T15:16:14.489596+00:00 app[web.1]: pk
2021-03-11T15:16:14.489655+00:00 app[web.1]: {113}
2021-03-11T15:16:14.518051+00:00 app[web.1]: pk
2021-03-11T15:16:14.518058+00:00 app[web.1]: {68}
2021-03-11T15:16:14.786357+00:00 app[web.1]: sending to existing users
2021-03-11T15:16:14.786377+00:00 app[web.1]: u_113
2021-03-11T15:16:14.911441+00:00 app[web.1]: u_68
2021-03-11T15:16:14.915900+00:00 app[web.1]: in signal,sending to John Doe on channel u_68
2021-03-11T15:16:15.228644+00:00 app[web.1]: 10.63.249.212:12999 - - [11/Mar/2021:10:16:15] "POST /api/start-video-chat/" 200 3523
2021-03-11T15:16:15.231562+00:00 heroku[router]: at=info method=POST path="/api/start-video-chat/" host=project-name.herokuapp.com request_id=7ec75a21-c6bd-452b-9517-cd500064d7ee fwd="12.34.56.78" dyno=web.1 connect=3ms service=955ms status=200 bytes=3714 protocol=http
Successful call:
2021-03-11T15:20:50.253243+00:00 app[web.4]: pk
2021-03-11T15:20:50.253248+00:00 app[web.4]: {113}
2021-03-11T15:20:50.280925+00:00 app[web.4]: pk
2021-03-11T15:20:50.280926+00:00 app[web.4]: {68}
2021-03-11T15:20:50.614504+00:00 app[web.4]: sending to existing users
2021-03-11T15:20:50.614527+00:00 app[web.4]: u_113
2021-03-11T15:20:50.713880+00:00 app[web.4]: u_68
2021-03-11T15:20:50.718141+00:00 app[web.4]: in signal,sending to John Doe on channel u_68
2021-03-11T15:20:50.799546+00:00 app[web.2]: CALLING
2021-03-11T15:20:50.801670+00:00 app[web.2]: {'type': 'send_call_ring','message': "some payload data"}
2021-03-11T15:20:50.965602+00:00 app[web.4]: 10.11.225.205:25635 - - [11/Mar/2021:10:20:50] "POST /api/start-video-chat/" 200 3533
2021-03-11T15:20:50.964378+00:00 heroku[router]: at=info method=POST path="/api/start-video-chat/" host=project-name.herokuapp.com request_id=2da9918b-b587-4db9-a3c2-9d6dfd55ef42 fwd="12.34.56.78" dyno=web.4 connect=1ms service=888ms status=200 bytes=3724 protocol=http
Solution
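The problem turned out to be with Redis. I switched from channels-redis to channels-rabbitmq and all my problems went away. I don't know whether it was my Redis provider or channels-redis, but simply changing the backend resolved everything.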
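For reference, the backend swap is a settings change. The fragment below is a minimal sketch, not the exact configuration used here: the environment variable name CLOUDAMQP_URL and the local fallback URL are assumptions, so substitute whatever your RabbitMQ provider exposes.

```python
import os

# Assumption: the broker URL is provided via CLOUDAMQP_URL; the fallback
# points at a local RabbitMQ instance for development.
RABBITMQ_URL = os.environ.get("CLOUDAMQP_URL", "amqp://guest:guest@127.0.0.1/asgi")

# Previous layer, for comparison:
#   "BACKEND": "channels_redis.core.RedisChannelLayer",
#   "CONFIG": {"hosts": [os.environ["REDIS_URL"]]},
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_rabbitmq.core.RabbitmqChannelLayer",
        "CONFIG": {
            "host": RABBITMQ_URL,
        },
    },
}
```

The consumers and the signal code do not need to change, because they only talk to the layer through get_channel_layer() and self.channel_layer.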