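Client acknowledge mode does not work in ActiveMQ

Problem description

I have spent a long time on this issue, but I can't figure it out. Maybe I don't understand client acknowledgment correctly?

I have an ActiveMQ consumer that I configured with client acknowledgment. When I check the acknowledge mode in the debugger, I see that it is correctly set to 2.

I wanted to check that, if I do not call message.acknowledge(), the message is redelivered or returned to the broker.

However, that does not happen. Even though I never acknowledge the message, it still shows up as successfully dequeued.

What am I doing wrong?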

package test.config;

import javax.jms.Session;    
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class ConnectionFactoryConfig {

    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    @Value("${spring.activemq.user}")
    private String username;

    @Value("${spring.activemq.password}")
    private String password;


    @Bean
    public PooledConnectionFactory pooledConnectionFactory() {
        final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(createActiveMQConnectionFactory());
        pooledConnectionFactory.setMaxConnections(2);
        return pooledConnectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(pooledConnectionFactory());
        jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return jmsTemplate;
    }

    @Bean
    public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(pooledConnectionFactory());
        factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE); // constant value is 2
        return factory;
    }


    private ActiveMQConnectionFactory createActiveMQConnectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL(brokerUrl);
        factory.setUserName(username);
        factory.setPassword(password);
        factory.setRedeliveryPolicy(createRedeliveryPolicy());
        return factory;
    }

    private RedeliveryPolicy createRedeliveryPolicy() {
        final RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(5);
        redeliveryPolicy.setRedeliveryDelay(13000);
        return redeliveryPolicy;
    }
}

Consumer

package poc.consumers;

import javax.jms.Connection;
import javax.jms.Session;
import org.apache.activemq.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

    @JmsListener(destination = "queue.test", containerFactory = "myJmsListenerContainerFactory")
    public void consume(Message message) {
        System.out.println("Consumer");
    }
}


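Solution

The message is only requeued if the listener throws an exception (or you shut down the application, or otherwise close the Session).

As far as the broker knows, your consumer is still working on it.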
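
To see redelivery in practice, the listener can be made to fail on purpose. The following is a minimal sketch, assuming the container factory from the question; the class name RejectingConsumer and the canProcess helper are hypothetical and only there for illustration. It would replace the original Consumer rather than run next to it, since two listeners on the same queue would compete for messages. It is also worth knowing that, according to the Spring Javadoc for AbstractMessageListenerContainer, in CLIENT_ACKNOWLEDGE mode the container acknowledges the message itself once the listener returns normally, which would be consistent with the message showing up as dequeued.

```java
package poc.consumers;

import javax.jms.JMSException;
import javax.jms.Message;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

// Hypothetical variant of the Consumer from the question.
@Component
public class RejectingConsumer {

    @JmsListener(destination = "queue.test", containerFactory = "myJmsListenerContainerFactory")
    public void consume(Message message) throws JMSException {
        // JMSRedelivered is set by the provider on redelivery attempts.
        System.out.println("Received " + message.getJMSMessageID()
                + ", redelivered=" + message.getJMSRedelivered());

        if (!canProcess(message)) {
            // An exception escaping the listener makes the container
            // call session.recover() in CLIENT_ACKNOWLEDGE mode.
            throw new IllegalStateException("Processing failed, requesting redelivery");
        }
    }

    // Hypothetical placeholder: always fails, so every delivery is rejected.
    private boolean canProcess(Message message) {
        return false;
    }
}
```

With the RedeliveryPolicy from the question (5 redeliveries, 13 second delay), the same message should show up again with redelivered=true until the redelivery limit is reached.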

Session.recover() - this is the code in the container that is called when the listener throws an exception...

            else if (isClientAcknowledge(session)) {
                session.recover();
            }
/** Stops message delivery in this session, and restarts message delivery
  * with the oldest unacknowledged message.
  *  
  * <P>All consumers deliver messages in a serial order.
  * Acknowledging a received message automatically acknowledges all 
  * messages that have been delivered to the client.
  *
  * <P>Restarting a session causes it to take the following actions:
  *
  * <UL>
  *   <LI>Stop message delivery
  *   <LI>Mark all messages that might have been delivered but not 
  *       acknowledged as "redelivered"
  *   <LI>Restart the delivery sequence including all unacknowledged 
  *       messages that had been previously delivered. Redelivered messages
  *       do not have to be delivered in 
  *       exactly their original delivery order.
  * </UL>
  *
  * @exception JMSException if the JMS provider fails to stop and restart
  *                         message delivery due to some internal error.
  * @exception IllegalStateException if the method is called by a 
  *                         transacted session.
  */ 

void recover() throws JMSException;
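
The behaviour described in that Javadoc can be illustrated with a toy in-memory model. This is only a sketch under stated assumptions: ToySession and ToyMessage are invented names, they are not ActiveMQ or JMS classes, and the model ignores redelivery delays and limits. It shows that recover() puts delivered-but-unacknowledged messages back at the front of the delivery sequence and marks them as redelivered, and that acknowledging covers everything delivered so far.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class RecoverSketch {

    /** Toy message: a body plus the "redelivered" flag. */
    static final class ToyMessage {
        final String body;
        boolean redelivered;
        ToyMessage(String body) { this.body = body; }
    }

    /** Toy session in client-acknowledge mode (not a real JMS Session). */
    static final class ToySession {
        private final Deque<ToyMessage> pending = new ArrayDeque<>();
        private final List<ToyMessage> unacked = new ArrayList<>();

        void enqueue(String body) { pending.addLast(new ToyMessage(body)); }

        /** Delivers the next message and remembers it as unacknowledged. */
        ToyMessage receive() {
            ToyMessage m = pending.pollFirst();
            if (m != null) {
                unacked.add(m);
            }
            return m;
        }

        /** Acknowledges every message delivered so far. */
        void acknowledge() { unacked.clear(); }

        /** Restarts delivery with the oldest unacknowledged message. */
        void recover() {
            // Push back newest first so the oldest ends up at the head.
            for (int i = unacked.size() - 1; i >= 0; i--) {
                ToyMessage m = unacked.get(i);
                m.redelivered = true;
                pending.addFirst(m);
            }
            unacked.clear();
        }

        int unackedCount() { return unacked.size(); }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ToySession session = new ToySession();
        session.enqueue("A");
        session.enqueue("B");

        session.receive();   // A delivered, not acknowledged
        session.receive();   // B delivered, not acknowledged
        session.recover();   // what the container does after a listener exception

        ToyMessage first = session.receive();
        log.add(first.body + ":" + first.redelivered);
        ToyMessage second = session.receive();
        log.add(second.body + ":" + second.redelivered);
        session.acknowledge();
        log.add("unacked=" + session.unackedCount());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model both messages come back in their original order with the redelivered flag set; a real provider is allowed to redeliver in a different order, as the Javadoc notes.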

For other readers - if you are using AUTO_ACKNOWLEDGE with a DefaultMessageListenerContainer, you must use transactions to roll back the acknowledgment.
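
A transacted listener container could be configured roughly as follows. This is a sketch, assuming it is added to the ConnectionFactoryConfig class from the question; the bean name transactedJmsListenerContainerFactory is hypothetical. With a locally transacted session, an exception thrown by the listener rolls the session back, so the message is redelivered instead of being acknowledged.

```java
    // Hypothetical additional bean inside ConnectionFactoryConfig.
    @Bean
    public DefaultJmsListenerContainerFactory transactedJmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(pooledConnectionFactory());
        // Use a local JMS transaction: commit on normal return, roll back on exception.
        factory.setSessionTransacted(true);
        return factory;
    }
```

A listener would opt in by naming this factory in its @JmsListener containerFactory attribute.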