在 Spring 启动框架中使用 Apache Camel 的 ActiveMQ 生产者的性能设置

问题描述

我们有一个 spring boot 应用程序,我们使用 apache camel 作为消息处理的框架。我们正在努力优化我们的应用程序设置,以使 ActiveMQ 队列上的消息快速入队,队列另一端的 Logstash 作为消费者接收这些消息。

文档分散在很多地方,可用的配置太多。

例如,camel link for spring boot 指定 102 个选项。同样,activemq apache camel link 详细介绍了这些内容

这是我们当前配置的:

Application.properties:

################################################
# Spring Active MQ
################################################
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.packages.trust-all=true
spring.activemq.user=admin
spring.activemq.password=admin

Apache Camel

.to("activemq:queue:"dataQueue"?messageConverter=#queueMessageConverter");

问题:

1 - 我们怀疑我们必须使用 poolConnectionFactory 而不是认的 Spring JMS 模板 bean,它以某种方式自动拾取。

2 - 我们还希望进程是异步的。我们只想将消息放入队列中,不想等待来自 activemq 的任何 ACK 或执行任何重试或其他操作。

3 - 我们只想在队列已满时等待重试。

4 - 我们应该在哪里设置 ActiveMq 大小?并且activemq还在死信队列中放置东西,以防没有消费者可用?我们希望覆盖该行为并希望将消息保留在那里。 (这是必须在Activemq中配置而不是在我们的应用程序/apache骆驼中配置)

更新 这是我们经过更多调查并根据目前的反馈解决的。注意:这不涉及重试,为此我们将尝试答案中建议的选项。

对于 Seda 队列:

制作人:

.to("seda:somequeue?waitForTaskToComplete=Never");

消费者:

.from("seda:somequeue?concurrentConsumers=20");

活动 MQ:

.to("activemq:queue:dataQueue?disableReplyTo=true);

应用程序.属性

#Enable poolconnection factory
spring.activemq.pool.enabled=true
spring.activemq.pool.blockIfFull=true
spring.activemq.pool.max-connections=50

解决方法

  1. 是的,您需要使用 pooledConnectionFactory。特别是骆驼+弹簧靴。或者考虑使用camel-sjms 组件。罪魁祸首是 Spring 的 JMSTemplate。超高延迟。

  2. 发送 NON_PERSISTENT 和 AUTO_ACK,同时在连接工厂开启 sendAsync

  3. 您需要在路由中捕获 javax.jms.ResourceAllocationException 以在 Producer Flow Control 启动时(即队列或代理已满)进行重试

  4. ActiveMQ 根据字节而不是消息计数确定大小。请参阅 Producer Flow Control 文档中的 SystemUsage 设置和基于字节限制队列大小的 Per-Destination Policies 政策。