阻止 Pika (RabbitMQ) 中 SelectConnection 的 ioloop 阻塞

问题描述

谁能帮助我理解如何为 RabbitMQ 创建一个非阻塞的连接类(即我可以创建并启动连接,然后开始运行代码的其他部分?)

修改pika 中优秀的 SelectConnection eamples 以启用回调来处理意外的连接关闭,并创建了一个 RmqConnect 类,该类成功连接并将在 RabbitMQ 服务器关闭时重新连接

class RmqConnection():
    def __init__(self):
        self._connection = None
        self._stopping = False
   def connect(self):
        """
        An endless loop to Keep reconnecting to RabbitMQ
        if the connection is closed
        """
        while not self._stopping:
            self._connection = None
            log.info('Connecting to %s',CON_ParaMS.host)
            self._connection = pika.SelectConnection(
                CON_ParaMS,on_open_callback=self.on_connection_open,on_open_error_callback=self.on_connection_open_error,on_close_callback=self.on_connection_closed)
            try:
                self._connection.ioloop.start()
            except KeyboardInterrupt:
                self.close()
                if (self._connection is not None and
                        not self._connection.is_closed):
                    # Finish closing
                    self._connection.ioloop.start()

然后作为测试,我尝试创建 RmqConnection

的实例
def main():
    connection = RmqConnection()
    connection.connect()
    print ("We never get here until the connect() method returns")

理想情况下,我尝试在 RmqConnection 类中创建一个连接对象,并在连接并启动 Select Connection 的 ioloop 后使其可用于程序的其他部分。

然而,self._connection.ioloop.start() 调用似乎阻塞了,并且我的 connect() 方法在连接打开时永远不会返回。只有在我关闭 KeyboardInteerupt 后才会返回。

关于如何更好地重构它并启用我所追求的功能的任何想法(即设置与 SelectConnection 回调的连接,启动 ioloop 然后继续我的程序?)

解决方法

好的,我(犹豫地)得到了一个现在似乎有效的修复。但是我担心它不是线程安全的。

基本上,我启动了在它自己的线程中阻塞的 SelectConnection.ioloop.start() 方法。然后我可以自由地返回连接对象并在其他地方使用它。

class RmqConnection():

    def __init__(self):
        self._connection = None
        self._stopping = False

    def connect(self):
        """
        Create a connection,start the ioloop to connect
        inside a thread and then return the connection
        """
        log.info('Connecting to %s',CON_PARAMS.host)
        self._connection = pika.SelectConnection(
            CON_PARAMS,on_open_callback=self.on_connection_open,on_open_error_callback=self.on_connection_open_error,on_close_callback=self.on_connection_closed)

        self.iothread = threading.Thread(
            target=self._connection.ioloop.start,args=())
        self.iothread.start()

        return self._connection

然后在 bpython 中,我似乎可以获取连接对象(它现在在线程中的单独 ioloop 中运行)并添加和删除队列并发布消息。

bpython version 0.20.1 on top of Python 3.6.9 /usr/bin/python3
>>> import connect_rmq
>>> test_conn=connect_rmq.RmqConnection()
>>> connection=test_conn.connect()
>>> mychannel = connection.channel()
>>> mychannel.close()
>>> mychannel = connection.channel()
>>> mychannel.basic_publish(exchange='rmq.telemetry',routing_key='',body='HELLO')
>>> 

如果有人看到这方面的任何问题,请大声喊出来!

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...