ActiveMQ/STOMP 清除指向目的地的调度消息

问题描述

我想删除计划传送到特定队列的消息,但我发现该过程不必要地繁琐。

这里我向一个延迟队列发送一条空白消息:

self._connection.send(body="test",destination=f"/queue/my-queue",headers={
    "AMQ_SCHEDULED_DELAY": 100_000_000,"foo": "bar"
})

这里我想清除该队列的预定消息:

self._connection.send(destination=f"ActiveMQ.Scheduler.Management",headers={
    "AMQ_SCHEDULER_ACTION": "REMOVEALL",},body="")

当然这里的“目的地”需要是 ActiveMQ.Scheduler.Management 而不是我的实际队列。但无论如何我都无法删除发往 queue/my-queue 的预定消息。我尝试使用 selector 标头,但这似乎不适用于 AMQ_SCHEDULER_ACTION 类型的消息。

我看到的唯一建议是编写一个消费者来浏览所有计划的消息,检查每个计划的目的地,并按其 ID 删除每个计划。这对我来说似乎很疯狂,因为我想要删除的不仅仅是几条消息,而是数百万条消息。

有没有办法向 ActiveMQ 发送命令以清除带有自定义标头值的预定消息?

也许我可以为每个队列定义一个自定义的计划消息位置?

编辑:

我已经为 stomp.py 连接编写了一个包装器来处理以队列为目的地的清除计划。 MQStompFacade 采用现有的 stomp.Connection 和您正在使用的队列的名称,并提供 enqueueenqueue_manyreceivepurge、和move

当从队列接收时,如果 include_delayedTrue,它将订阅队列和消耗调度的主题。假设消息已通过此类入队,并且将原始目标队列的名称作为自定义标头,则将过滤掉不以接收队列为目的地的预定消息。

尚未在生产中进行测试。这里可能有很多优化。

用法

stomp = MQStompFacade(connection,"my-queue")

stomp.enqueue_many([
  EnqueueRequest(message="hello"),EnqueueRequest(message="goodbye",delay=100_000)
])

stomp.purge() # <- removes queued and scheduled messages destined for "/queues/my-queue"
class MQStompFacade (ConnectionListener):

    def __init__(self,connection: Connection,queue: str):
        self._connection = connection
        self._queue = queue
        self._messages: List[Message] = []
        self._connection_id = rand_string(6)
        self._connection.set_listener(self._connection_id,self)

    def __del__(self):
        self._connection.remove_listener(self._connection_id)

    def enqueue_many(self,requests: List[EnqueueRequest]):
        txid = self._connection.begin()
        for request in requests:
            headers = request.headers or {}

            # Used in scheduled message selectors
            headers["queue"] = self._queue

            if request.delay_millis:
                headers['AMQ_SCHEDULED_DELAY'] = request.delay_millis
            if request.priority is not None:
                headers['priority'] = request.priority

            self._connection.send(body=request.message,destination=f"/queue/{self._queue}",txid=txid,headers=headers)
        self._connection.commit(txid)

    def enqueue(self,request: EnqueueRequest):
        self.enqueue_many([request])

    def purge(self,selector: Optional[str] = None):
        num_purged = 0
        for _ in self.receive(idle_timeout=5,selector=selector):
            num_purged += 1
        return num_purged

    def move(self,destination_queue: AbstractQueueFacade,selector: Optional[str] = None):

        buffer_size = 500
        move_buffer = []

        for message in self.receive(idle_timeout=5,selector=selector):
            move_buffer.append(EnqueueRequest(
                message=message.body
            ))

            if len(move_buffer) >= buffer_size:
                destination_queue.enqueue_many(move_buffer)
                move_buffer = []

        if move_buffer:
            destination_queue.enqueue_many(move_buffer)

    def receive(self,max: Optional[int] = None,timeout: Optional[int] = None,idle_timeout: Optional[int] = None,selector: Optional[str] = None,peek: Optional[bool] = False,include_delayed: Optional[bool] = False):
        """
        Receiving messages until one of following conditions are met

        Args:
            max: Receive messages until the [max] number of messages are received
            timeout: Receive message until this timeout is reached
            idle_timeout (seconds): Receive messages until the queue is idle for this amount of time
            selector: JMS selector that can be applied to message headers. See https://activemq.apache.org/selector
            peek: Set to TRUE to disable automatic ack on matched criteria. Peeked messages will remain the queue
            include_delayed: Set to TRUE to return messages scheduled for delivery in the future
        """
        self._connection.subscribe(f"/queue/{self._queue}",id=self._connection_id,ack="client",selector=selector
                                   )
        if include_delayed:
            browse_topic = f"topic/scheduled_{self._queue}_{rand_string(6)}"
            schedule_selector = f"queue = '{self._queue}'"
            if selector:
                schedule_selector = f"{schedule_selector} AND ({selector})"

            self._connection.subscribe(browse_topic,ack="auto",selector=schedule_selector
                                       )

            self._connection.send(
                destination=f"ActiveMQ.Scheduler.Management",headers={
                    "AMQ_SCHEDULER_ACTION": "broWSE","JMSReplyTo": browse_topic
                },body=""
            )

        listen_start = time.time()
        last_receive = time.time()
        messages_received = 0
        scanning = True
        empty_receive = False
        while scanning:
            try:
                message = self._messages.pop()
                last_receive = time.time()
                if not peek:
                    self._ack(message)
                messages_received += 1
                yield message
            except IndexError:
                empty_receive = True
                time.sleep(0.1)

            if max and messages_received >= max:
                scanning = False
            elif timeout and time.time() > listen_start + timeout:
                scanning = False
            elif empty_receive and idle_timeout and time.time() > last_receive + idle_timeout:
                scanning = False
            else:
                scanning = True

        self._connection.unsubscribe(id=self._connection_id)

    def on_message(self,frame):
        destination = frame.headers.get("original-destination",frame.headers.get("destination"))
        schedule_id = frame.headers.get("scheduledJobId")

        message = Message(
            attributes=MessageAttributes(
                id=frame.headers["message-id"],schedule_id=schedule_id,timestamp=frame.headers["timestamp"],queue=destination.replace("/queue/","")
            ),body=frame.body
        )
        self._messages.append(message)

    def _ack(self,message: Message):
        """
        Deletes the message from queue.
        If the message has an scheduled_id,will also remove the associated scheduled job
        """
        if message.attributes.schedule_id:
            self._connection.send(
                destination=f"ActiveMQ.Scheduler.Management",headers={
                    "AMQ_SCHEDULER_ACTION": "REMOVE","scheduledJobId": message.attributes.schedule_id
                },body=""
            )
        self._connection.ack(message.attributes.id,subscription=self._connection_id)

解决方法

为了删除特定消息,您需要知道可以通过浏览预定消息获得的 ID。唯一可用的其他选项是使用删除操作中的开始和停止时间选项来删除范围内的所有消息。

MessageProducer producer = session.createProducer(management);
Message request = session.createMessage();
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION,ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVEALL);
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_START_TIME,Long.toString(start));
request.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION_END_TIME,Long.toString(end));
producer.send(request);

如果这不适合您的需要,我相信该项目会欢迎您的贡献。