ActiveMQ:为什么我的消息卡在StoreQueueCursor中?

问题描述

我在使用ActiveMQ时遇到问题,我发现很难查明甚至重现,也很难问一个具体问题。请忍受我。

我基本上有:

producer (prioritized messages) -> queue -> consumer

通常,队列中有10万条消息,每当具有更高优先级的消息到达时,它们都会首先被消耗。

这很好,直到星星对齐并且写入队列的高优先级消息不被消耗为止。至少要等到我打电话给Queue.removeMatchingMessages(String selector)才能从队列中删除消息之前,消息的数量数量无关紧要。

幸运的是,我发现了一个很好的指示。

在我们的用户界面中可以看到,我提交了444条消息(3条),其优先级高于其余消息(1条),但它们并没有被消耗:

enter image description here

用调试器检查队列,我发现StoreQueueCursor.pendingCount是444:

enter image description here

如果我再提交72条消息,则未决计数为516(444 + 72):

enter image description here

然后我使用Queue.removeMatchingMessages(String selector)删除72条消息时,StoreQueueCursor.pendingCount变为0:

enter image description here

我的444条消息突然被消耗掉了:

enter image description here

所以我现在能问的最好的问题是:

StoreQueueCursor的用途是什么,它如何导致我的邮件被消耗?或者更确切地说:为什么这些消息不写到队列中并准备好被使用?

非常感谢您的帮助。

我正在使用org.apache.activemq:activemq-broker:5.15.12(通过Spring Boot 2.3.1.RELEASE)。

更新

有趣的是,在“快乐的情况”下,我所有的高优先级邮件都得到了应有的处理,pendingCount远高于0:

enter image description here

更新#2

在ActiveMQ的How can I support priority queues?中,它说:

由于消息游标(和客户端)实现了严格的优先级排序,因此,如果可以从缓存中进行消息分发而不必访问磁盘,则可以观察到严格的优先级排序(即,您的使用者足够快地保持取决于生产者),或者如果您使用的是非永久性消息而不必刷新到磁盘(使用FilePendingMessageCursor)。但是,一旦遇到使用者速度慢或生产者速度显着加快的情况,您会发现缓存将被填满(可能具有较低优先级的消息),而较高优先级的消息将滞留在磁盘上,直到它们被使用时才可用。重新分页在这种情况下,您可以决定权衡优化的消息分发以执行优先级。您可以禁用缓存,消息过期检查,并将消费者预取降低到1,以确保从商店中获取优先级较高的消息优先于优先级较低的消息

因此,我尝试像这样禁用缓存(顺便说一句,我已经设置了jms.prefetchPolicy.all=0):

PolicyEntry policyEntry = new PolicyEntry();
policyEntry.setQueue(JmsQueueNames.TASK_QUEUE_PREFIX + ".*");
policyEntry.setDeadLetterStrategy(taskDeadLetterStrategy);
policyEntry.setPrioritizedMessages(true);
policyEntry.setUseCache(false);
policyEntry.setExpireMessagesPeriod(0);

现在,useCache为假,但cacheEnabled为真:

enter image description here

但是可以观察到相同的行为。

此外,我总是关闭broker-Persistence,所以我不确定上述情况是否适用:

@Bean
public brokerService broker(ActiveMQProperties properties,dispatcherProperties dispatcherProperties) throws Exception {
  brokerService brokerService = new brokerService();
  brokerService.setPersistent(false);
  brokerService.getProducerSystemUsage().getMemoryUsage().setLimit(dispatcherProperties.getActiveMq().getMemoryLimit());
  brokerService.addConnector(properties.getbrokerUrl());
  brokerService.setPlugins(getPluginsToLoad());
  brokerService.setDestinationPolicy(policyMap());
  return brokerService;
}

来自JMX的信息:

enter image description here

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)