使用 pika 库检查 RabbitMQ 中是否存在队列

问题描述

我有一个工人从 Redis 队列中获取作业。

每个正在运行的作业都会将其日志打印到 RabbitMQ 通道。 频道的名称是 JobUUID。

我想允许客户端打开一个 websocket,它会在特定作业的所有日志写入队列时发送这些日志(为了呈现类似 CLI 的窗口,类似于您在每个 CI 中看到的) /CD 工具 - 如 github 操作、Gitlab CI、Travis 等)。

我的问题是我想在客户端尝试获取特定作业的日志时进行检查,如果此作业没有开放通道,则向他发送未找到的错误

我发现了这个RabbitMQ: Check queue exists

但这当然不适合我的用例,因为我不想声明队列,而只是检查是否有任何以该 jobUUID 作为名称的队列。

到目前为止我的代码是这样的


@app.websocket("/{job_id}/logs")
async def get_job_logs(websocket: WebSocket,job_id: str):
    await websocket.accept()
    async def retrieve_message(message: IncomingMessage):
        m = message.body.decode('utf8')
        if m == "END_OF_PIPELINE":
            print("WILL BREAK")
            await websocket.send_text(f"END OF {job_id}")
        if len(m) > 1 :
            await websocket.send_text(json.dumps(m))
    consumer = AsyncConsumer()
    await consumer.connect(job_id,retrieve_message)
    while True:
        await websocket.receive_text()

如何将接收到的 JobID 作为参数并检查是否有任何具有此名称 openend 的队列。如果没有这样的队列,我又不想打开一个新的队列,因为 WS 路由的作用不是打开,而只是消费消息。

顺便说一句,AsyncConsumer 是我自己的一个基于 pika 的类......它有效并且经过了很好的测试......

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)