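Problem description
I have spent a long time on this problem and cannot figure it out. Maybe I have misunderstood client acknowledgment?
I have an ActiveMQ consumer that I configured with client acknowledgment. When I inspect the acknowledge mode in the debugger, it is correctly set to 2 (Session.CLIENT_ACKNOWLEDGE).
I wanted to verify that if I do not call message.acknowledge(), the message is redelivered or returned to the broker.
However, that does not happen. Even though I never acknowledge the message, it still shows up as successfully dequeued.
What am I doing wrong?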
package test.config;

import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.jms.pool.PooledConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;

@Configuration
public class ConnectionFactoryConfig {

    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    @Value("${spring.activemq.user}")
    private String username;

    @Value("${spring.activemq.password}")
    private String password;

    @Bean
    public PooledConnectionFactory pooledConnectionFactory() {
        final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setConnectionFactory(createActiveMQConnectionFactory());
        pooledConnectionFactory.setMaxConnections(2);
        return pooledConnectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate() {
        JmsTemplate jmsTemplate = new JmsTemplate(pooledConnectionFactory());
        jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        return jmsTemplate;
    }

    @Bean
    public DefaultJmsListenerContainerFactory myJmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(pooledConnectionFactory());
        // 2 is the value of Session.CLIENT_ACKNOWLEDGE
        factory.setSessionAcknowledgeMode(2);
        return factory;
    }

    private ActiveMQConnectionFactory createActiveMQConnectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL(brokerUrl);
        factory.setUserName(username);
        factory.setPassword(password);
        factory.setRedeliveryPolicy(createRedeliveryPolicy());
        return factory;
    }

    // Up to 5 redeliveries, 13 seconds apart
    private RedeliveryPolicy createRedeliveryPolicy() {
        final RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(5);
        redeliveryPolicy.setRedeliveryDelay(13000);
        return redeliveryPolicy;
    }
}
Consumer
package poc.consumers;
import javax.jms.Connection;
import javax.jms.Session;
import org.apache.activemq.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class Consumer {

    @JmsListener(destination = "queue.test", containerFactory = "myJmsListenerContainerFactory")
    public void consume(Message message) {
        System.out.println("Consumer");
    }
}
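Solution
The message is requeued only if the listener throws an exception (or you shut down the application, or the Session is closed in some other way).
As far as the broker knows, your consumer is still working on it.
See Session.recover(). This is the code in the container that is called when the listener throws an exception: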
else if (isClientAcknowledge(session)) {
session.recover();
}
/** Stops message delivery in this session, and restarts message delivery
* with the oldest unacknowledged message.
*
* <P>All consumers deliver messages in a serial order.
* Acknowledging a received message automatically acknowledges all
* messages that have been delivered to the client.
*
* <P>Restarting a session causes it to take the following actions:
*
* <UL>
* <LI>Stop message delivery
* <LI>Mark all messages that might have been delivered but not
* acknowledged as "redelivered"
* <LI>Restart the delivery sequence including all unacknowledged
* messages that had been previously delivered. Redelivered messages
* do not have to be delivered in
* exactly their original delivery order.
* </UL>
*
* @exception JMSException if the JMS provider fails to stop and restart
* message delivery due to some internal error.
* @exception IllegalStateException if the method is called by a
* transacted session.
*/
void recover() throws JMSException;
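The rule described above can be illustrated with a small in-memory toy model. This is only a sketch of the semantics stated in the Javadoc, not ActiveMQ or Spring code: the class `ClientAckSketch`, its `Listener` interface, and the `queuedCount`/`unackedCount` helpers are hypothetical names invented for the illustration. It shows that a listener returning normally without acknowledging leaves the message delivered but unacknowledged, while a thrown exception leads to `recover()` and redelivery starting with the oldest unacknowledged message.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Toy in-memory model of CLIENT_ACKNOWLEDGE redelivery; not ActiveMQ or Spring code. */
public class ClientAckSketch {

    /** Hypothetical stand-in for a message listener. */
    public interface Listener {
        void onMessage(String body, boolean redelivered) throws Exception;
    }

    /** Hypothetical message record: body plus a redelivered flag. */
    private static final class Msg {
        final String body;
        boolean redelivered;
        Msg(String body) { this.body = body; }
    }

    private final Deque<Msg> queue = new ArrayDeque<>();   // not yet delivered
    private final List<Msg> unacked = new ArrayList<>();   // delivered, awaiting acknowledgment

    public void send(String body) { queue.addLast(new Msg(body)); }

    /** Models Session.recover(): flag unacknowledged messages and restart delivery with them. */
    public void recover() {
        // Walk backwards so the oldest unacknowledged message ends up at the head.
        for (int i = unacked.size() - 1; i >= 0; i--) {
            Msg m = unacked.get(i);
            m.redelivered = true;
            queue.addFirst(m);
        }
        unacked.clear();
    }

    /** Delivers one message; as in the container branch quoted above, an exception triggers recover(). */
    public boolean deliverNext(Listener listener) {
        Msg m = queue.pollFirst();
        if (m == null) return false;   // nothing left to deliver
        unacked.add(m);
        try {
            listener.onMessage(m.body, m.redelivered);
        } catch (Exception e) {
            recover();
        }
        return true;
    }

    public int queuedCount() { return queue.size(); }
    public int unackedCount() { return unacked.size(); }

    public static void main(String[] args) {
        ClientAckSketch session = new ClientAckSketch();
        session.send("order-1");

        // Listener returns normally without acknowledging: nothing is requeued.
        session.deliverNext((body, redelivered) -> System.out.println("received " + body));
        System.out.println("queued=" + session.queuedCount() + " unacked=" + session.unackedCount());

        session.send("order-2");
        // Listener throws: recover() puts every unacknowledged message back for redelivery.
        session.deliverNext((body, redelivered) -> { throw new IllegalStateException("boom"); });
        System.out.println("queued=" + session.queuedCount() + " unacked=" + session.unackedCount());
    }
}
```

In this model the first delivery leaves one message unacknowledged and nothing queued; only after the second listener throws do both messages go back to the queue, flagged as redelivered.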
For other readers: if you use AUTO_ACKNOWLEDGE with a DefaultMessageListenerContainer, you must use transactions to roll back the acknowledgment.
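As a rough sketch of what using transactions could look like with the listener container factory from the question, the fragment below enables locally transacted sessions. It is a configuration fragment that needs Spring JMS on the classpath; the class name `TransactedListenerConfig` and the bean name `transactedListenerContainerFactory` are made up for illustration, and the injected `ConnectionFactory` is assumed to be the pooled factory defined earlier.

```java
import javax.jms.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

@Configuration
public class TransactedListenerConfig {

    // Hypothetical bean name; reference it from @JmsListener(containerFactory = "...")
    @Bean
    public DefaultJmsListenerContainerFactory transactedListenerContainerFactory(
            ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // Use a locally transacted Session so a listener exception rolls the receive back
        factory.setSessionTransacted(true);
        return factory;
    }
}
```

With a transacted session, an exception thrown from the listener should cause the container to roll back, after which the broker's redelivery policy (here, up to 5 attempts 13 seconds apart) applies.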