问题描述
我想删除计划传送到特定队列的消息,但我发现该过程不必要地繁琐。
这里我向一个延迟队列发送一条空白消息:
self._connection.send(body="test",destination=f"/queue/my-queue",headers={
"AMQ_SCHEDULED_DELAY": 100_000_000,"foo": "bar"
})
这里我想清除该队列的预定消息:
self._connection.send(destination=f"ActiveMQ.Scheduler.Management",headers={
"AMQ_SCHEDULER_ACTION": "REMOVEALL",},body="")
当然这里的“目的地”需要是 ActiveMQ.Scheduler.Management
而不是我的实际队列。但无论如何我都无法删除发往 queue/my-queue
的预定消息。我尝试使用 selector
标头,但这似乎不适用于 AMQ_SCHEDULER_ACTION
类型的消息。
我看到的唯一建议是编写一个消费者来浏览所有计划的消息,检查每个计划的目的地,并按其 ID 删除每个计划。这对我来说似乎很疯狂,因为我想要删除的不仅仅是几条消息,而是数百万条消息。
有没有办法向 ActiveMQ 发送命令以清除带有自定义标头值的预定消息?
编辑:
我已经为 stomp.py
连接编写了一个包装器来处理以队列为目的地的清除计划。 MQStompFacade
采用现有的 stomp.Connection
和您正在使用的队列的名称,并提供 enqueue
、enqueue_many
、receive
、purge
、和move
。
当从队列接收时,如果 include_delayed
为 True
,它将订阅队列和消耗调度的主题。假设消息已通过此类入队,并且将原始目标队列的名称作为自定义标头,则将过滤掉不以接收队列为目的地的预定消息。
尚未在生产中进行测试。这里可能有很多优化。
用法:
stomp = MQStompFacade(connection,"my-queue")
stomp.enqueue_many([
EnqueueRequest(message="hello"),EnqueueRequest(message="goodbye",delay=100_000)
])
stomp.purge() # <- removes queued and scheduled messages destined for "/queues/my-queue"
class MQStompFacade (ConnectionListener):
def __init__(self,connection: Connection,queue: str):
self._connection = connection
self._queue = queue
self._messages: List[Message] = []
self._connection_id = rand_string(6)
self._connection.set_listener(self._connection_id,self)
def __del__(self):
self._connection.remove_listener(self._connection_id)
def enqueue_many(self,requests: List[EnqueueRequest]):
txid = self._connection.begin()
for request in requests:
headers = request.headers or {}
# Used in scheduled message selectors
headers["queue"] = self._queue
if request.delay_millis:
headers['AMQ_SCHEDULED_DELAY'] = request.delay_millis
if request.priority is not None:
headers['priority'] = request.priority
self._connection.send(body=request.message,destination=f"/queue/{self._queue}",txid=txid,headers=headers)
self._connection.commit(txid)
def enqueue(self,request: EnqueueRequest):
self.enqueue_many([request])
def purge(self,selector: Optional[str] = None):
num_purged = 0
for _ in self.receive(idle_timeout=5,selector=selector):
num_purged += 1
return num_purged
def move(self,destination_queue: AbstractQueueFacade,selector: Optional[str] = None):
buffer_size = 500
move_buffer = []
for message in self.receive(idle_timeout=5,selector=selector):
move_buffer.append(EnqueueRequest(
message=message.body
))
if len(move_buffer) >= buffer_size:
destination_queue.enqueue_many(move_buffer)
move_buffer = []
if move_buffer:
destination_queue.enqueue_many(move_buffer)
def receive(self,max: Optional[int] = None,timeout: Optional[int] = None,idle_timeout: Optional[int] = None,selector: Optional[str] = None,peek: Optional[bool] = False,include_delayed: Optional[bool] = False):
"""
Receiving messages until one of following conditions are met
Args:
max: Receive messages until the [max] number of messages are received
timeout: Receive message until this timeout is reached
idle_timeout (seconds): Receive messages until the queue is idle for this amount of time
selector: JMS selector that can be applied to message headers. See https://activemq.apache.org/selector
peek: Set to TRUE to disable automatic ack on matched criteria. Peeked messages will remain the queue
include_delayed: Set to TRUE to return messages scheduled for delivery in the future
"""
self._connection.subscribe(f"/queue/{self._queue}",id=self._connection_id,ack="client",selector=selector
)
if include_delayed:
browse_topic = f"topic/scheduled_{self._queue}_{rand_string(6)}"
schedule_selector = f"queue = '{self._queue}'"
if selector:
schedule_selector = f"{schedule_selector} AND ({selector})"
self._connection.subscribe(browse_topic,ack="auto",selector=schedule_selector
)
self._connection.send(
destination=f"ActiveMQ.Scheduler.Management",headers={
"AMQ_SCHEDULER_ACTION": "broWSE","JMSReplyTo": browse_topic
},body=""
)
listen_start = time.time()
last_receive = time.time()
messages_received = 0
scanning = True
empty_receive = False
while scanning:
try:
message = self._messages.pop()
last_receive = time.time()
if not peek:
self._ack(message)
messages_received += 1
yield message
except IndexError:
empty_receive = True
time.sleep(0.1)
if max and messages_received >= max:
scanning = False
elif timeout and time.time() > listen_start + timeout:
scanning = False
elif empty_receive and idle_timeout and time.time() > last_receive + idle_timeout:
scanning = False
else:
scanning = True
self._connection.unsubscribe(id=self._connection_id)
def on_message(self,frame):
destination = frame.headers.get("original-destination",frame.headers.get("destination"))
schedule_id = frame.headers.get("scheduledJobId")
message = Message(
attributes=MessageAttributes(
id=frame.headers["message-id"],schedule_id=schedule_id,timestamp=frame.headers["timestamp"],queue=destination.replace("/queue/","")
),body=frame.body
)
self._messages.append(message)
def _ack(self,message: Message):
"""
Deletes the message from queue.
If the message has an scheduled_id,will also remove the associated scheduled job
"""
if message.attributes.schedule_id:
self._connection.send(
destination=f"ActiveMQ.Scheduler.Management",headers={
"AMQ_SCHEDULER_ACTION": "REMOVE","scheduledJobId": message.attributes.schedule_id
},body=""
)
self._connection.ack(message.attributes.id,subscription=self._connection_id)
解决方法
为了删除特定消息,您需要知道可以通过浏览预定消息获得的 ID。唯一可用的其他选项是使用删除操作中的开始和停止时间选项来删除范围内的所有消息。
MessageProducer producer = session.createProducer(management);
Message request = session.createMessage();
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME,Long.toString(start));
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME,Long.toString(end));
producer.send(request);
如果这不适合您的需要,我相信该项目会欢迎您的贡献。