使用kombu时gevent中的ConcurrentObjectUseError

问题描述

我在代码中使用了kombu,有时我从kombu的生产者的publish方法中得到以下异常。我认为它是在压力条件下重现的,所以可能是多线程问题,并且异常描述似乎也是如此。

使用python 2.7.18,kombu 4.6.11,amqp 2.6.1,gevent 20.6.2

我将不胜感激。谢谢!

我也看到了这些可能相关的页面,尽管我无法从中推断出我该怎么做:

例外:

Traceback (most recent call last):
File "C:\Code\A\home_common\rabbitmq_common.py",line 168,in send_data_message
self.producer.publish(data_message,routing_key=destination,exchange=self.dataDirectExchange,headers={'source': source},content_encoding='binary',content_type='application/octet-stream',retry=True)
File "C:\Python27\lib\site-packages\kombu\messaging.py",line 181,in publish
exchange_name,declare,File "C:\Python27\lib\site-packages\kombu\connection.py",line 533,in _ensured
return fun(*args,**kwargs)
File "C:\Python27\lib\site-packages\kombu\messaging.py",line 203,in _publish
mandatory=mandatory,immediate=immediate,File "C:\Python27\lib\site-packages\amqp\channel.py",line 1766,in _basic_publish
(0,exchange,routing_key,mandatory,immediate),msg
File "C:\Python27\lib\site-packages\amqp\abstract_channel.py",line 59,in send_method
conn.frame_writer(1,self.channel_id,sig,args,content)
File "C:\Python27\lib\site-packages\amqp\method_framing.py",line 189,in write_frame
write(view[:offset])
File "C:\Python27\lib\site-packages\amqp\transport.py",line 305,in write
self._write(s)
File "C:\Python27\lib\site-packages\gevent_socket2.py",line 383,in sendall
return _socketcommon._sendall(self,data_memory,flags)
File "C:\Python27\lib\site-packages\gevent_socketcommon.py",line 392,in _sendall
timeleft = __send_chunk(socket,chunk,flags,timeleft,end)
File "C:\Python27\lib\site-packages\gevent_socketcommon.py",line 321,in __send_chunk
data_sent += socket.send(chunk,flags)
File "C:\Python27\lib\site-packages\gevent_socket2.py",line 369,in send
self._wait(self._write_event)
File "src\\gevent
_hub_primitives.py",line 317,in gevent._gevent_c_hub_primitives.wait_on_socket
File "src\\gevent
_hub_primitives.py",line 322,line 297,in gevent._gevent_c_hub_primitives._primitive_wait
ConcurrentObjectUseError: This socket is already used by another greenlet: <bound method Waiter.switch of <gevent._gevent_c_waiter.Waiter object at 0x0598E810>>

解决方法

以供将来参考-似乎此问题已通过锁定所有可能与Rabbit服务器联系的kombu呼叫而得到解决,例如:

with self._kombu_lock:
    self.producer.publish(data_message,routing_key=destination,exchange=self.dataDirectExchange,headers={'source': source},content_encoding='binary',content_type='application/octet-stream',retry=True)