问题描述
谁能帮助我理解如何为 RabbitMQ 创建一个非阻塞的连接类(即我可以创建并启动连接,然后开始运行代码的其他部分?)
我修改了 pika 中优秀的 SelectConnection eamples 以启用回调来处理意外的连接关闭,并创建了一个 RmqConnect
类,该类成功连接并将在 RabbitMQ 服务器关闭时重新连接
class RmqConnection():
def __init__(self):
self._connection = None
self._stopping = False
def connect(self):
"""
An endless loop to Keep reconnecting to RabbitMQ
if the connection is closed
"""
while not self._stopping:
self._connection = None
log.info('Connecting to %s',CON_ParaMS.host)
self._connection = pika.SelectConnection(
CON_ParaMS,on_open_callback=self.on_connection_open,on_open_error_callback=self.on_connection_open_error,on_close_callback=self.on_connection_closed)
try:
self._connection.ioloop.start()
except KeyboardInterrupt:
self.close()
if (self._connection is not None and
not self._connection.is_closed):
# Finish closing
self._connection.ioloop.start()
然后作为测试,我尝试创建 RmqConnection
def main():
connection = RmqConnection()
connection.connect()
print ("We never get here until the connect() method returns")
理想情况下,我尝试在 RmqConnection
类中创建一个连接对象,并在连接并启动 Select Connection 的 ioloop 后使其可用于程序的其他部分。
然而,self._connection.ioloop.start()
调用似乎阻塞了,并且我的 connect() 方法在连接打开时永远不会返回。只有在我关闭 KeyboardInteerupt 后才会返回。
关于如何更好地重构它并启用我所追求的功能的任何想法(即设置与 SelectConnection 回调的连接,启动 ioloop 然后继续我的程序?)
解决方法
好的,我(犹豫地)得到了一个现在似乎有效的修复。但是我担心它不是线程安全的。
基本上,我启动了在它自己的线程中阻塞的 SelectConnection.ioloop.start()
方法。然后我可以自由地返回连接对象并在其他地方使用它。
class RmqConnection():
def __init__(self):
self._connection = None
self._stopping = False
def connect(self):
"""
Create a connection,start the ioloop to connect
inside a thread and then return the connection
"""
log.info('Connecting to %s',CON_PARAMS.host)
self._connection = pika.SelectConnection(
CON_PARAMS,on_open_callback=self.on_connection_open,on_open_error_callback=self.on_connection_open_error,on_close_callback=self.on_connection_closed)
self.iothread = threading.Thread(
target=self._connection.ioloop.start,args=())
self.iothread.start()
return self._connection
然后在 bpython 中,我似乎可以获取连接对象(它现在在线程中的单独 ioloop 中运行)并添加和删除队列并发布消息。
bpython version 0.20.1 on top of Python 3.6.9 /usr/bin/python3
>>> import connect_rmq
>>> test_conn=connect_rmq.RmqConnection()
>>> connection=test_conn.connect()
>>> mychannel = connection.channel()
>>> mychannel.close()
>>> mychannel = connection.channel()
>>> mychannel.basic_publish(exchange='rmq.telemetry',routing_key='',body='HELLO')
>>>
如果有人看到这方面的任何问题,请大声喊出来!