ActiveMQ使用者无法使用请求队列上的消息

问题描述

过程:

应用A在启动时正在创建与“请求队列”的使用者连接 在远程Active MQ broker中。应用B将请求推送到“ REQUEST QUEUE”,并且App A会使用它。App A会产生响应 进入另一个队列“ RESPONSE QUEUE”。

应用A正在使用ActiveMQ连接工厂和Spring DMLC 消费邮件

我从Spring DMLC了解到的是,它在队列中不断轮询 以获得消息。

问题:

应用程序A最初在应用程序重新启动时消耗请求。但是,第二天在发出另一个请求时,无法第二次使用该请求。甚至在第二个请求到达时,消费者也会拒绝。

到目前为止已尝试:

此问题仅存在于LIVE中,并且根据观察,消费者在一定时间段(30分钟)后无法处理/接收传入的消息。 我在其他环境中尝试使用相同的代理设置,但不幸的是无法重现。我尝试了所有其他链接,但都没有成功。

寻找指针:

我使用的方式是否存在根本性的错误 ActiveMQ连接用于消费还是生产?

可能需要指出用于调试问题的配置。

代码

消息轮询器配置代码

@Configuration
@EnableJms
@ComponentScan
@Slf4j
public class MessagePoller
{
    @Value("${outbound.queue}")
    private String outboundQueue;

    private final brokerProperties brokerProperties;

    @Autowired
    public MessagePoller(brokerProperties brokerProperties) {this.brokerProperties = brokerProperties;}

    @Bean(name = CONNECTION_FACTORY)
    @Primary
    public ActiveMQConnectionFactory connectionFactory()
    {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setbrokerURL(brokerProperties.getUrl());
        connectionFactory.setUserName(brokerProperties.getUser());
        connectionFactory.setPassword(brokerProperties.getpassword());
        return connectionFactory;
    }

    @Bean
    public JmsListenerContainerFactory jmsListenerContainerFactory()
    {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrency("1-1");
        factory.setErrorHandler(getErrorHandler());
        factory.setSessionAckNowledgeMode(Session.AUTO_ACKNowLEDGE);
        factory.setMessageConverter(new SimpleMessageConverter());
        factory.setPubSubDomain(false);
        return factory;
    }

    @Bean(name = OUTBOUND_JMS_TEMPLATE)
    public jmstemplate outboudjmstemplate(
            @Qualifier(CONNECTION_FACTORY)
                    ConnectionFactory connectionFactory)
    {
        jmstemplate template = new jmstemplate(connectionFactory);
        template.setPubSubDomain(false);
        template.setDefaultDestinationName(outboundQueue);
        return template;
    }

    private ErrorHandler getErrorHandler()
    {
        return exception -> log.error("Exception thrown from consumer",exception);
    }
}

消息侦听器代码

@JmsListener(
        destination = "requestQueue",containerFactory = "jmsListenerContainerFactory"
)
public void onMessage(Message<String> jmsMessage)
{
    log.info("TriggerJobOnRequestService.onMessage() starts");
    log.info("Incoming request message: {}",jmsMessage);
    Integer deliveryCount = jmsMessage.getHeaders().get("JMSXDeliveryCount",Integer.class);

    log.info("Payload : {}",jmsMessage.getPayload());
    log.info("Headers : {}",jmsMessage.getHeaders());
    log.info("Delivery Count : {}",deliveryCount);
    //Processing Code logic
    log.info("TriggerJobOnRequestService.onMessage() ends");
}

ActiveMQ连接网址:

spring.activemq.url=failover://(tcp://mqueue05.net:61616,tcp://mqueue06.net:61616,tcp://mqueue07.net:61616,tcp://mqueue08.net:61616)?randomize=false&timeout=100&initialReconnectDelay=1000&maxReconnectDelay=1000&maxReconnectAttempts=10

解决方法

暂无找到可以解决该程序问题的有效方法,小编努力寻找整理中!

如果你已经找到好的解决方法,欢迎将解决方案带上本链接一起发送给小编。

小编邮箱:dio#foxmail.com (将#修改为@)