问题描述
每个正在运行的作业都会将其日志打印到 RabbitMQ 通道。 频道的名称是 JobUUID。
我想允许客户端打开一个 websocket,它会在特定作业的所有日志写入队列时发送这些日志(为了呈现类似 CLI 的窗口,类似于您在每个 CI 中看到的) /CD 工具 - 如 github 操作、Gitlab CI、Travis 等)。
我的问题是我想在客户端尝试获取特定作业的日志时进行检查,如果此作业没有开放通道,则向他发送未找到的错误
我发现了这个RabbitMQ: Check queue exists
但这当然不适合我的用例,因为我不想声明队列,而只是检查是否有任何以该 jobUUID 作为名称的队列。
到目前为止我的代码是这样的
@app.websocket("/{job_id}/logs")
async def get_job_logs(websocket: WebSocket,job_id: str):
await websocket.accept()
async def retrieve_message(message: IncomingMessage):
m = message.body.decode('utf8')
if m == "END_OF_PIPELINE":
print("WILL BREAK")
await websocket.send_text(f"END OF {job_id}")
if len(m) > 1 :
await websocket.send_text(json.dumps(m))
consumer = AsyncConsumer()
await consumer.connect(job_id,retrieve_message)
while True:
await websocket.receive_text()
如何将接收到的 JobID 作为参数并检查是否有任何具有此名称 openend 的队列。如果没有这样的队列,我又不想打开一个新的队列,因为 WS 路由的作用不是打开,而只是消费消息。
顺便说一句,AsyncConsumer
是我自己的一个基于 pika 的类......它有效并且经过了很好的测试......
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)