如何使用flask socketio从redis pubsub侦听器发出

问题描述

我正在尝试扩展由客户服务代理和客户之间的交互组成的聊天应用程序。

如果我们有多个后端实例在运行,代理或客户套接字可以连接(join_room())到任何可用的实例。

为了多个实例之间的通信,我们添加了redis pubsub。每当我们想向代理或客户发送消息时,我们都会将消息和 room_id 发布到 redis 通道,它会告诉所有实例发送消息。

下面是我们写的代码

app.py

from flask import Flask,jsonify,request
from flask_cors import CORS
from flask_socketio import SocketIO,emit,join_room,send
from multiprocessing import Process
import redis

redis_conn = redis.Redis(charset="utf-8",decode_responses=True)
pubsub = redis_conn.pubsub()


app = Flask(__name__)
app.config["SECRET_KEY"] = Config.SECRET_KEY
CORS(app)
# Initializing Socket

socketio = SocketIO(app,cors_allowed_origins="*",path="/livechat_backend_sock/socket.io/")



@socketio.on("agent_connect_request")
def agent_connect_request(data):
    print("\n*** agent_connect_request data: ",data)
    msg = {"input_type": "text","response_type": "live_connecting","response": "Searching for Agent","data": data}
    if "user_section" in data.keys():
        emit("user_reply",msg)

    # result = some db operations

    # create a room on agent_email_id as it will always be unique
    room_id = result["agent_email_id"]
    msg["agent_details"] = result["agent_data"]

    # some more db operations

    # tell a process to emit on room_id for all instances
    p = Process(target=subs,args=(room_id,))
    p.start()
    join_room(room_id)

    emit("user_reply",msg) # this happens properly
    subscribe_agent(room_id,user_id)

    # publish a message
    pub(data,user_id,room_id,"user_connections")
    # emit("user_connections",data,room=room_id) => this is what is expected after publish

def subscribe_agent(agent_email_id,channel):
    p = Process(target=subs,args=(channel,))
    p.start()


def subs(room):
    pubsub.subscribe(room)
    for message in pubsub.listen():
        if message.get("type") == "message":
            data = json.loads(message.get("data"))
            msg = data.get("message")
            user_id = data.get("from")
            room_id = data.get("to")
            sock = data.get("sock")
            res = subs_emitter(msg,sock)


def unsbs(room):
    pubsub.unsubscribe(room)


def pub(msg,room,sock):
    data = {
        "message": msg,"from": user_id,"to": room,"sock": sock
    }
    redis_conn.publish(user_id,json.dumps(data))


def subs_emitter(msg,sock):
    if sock == "user_connections":
        print('on user_connections')
        # code reaches here but emit is not happening
        socketio.emit("user_connections",msg,room=room_id,broadcast=True) 
    if sock == "livechat_agent":
        socketio.emit("livechat_agent",room=room_id)
    if sock == "livechat_user":
        socketio.emit("livechat_user",room=room_id)
    if sock == "agent_group":
        socketio.emit("livechat_user",room=room_id)
    if sock == "liveuser_disconnect_status":
        socketio.emit("liveuser_disconnect_status",room=room_id)
    if sock == "agent_break":
        socketio.emit("agent_break",room=room_id)
    return True

当我们收到发布的消息时,代码不会向前端发送消息。我哪里出错了??

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)