Rabbitmq pika 上的多进程和多线程架构

问题描述

我有以下场景。我有一个

  1. 基于python的网络服务器(发布)-(单进程和多线程)
  2. 计划作业(发布)(单进程和多线程运行不同的作业)
  3. 来自rabbitmq 队列的消费者(订阅rabbitmq 主题)(单进程和多线程以消费不同的消息)

我目前正在尝试将 rabbitmq 用于上述堆栈。因此,对于上述情况,我想为每个进程创建单个 rabbitmq 连接并使用多个通道来支持多线程任务。 Rabbitmq 文档说可以使用多个通道来支持多个线程。但是 pika 库似乎不支持这种情况。您可以参考我尝试过的以下示例

import pika
import threading
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))


def test_thread(a: int):
    channel = connection.channel()
    channel.exchange_declare(exchange='normal_ex',exchange_type='topic')
    channel.basic_publish(exchange='normal_ex',routing_key='test',body=str(a))


for i in range(0,10):
    t = threading.Thread(target=test_thread,args=[i])
    t.start()


time.sleep(10)
connection.close()

当我运行上面使用多个线程的多个通道的程序时,我收到以下错误

流连接丢失:AssertionError(('_AsyncTransportBase._produce() tx buffer size underflow',-21,1))

解决方法

来自https://pika.readthedocs.io/en/stable/faq.html#frequently-asked-questions

Pika 线程安全吗?

Pika 在代码中没有任何线程的概念。如果您想将 Pika 与线程一起使用,请确保每个线程都有一个 Pika 连接,并在该线程中创建。跨线程共享一个 Pika 连接是不安全的,只有一个例外:您可以从另一个线程调用连接方法 add_callback_threadsafe 以在活动的 pika 连接内安排回调。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...