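RabbitMQ exchange becomes unresponsive after some time

Problem description

I have a RabbitMQ server running in Docker and two Python clients that connect to it and send messages to each other through headers exchanges. The message rate is about 10 per second. After some time (usually after 300-500 messages have been exchanged), one of the exchanges becomes unresponsive. The channel.basic_publish call completes without raising any exception, but the receiver gets no messages. The RabbitMQ dashboard also shows no activity on that exchange.

The sample code is as follows: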

    import pika
    import threading
    import time
    import sys


    class Test:
        def __init__(
                self,p_username,p_password,p_host,p_port,p_virtualHost,p_outgoingExchange,p_incomingExchange
        ):
            self.__outgoingExch = p_outgoingExchange
            self.__incomingExch = p_incomingExchange
            self.__headers = {'topic': 'test'}
            self.__queueName = ''
            self.__channelConsumer = None
            self.__channelProducer = None
            self.__isRun = False

            l_credentials = pika.PlainCredentials(p_username,p_password)
            l_parameters = pika.ConnectionParameters(
                host=p_host,port=p_port,virtual_host=p_virtualHost,credentials=l_credentials,socket_timeout=30,connection_attempts=5,)

            self.__connection = pika.SelectConnection(
                parameters=l_parameters,on_open_callback=self.__on_connection_open,on_open_error_callback=self.__on_connection_open_error,on_close_callback=self.__on_connection_closed
            )

        def __on_connection_open(self,_conn):
            print("Connection opened")
            self.__connection.channel(on_open_callback=self.__on_consume_channel_open)
            self.__connection.channel(on_open_callback=self.__on_produce_channel_open)

        def __on_connection_open_error(self,_conn,_exception):
            print("Failed to open connection")

        def __on_connection_closed(self,p_exception):
            print("Connection closed: {}".format(p_exception))

        def __on_consume_channel_open(self,p_ch):
            print("Consumer channel opened")
            self.__channelConsumer = p_ch
            self.__channelConsumer.exchange_declare(
                exchange=self.__incomingExch,exchange_type="headers",callback=self.__on_consume_exchange_declared
            )

        def __on_consume_exchange_declared(self,p_method):
            print("Consumer exchange declared")
            self.__channelConsumer.queue_declare(
                queue='',callback=self.__on_queue_declare
            )

        def __on_queue_declare(self,p_method):
            print("Consumer queue declared")
            self.__queueName = p_method.method.queue
            self.__channelConsumer.queue_bind(
                queue=self.__queueName,exchange=self.__incomingExch,arguments=self.__headers,)
            self.__channelConsumer.basic_consume(self.__queueName,self.__onMessageReceived)

        def __on_produce_channel_open(self,p_ch):
            print("Producer channel opened")
            self.__channelProducer = p_ch
            self.__channelProducer.exchange_declare(
                exchange=self.__outgoingExch,callback=self.__on_produce_exchange_declared
            )

        def __on_produce_exchange_declared(self,p_method):
            print("Producer exchange declared")
            l_publisher = threading.Thread(target=self.__publishProcedure)
            l_publisher.start()

        def __onMessageReceived(self,p_channel,p_method,p_properties,p_body):
            p_channel.basic_ack(p_method.delivery_tag)
            print("Message received: {}".format(p_body))

        def __publishProcedure(self):
            print("Start publishing")
            l_msgCounter = 0
            while self.__isRun:
                l_msgCounter += 1
                self.__publish(l_msgCounter)
                time.sleep(0.1)

        def __publish(self,p_msgCounter):
            self.__channelProducer.basic_publish(
                exchange=self.__outgoingExch,routing_key="#",body=str(p_msgCounter),properties=pika.BasicProperties(headers=self.__headers)
            )

        def run(self):
            self.__isRun = True
            try:
                self.__connection.ioloop.start()
            except KeyboardInterrupt:
                self.__isRun = False
                self.__connection.close()
                print("Exit...")

    if __name__ == '__main__':
        if len(sys.argv) < 2:
            print("Provide node name [node1 | node2]")
            exit(-1)

        l_outgoingExch = ''
        l_incomingExch = ''
        if sys.argv[1] == 'node1':
            l_outgoingExch = 'node2.headers'
            l_incomingExch = 'node1.headers'
        elif sys.argv[1] == 'node2':
            l_outgoingExch = 'node1.headers'
            l_incomingExch = 'node2.headers'
        else:
            print("Wrong node name")
            exit(-1)
        l_testInstance = Test(
            p_username='admin',p_password='admin',p_host='localhost',p_port=5672,p_virtualHost='/',p_incomingExchange=l_incomingExch,p_outgoingExchange=l_outgoingExch
        )
        l_testInstance.run()

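I run two instances as two nodes (node1 and node2), so they should communicate with each other.

Sometimes I also run into the problem described here: Stream connection lost: AssertionError(('_AsyncTransportBase._produce() tx buffer size underflow',-275,1),)

Solution

I found that I was misusing pika. As the pika documentation states, it is not safe to share a connection across multiple threads. The only way to interact with a connection from another thread is the add_callback_threadsafe function. In my case, it should look like this: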

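    import functools

    # Both functions are methods of the Test class from the question.
    def __publishProcedure(self):
        print("Start publishing")
        l_msgCounter = 0
        while self.__isRun:
            l_msgCounter += 1
            # Do not touch the channel from this thread; ask the I/O loop
            # thread to run the publish call instead.
            l_cb = functools.partial(self.__publish,l_msgCounter)
            self.__connection.ioloop.add_callback_threadsafe(l_cb)
            time.sleep(0.1)

    def __publish(self,p_msgCounter):
        # Runs on the I/O loop thread, so using the channel here is safe.
        self.__channelProducer.basic_publish(
            exchange=self.__outgoingExch,routing_key="#",body=str(p_msgCounter),properties=pika.BasicProperties(headers=self.__headers)
        )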
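The same hand-off can be pulled out into a small helper, shown here as a minimal sketch rather than anything pika provides. The names start_publisher, publish, should_run and interval are hypothetical. The only thing assumed about the ioloop argument is that it has an add_callback_threadsafe(callback) method, as connection.ioloop of a SelectConnection does.

```python
import functools
import threading
import time


def start_publisher(ioloop, publish, should_run, interval=0.1):
    """Count up in a worker thread and hand every publish to the I/O loop.

    ioloop     -- object with add_callback_threadsafe(callback), for example
                  connection.ioloop of a pika SelectConnection (assumption)
    publish    -- callable taking the message counter; it is executed on the
                  I/O loop thread, so it may use the channel
    should_run -- callable returning False when the worker should stop
    """
    def worker():
        counter = 0
        while should_run():
            counter += 1
            # The worker never calls publish itself; it only schedules it.
            ioloop.add_callback_threadsafe(functools.partial(publish, counter))
            time.sleep(interval)

    thread = threading.Thread(target=worker, daemon=True)
    thread.start()
    return thread
```

Inside the Test class from the question this could be called from __on_produce_exchange_declared as start_publisher(self.__connection.ioloop, self.__publish, lambda: self.__isRun). The worker thread then only increments the counter and sleeps, while every basic_publish call runs on the thread that owns the connection.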
