如何正确处理与多个生产者的 AMQP 连接和 api

问题描述

我正在开发一个 api,它使用 RabbitMQ 主题与来自事件架构的其他服务进行通信。 我的 API 的几个路由将发布事件,我希望在我的 API 中始终有一个实时连接。这样,对于每个新请求,我只创建一个新通道,并且只保留一个连接(我在阅读了 amqp 0-9-2 连接的成本很高后决定这样做)。

现在我有这样的事情:

class Singleton:
    def __init__(self,target):
        self.target = target

    def __call__(self,*args,**kwargs) -> Any:
        try:
            return self._instance
        except AttributeError:
            self._instance = self.target(*args,**kwargs)
            return self._instance


@Singleton
class RabbitConnection(pika.BlockingConnection):
    def __init__(self):
        ssl_options = None

        if settings.RABBIT_SSL:
            context = ssl.create_default_context()
            ssl_options = pika.SSLOptions(context)

        credentials = pika.credentials.PlainCredentials(
            username=settings.RABBIT_USER,password=str(settings.RABBIT_PASSWORD),)
        parameters = pika.ConnectionParameters(
            host=settings.RABBIT_SERVER,port=settings.RABBIT_PORT,virtual_host="/",credentials=credentials,ssl_options=ssl_options,heartbeat=0
        )
        super().__init__(parameters=parameters)


class RabbitChannelProvider:
    _channel = None

    def __init__(self):
        self._connection = RabbitConnection()

    def __enter__(self) -> BlockingChannel:
        if not self._channel:
            self._channel = self._connection.channel()
            self._channel.exchange_declare(
                exchange=settings.RABBIT_EXCHANGE,exchange_type=ExchangeType.topic,passive=False,durable=True,auto_delete=False,)

        return self._channel

    def __exit__(self,exc_type,exc_value,tb) -> None:
        self._channel.close()
        self._channel = None


class MessagePublisher(SingletonCreateMixin,PublisherMessageBackend):
    id = "publisher_rabbitmq"

    def publish(self,routing_key: str,body: Any) -> None:
        try:
            message = build_message(body=body)
            logger.info(
                event="message_broker",event_type=LogEventType.SUCCESS,location=LogLocation.BACKEND,body=body,message="Sending message",)
            with RabbitChannelProvider() as channel:
                channel.basic_publish(
                    exchange=settings.RABBIT_EXCHANGE,routing_key=routing_key,body=message,properties=pika.BasicProperties(
                        content_type="application/json"
                    ),)
        except Exception as err:
            logger.error(
                event="message_broker",event_type=LogEventType.ERROR,error=err,)
            raise MessagebrokerException(message=err)

这是在api进程中只维护一个连接的正确方法吗?我这样做对吗?

解决方法

形成官方pika documentation

Pika 线程安全吗?

Pika 在代码中没有任何线程的概念。如果您想将 Pika 与线程一起使用,请确保每个线程都有一个 Pika 连接,并在该线程中创建。跨线程共享一个 Pika 连接是不安全的,只有一个例外:您可以从另一个线程调用连接方法 add_callback_threadsafe 以在活动的 pika 连接内安排回调。

因此您的解决方案可以使用单个线程

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...