问题描述
我有一个代码,它只是用 pika 将消息排入代理队列。
class Publisher:
def __init__(self,config):
self._params = ConnectionParameters(
host = config.RABBITMQ_HOST,credentials = PlainCredentials(config.RABBITMQ_USER,config.RABBITMQ_PASSWORD))
self._conn = None
self._channel = None
self.exchange_name = config.RABBITMQ_AGENT_EXCHANGE
def connect(self):
if not self._conn or self._conn.is_closed:
self._conn = BlockingConnection(self._params)
self._channel = self._conn.channel()
self._channel.exchange_declare(exchange=self.exchange_name,exchange_type = 'topic')
def _publish(self,task):
properties = BasicProperties(expiration=task.expiration_ms)
self._channel.basic_publish(exchange= self.exchange_name,routing_key = task.routing_key,properties = properties if task.has_expiration else None,body=dumps(task,cls = TaskEncoder).encode())
logging.debug('message sent: %s',task)
def publish(self,msg):
"""Publish msg,reconnecting if necessary."""
try:
self._publish(msg)
except ConnectionClosed:
logging.error('reconnecting to queue')
self.connect()
self._publish(msg)
Pika 停止将消息排入队列以与下一个消息进行长时间运行的连接,并且不再抛出任何错误
2021-03-14 12:25:09,981 MainThread-140100212655936 pika.heartbeat [INFO] - Connection is idle,1 stale byte intervals
2021-03-14 12:25:09,981 MainThread-140100212655936 pika.adapters.utils.io_services_utils [INFO] - Aborting transport connection: state=1; <socket.socket fd=6,family=AddressFamily.AF_INET,type=SocketKind.SOCK_STREAM,proto=6,laddr=('192.168.184.108',41024),raddr=('10.100.176.158',5672)>
2021-03-14 12:25:09,981 MainThread-140100212655936 pika.adapters.utils.io_services_utils [INFO] - _AsyncTransportBase._initate_abort(): Initiating abrupt asynchronous transport shutdown: state=1; error=None; <socket.socket fd=6,5672)
代码使用
publisher = Publisher(config)
publisher.connect()
while True:
publisher.publish(obj)
time.sleep(1)
我有两个问题:
如何预防?在这种情况下,禁用心跳可以吗?
如何使用防火墙重现/模拟此行为?我尝试在 RMQ 端口添加丢包规则,但没有成功。
鼠兔版本:1.0.1
RMQ 版本:3.8.9
Python:3.8.6
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)