问题描述
我们有一个 spring boot 应用程序,我们使用 apache camel 作为消息处理的框架。我们正在努力优化我们的应用程序设置,以使 ActiveMQ 队列上的消息快速入队,队列另一端的 Logstash 作为消费者接收这些消息。
文档分散在很多地方,可用的配置太多。
例如,camel link for spring boot 指定 102 个选项。同样,activemq apache camel link 详细介绍了这些内容。
这是我们当前配置的:
Application.properties:
################################################
# Spring Active MQ
################################################
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.packages.trust-all=true
spring.activemq.user=admin
spring.activemq.password=admin
Apache Camel
.to("activemq:queue:"dataQueue"?messageConverter=#queueMessageConverter");
问题:
1 - 我们怀疑我们必须使用 poolConnectionFactory 而不是默认的 Spring JMS 模板 bean,它以某种方式自动拾取。
2 - 我们还希望进程是异步的。我们只想将消息放入队列中,不想等待来自 activemq 的任何 ACK 或执行任何重试或其他操作。
3 - 我们只想在队列已满时等待重试。
4 - 我们应该在哪里设置 ActiveMq 大小?并且activemq还在死信队列中放置东西,以防没有消费者可用?我们希望覆盖该行为并希望将消息保留在那里。 (这是必须在Activemq中配置而不是在我们的应用程序/apache骆驼中配置)
更新 这是我们经过更多调查并根据目前的反馈解决的。注意:这不涉及重试,为此我们将尝试答案中建议的选项。
对于 Seda 队列:
制作人:
.to("seda:somequeue?waitForTaskToComplete=Never");
消费者:
.from("seda:somequeue?concurrentConsumers=20");
活动 MQ:
.to("activemq:queue:dataQueue?disableReplyTo=true);
应用程序.属性:
#Enable poolconnection factory
spring.activemq.pool.enabled=true
spring.activemq.pool.blockIfFull=true
spring.activemq.pool.max-connections=50
解决方法
-
是的,您需要使用 pooledConnectionFactory。特别是骆驼+弹簧靴。或者考虑使用camel-sjms 组件。罪魁祸首是 Spring 的 JMSTemplate。超高延迟。
-
发送 NON_PERSISTENT 和 AUTO_ACK,同时在连接工厂开启 sendAsync
-
您需要在路由中捕获 javax.jms.ResourceAllocationException 以在 Producer Flow Control 启动时(即队列或代理已满)进行重试
-
ActiveMQ 根据字节而不是消息计数确定大小。请参阅 Producer Flow Control 文档中的 SystemUsage 设置和基于字节限制队列大小的 Per-Destination Policies 政策。