如何在 pika 中运行异步 basic_consume

问题描述

我想在鼠兔 basic_consume 中运行一个 on 消息回调异步。这是可能的吗?我们已经在为其他任务运行 asyncio 循环,这个消费者使用带有异步连接的 httpx 来调用内部服务。

这是我们当前的 Consumer 类:

class Consumer:
    """
    https://www.devmashup.com/creating-a-rabbitmq-consumer-in-python/
    """

    connection: AsyncioConnection
    channel: Any
    routing_key: str

    def __init__(self,routing_key) -> None:
        self.connection = self.__create_connection()
        self.channel = self.connection.channel()
        self.__create_exchange()
        self.routing_key = routing_key

    @staticmethod
    def __create_connection():
        credentials = PlainCredentials(
            settings.mqtt_vhost_user,settings.mqtt_vhost_password
        )

        parameters = ConnectionParameters(
            settings.mqtt_host,settings.mqtt_port,settings.mqtt_vhost,credentials
        )

        return AsyncioConnection(parameters)

    def close_connection(self):
        self.connection.close()

    def __create_exchange(self):
        self.channel.exchange_declare(
            exchange=settings.mqtt_exchange,exchange_type=settings.mqtt_exchange_type,passive=False,durable=True,auto_delete=False,internal=False,)

    def consume(self,message_received_callback):
        logger.info(f"Started consumer for {self.routing_key}")

        self.channel.queue_declare(
            queue=self.routing_key,exclusive=False,)

        self.channel.queue_bind(
            queue=self.routing_key,exchange=settings.mqtt_exchange,routing_key=self.routing_key,)

        async def consume_message(channel,method,properties,body):
            await message_received_callback(body)
            channel.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_consume(
            self.routing_key,consume_message,)

        self.channel.start_consuming()

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...