问题描述
我想在鼠兔 basic_consume 中运行一个 on 消息回调异步。这是可能的吗?我们已经在为其他任务运行 asyncio 循环,这个消费者使用带有异步连接的 httpx 来调用内部服务。
这是我们当前的 Consumer 类:
class Consumer:
"""
https://www.devmashup.com/creating-a-rabbitmq-consumer-in-python/
"""
connection: AsyncioConnection
channel: Any
routing_key: str
def __init__(self,routing_key) -> None:
self.connection = self.__create_connection()
self.channel = self.connection.channel()
self.__create_exchange()
self.routing_key = routing_key
@staticmethod
def __create_connection():
credentials = PlainCredentials(
settings.mqtt_vhost_user,settings.mqtt_vhost_password
)
parameters = ConnectionParameters(
settings.mqtt_host,settings.mqtt_port,settings.mqtt_vhost,credentials
)
return AsyncioConnection(parameters)
def close_connection(self):
self.connection.close()
def __create_exchange(self):
self.channel.exchange_declare(
exchange=settings.mqtt_exchange,exchange_type=settings.mqtt_exchange_type,passive=False,durable=True,auto_delete=False,internal=False,)
def consume(self,message_received_callback):
logger.info(f"Started consumer for {self.routing_key}")
self.channel.queue_declare(
queue=self.routing_key,exclusive=False,)
self.channel.queue_bind(
queue=self.routing_key,exchange=settings.mqtt_exchange,routing_key=self.routing_key,)
async def consume_message(channel,method,properties,body):
await message_received_callback(body)
channel.basic_ack(delivery_tag=method.delivery_tag)
self.channel.basic_consume(
self.routing_key,consume_message,)
self.channel.start_consuming()
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)