问题描述
pika 在代码中没有任何线程的概念。如果您想将 pika 与线程一起使用,请确保每个线程都有一个 pika 连接,并在该线程中创建。跨线程共享一个 pika 连接是不安全的,只有一个例外:您可以从另一个线程调用连接方法 add_callback_threadsafe 以在活动的 pika 连接内安排回调。
假设我有一个订阅者,我已经开始使用 channel.start_consuming()
。该线程将被阻塞,等待消息到达。这些消息可能会相隔很长时间(有时是数小时)。
当然,如果我想安全/干净地关闭订阅者,我必须从另一个线程这样做吗?否则我怎样才能触发消费者打破阻塞?
解决方法
您可以使用 connection.process_data_events()
而不仅仅是 channel.start_consuming()
。这里的优点是您可以执行类似操作来关闭连接。
def consume_messages(self):
while self.running:
self.connection.process_data_events()
sleep(0.1)
self.connection.close()
然后您只需将 self.running
设置为 False
即可关闭连接。