问题描述
- 基于python的网络服务器(发布)-(单进程和多线程)
- 计划作业(发布)(单进程和多线程运行不同的作业)
- 来自rabbitmq 队列的消费者(订阅rabbitmq 主题)(单进程和多线程以消费不同的消息)
我目前正在尝试将 rabbitmq 用于上述堆栈。因此,对于上述情况,我想为每个进程创建单个 rabbitmq 连接并使用多个通道来支持多线程任务。 Rabbitmq 文档说可以使用多个通道来支持多个线程。但是 pika 库似乎不支持这种情况。您可以参考我尝试过的以下示例
import pika
import threading
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
def test_thread(a: int):
channel = connection.channel()
channel.exchange_declare(exchange='normal_ex',exchange_type='topic')
channel.basic_publish(exchange='normal_ex',routing_key='test',body=str(a))
for i in range(0,10):
t = threading.Thread(target=test_thread,args=[i])
t.start()
time.sleep(10)
connection.close()
当我运行上面使用多个线程的多个通道的程序时,我收到以下错误
流连接丢失:AssertionError(('_AsyncTransportBase._produce() tx buffer size underflow',-21,1))
解决方法
来自https://pika.readthedocs.io/en/stable/faq.html#frequently-asked-questions
Pika 线程安全吗?
Pika 在代码中没有任何线程的概念。如果您想将 Pika 与线程一起使用,请确保每个线程都有一个 Pika 连接,并在该线程中创建。跨线程共享一个 Pika 连接是不安全的,只有一个例外:您可以从另一个线程调用连接方法 add_callback_threadsafe 以在活动的 pika 连接内安排回调。