ActiveMQ:如何限制发送的消息数量?

问题描述

假设我有一个 ActiveMQ broker 和未定义数量的消费者。

问题:

  • 要处理消息,消费者需要一个外部服务,即“DATA1”或“DATA2”(在消息中指定)
  • 每个服务器“DATA1”和“DATA2”只能处理 20 个连接
  • 因此,任何时候最多必须发送 20 个“DATA1”和 20 个“DATA2”消息
  • 由于优先级,消息必须排入同一个队列中
  • 即使消息 A 的优先级高于消息 B,如果由于外部服务没有空闲槽而无法处理 A,则需要处理消息 B

这个怎么解决?只要我使用消息拉取(预取 0),我就可以通过使用 brokerPlugin 来做到这一点,在 messagePull 上,它通过使用信号量和选择器来实现。如果达到限制,拉取返回 null

但是,由于性能问题,我不得不将预取设置为 1 并改用推送。因此,我的 messagePull hack 不再有效(它从未被调用)。

到目前为止,我正在考虑实现自定义 Cursor,但我想知道是否有人知道更好的解决方案。

更新自定义光标有效,但破坏了消息删除功能。我尝试使用自定义 Queue 和 QueuedispatchSelector(配置起来很麻烦,因为没有合适的 API 来执行此操作)并且它大部分都可以工作,但我仍然遇到同步问题。

此外,一个非常合适的 API 似乎是 dispatchPolicy,然而,虽然它被 Queue 引用,但从未使用过。

解决方法

队列为您免费缓冲系统处理时间。消息按需传递。使用 prefetch=0 或 prefetch=1,应该可以有效地帮助您实现目标。只有在消费者准备好时(即在 consumer.receive() 方法期间)才会将消息传递给消费者。

consumer.receive() 是一个阻塞调用,因此在消费者进程(及其所需的下游服务)准备好处理之前,您不需要任何自定义插件或其他插件来延迟交付。

该行为应该是开箱即用的,或者您的用例中有一些细节未提供以进一步阐明该场景。