问题描述
过程:
应用A在启动时正在创建与“请求队列”的使用者连接 在远程Active MQ broker中。应用B将请求推送到“ REQUEST QUEUE”,并且App A会使用它。App A会产生响应 进入另一个队列“ RESPONSE QUEUE”。
应用A正在使用ActiveMQ连接工厂和Spring DMLC 消费邮件。
我从Spring DMLC了解到的是,它在队列中不断轮询 以获得消息。
问题:
应用程序A最初在应用程序重新启动时消耗请求。但是,第二天在发出另一个请求时,无法第二次使用该请求。甚至在第二个请求到达时,消费者也会拒绝。
到目前为止已尝试:
此问题仅存在于LIVE中,并且根据观察,消费者在一定时间段(30分钟)后无法处理/接收传入的消息。 我在其他环境中尝试使用相同的代理设置,但不幸的是无法重现。我尝试了所有其他链接,但都没有成功。
寻找指针:
我使用的方式是否存在根本性的错误 ActiveMQ连接用于消费还是生产?
可能需要指出用于调试问题的配置。
代码:
消息轮询器配置代码:
@Configuration
@EnableJms
@ComponentScan
@Slf4j
public class MessagePoller
{
@Value("${outbound.queue}")
private String outboundQueue;
private final brokerProperties brokerProperties;
@Autowired
public MessagePoller(brokerProperties brokerProperties) {this.brokerProperties = brokerProperties;}
@Bean(name = CONNECTION_FACTORY)
@Primary
public ActiveMQConnectionFactory connectionFactory()
{
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setbrokerURL(brokerProperties.getUrl());
connectionFactory.setUserName(brokerProperties.getUser());
connectionFactory.setPassword(brokerProperties.getpassword());
return connectionFactory;
}
@Bean
public JmsListenerContainerFactory jmsListenerContainerFactory()
{
DefaultJmsListenerContainerFactory factory =
new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrency("1-1");
factory.setErrorHandler(getErrorHandler());
factory.setSessionAckNowledgeMode(Session.AUTO_ACKNowLEDGE);
factory.setMessageConverter(new SimpleMessageConverter());
factory.setPubSubDomain(false);
return factory;
}
@Bean(name = OUTBOUND_JMS_TEMPLATE)
public jmstemplate outboudjmstemplate(
@Qualifier(CONNECTION_FACTORY)
ConnectionFactory connectionFactory)
{
jmstemplate template = new jmstemplate(connectionFactory);
template.setPubSubDomain(false);
template.setDefaultDestinationName(outboundQueue);
return template;
}
private ErrorHandler getErrorHandler()
{
return exception -> log.error("Exception thrown from consumer",exception);
}
}
消息侦听器代码:
@JmsListener(
destination = "requestQueue",containerFactory = "jmsListenerContainerFactory"
)
public void onMessage(Message<String> jmsMessage)
{
log.info("TriggerJobOnRequestService.onMessage() starts");
log.info("Incoming request message: {}",jmsMessage);
Integer deliveryCount = jmsMessage.getHeaders().get("JMSXDeliveryCount",Integer.class);
log.info("Payload : {}",jmsMessage.getPayload());
log.info("Headers : {}",jmsMessage.getHeaders());
log.info("Delivery Count : {}",deliveryCount);
//Processing Code logic
log.info("TriggerJobOnRequestService.onMessage() ends");
}
ActiveMQ连接网址:
spring.activemq.url=failover://(tcp://mqueue05.net:61616,tcp://mqueue06.net:61616,tcp://mqueue07.net:61616,tcp://mqueue08.net:61616)?randomize=false&timeout=100&initialReconnectDelay=1000&maxReconnectDelay=1000&maxReconnectAttempts=10
解决方法
暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!
如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。
小编邮箱:dio#foxmail.com (将#修改为@)