FastAPI中音频流的Websockets桥接 websocket 的桥接 websocket更新Bridge websocket 同步生成器

问题描述

目标

我的目标是使用音频流。从逻辑上讲,这是我的目标:

  1. 音频流来自 WebSocket A(FastAPI 端点)
  2. 音频流被桥接到不同的 WebSocket B,它将返回一个 JSON(Rev-ai 的 WebSocket)
  3. Json 结果通过 WebSocket A 实时发回。因此,当音频流仍在传入时。

可能的解决方

为了解决这个问题,我有很多想法,但最终我一直在尝试将 WebSocket A 连接到 WebSocket B。到目前为止,我的尝试涉及一个 ConnectionManager 类,其中包含一个 Queue.queue。音频流的块被添加到这个队列中,这样我们就不会直接从 WebSocket A 消费。

ConnectionManager 还包含一个生成方法,用于从队列中生成所有值。

我的 FastAPI 实现像这样从 websocket A 消费:

@app.websocket("/ws")
async def predict_feature(websocket: WebSocket):
    await manager.connect(websocket)
    try:
        while True:
            chunk = await websocket.receive_bytes()
            manager.add_to_buffer(chunk)
    except KeyboardInterrupt:
        manager.disconnect()

在此摄取的同时,我希望有一项任务将我们的音频流桥接到 WebSocket B,并将获得的值发送到 WebSocket A。可以通过上述 generator 方法使用音频流。

由于WebSocket B如何消费消息,生成方法是必要的,如Rev-ai的examples所示:

streamclient = RevAiStreamingClient(access_token,config)
response_generator = streamclient.start(MEDIA_GENERATOR)
for response in response_generator:
    # return through websocket A this value
    print(response)

这是最大的挑战之一,因为我们需要将数据消耗到生成器中并实时获取结果。

最新尝试

我一直在用 asyncio 试试运气;根据我的理解,一种可能性是创建一个后台运行的协程。我一直没有成功,但听起来很有希望。

我曾考虑通过 FastAPI 启动事件触发此操作,但在实现并发性方面遇到问题。我尝试使用 event_loops,但它给了我一个 nested event loop 相关错误

警告

如果您的见解认为如此,FastAPI 可以是可选的,在某种程度上,WebSocket A 也是如此。在一天结束时,最终目标是通过我们自己的 API 端点接收音频流,并通过 Rev.ai 运行它WebSocket,做一些额外的处理,并将结果发回。

解决方法

websocket 的桥接 websocket

下面是一个简单的 webscoket 代理示例,其中 websocket A 和 websocket B 都是 FastAPI 应用程序中的端点,但是 websocket B 可以位于其他地方,只需更改它的地址 ws_b_uri。对于 websocket 客户端,使用 websockets 库。

为了进行数据转发,A端点的代码启动了两个任务forwardreverse,并通过asyncio.gather()等待它们完成。两个方向的数据传输以并行方式进行。

import asyncio

from fastapi import FastAPI
from fastapi import WebSocket
import websockets
app = FastAPI()

ws_b_uri = "ws://localhost:8001/ws_b"


async def forward(ws_a: WebSocket,ws_b: websockets.WebSocketClientProtocol):
    while True:
        data = await ws_a.receive_bytes()
        print("websocket A received:",data)
        await ws_b.send(data)


async def reverse(ws_a: WebSocket,ws_b: websockets.WebSocketClientProtocol):
    while True:
        data = await ws_b.recv()
        await ws_a.send_text(data)
        print("websocket A sent:",data)


@app.websocket("/ws_a")
async def websocket_a(ws_a: WebSocket):
    await ws_a.accept()
    async with websockets.connect(ws_b_uri) as ws_b_client:
        fwd_task = asyncio.create_task(forward(ws_a,ws_b_client))
        rev_task = asyncio.create_task(reverse(ws_a,ws_b_client))
        await asyncio.gather(fwd_task,rev_task)


@app.websocket("/ws_b")
async def websocket_b(ws_b_server: WebSocket):
    await ws_b_server.accept()
    while True:
        data = await ws_b_server.receive_bytes()
        print("websocket B server recieved: ",data)
        await ws_b_server.send_text('{"response": "value from B server"}')

更新(Bridge websocket 同步生成器)

考虑到问题的最后更新,问题是 WebSocket B 隐藏在同步生成器后面(实际上有两个,一个用于输入,另一个用于输出)实际上,任务变成了如何在 WebSocket 和同步生成器之间架起一座桥梁。由于我从未使用过 rev-ai 库,所以我为 stream_client_start 创建了一个存根函数 streamclient.start,它接受​​一个生成器(原版中的 MEDIA_GENERATOR)并返回一个生成器({{ 1}} 原文)。

在这种情况下,我通过 response_generator 在单独的线程中开始处理生成器,为了不重新发明轮子,我使用 janus 库中的队列进行通信,它允许您通过队列绑定同步和异步代码。因此,也有两个队列,一个用于 run_in_executor,另一个用于 A -> B

B -> A

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...