当我停止我的程序时,如何防止我的 Java 应用程序丢弃未确认的 RabbitMQ 消息?

问题描述

我写了一个使用RabbitMQ的java项目。我正在调试模式下启动我的 Spring 项目,并在 System.out.println(input);显示 ListenerExample.java 的行上设置一个断点。在应用程序的另一部分中,我使用以下代码行向此队列发送消息:

rabbitTemplate.convertAndSend(ManagerExample.getTopicExchangeName(),ManagerExample.getRoutingKey(),"test");

它在我的 IntelliJ 窗口中遇到断点,使用 RabbitMQ 管理器我可以看到 queue1 有一条未确认的消息,没有其他消息。如果我停止程序,那么 queue1 不包含任何消息并且我会丢失消息(它不会在队列中进入就绪状态)。如果我引入死信队列或更改 AckNowledgementMode 那么我仍然会丢失消息。我如何才能以某种方式在另一个队列或 queue1 中保留此消息?

ListenerExample.java:

package rabbitExample;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;

import java.io.IOException;

@Component
public class ListenerExample {
  @RabbitListener(queues = ManagerExample.queueName,containerFactory = "prefetchOneRabbitListenerContainerFactory")
  public static void listen(final String input,final Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) final long tag) throws IOException {
    System.out.println(input);
    channel.basicAck(tag,false);
  }
}

ManagerExample.java:

package rabbitExample;

import lombok.Getter;
import org.springframework.amqp.core.AckNowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class ManagerExample {
  @Getter
  static final String topicExchangeName = "exchange";
  @Getter
  static final String queueName = "queue1";
  @Getter
  static final String deadLetterQueueName = "dropped";
  @Getter
  static final String routingKey = "key";

  @Bean
  Queue queue() {
    return QueueBuilder.durable(queueName)
      .build();
  }

  @Bean
  static TopicExchange exchange() {
    return new TopicExchange(topicExchangeName);
  }

  @Bean
  Binding binding(final Queue queue,final TopicExchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(routingKey);
  }

  @Bean
  public RabbitListenerContainerFactory<SimpleMessageListenerContainer> prefetchOneRabbitListenerContainerFactory(final ConnectionFactory rabbitConnectionFactory) {
    final SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory);
    factory.setPrefetchCount(1);
    factory.setAckNowledgeMode(AckNowledgeMode.MANUAL);
    return factory;
  }
}

解决方法

我正在尝试运行您的示例。一切正常。一条未确认的消息毫无问题地返回到队列中。 您可以尝试对您的“聆听”方法稍作改动,例如:

  @RabbitListener(queues = RabbitConfig.QUEUE,containerFactory = "prefetchOneRabbitListenerContainerFactory")
  public static void listen(final String input,final Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) final long tag) throws IOException {
        System.out.println(input);
        System.exit(0);
        channel.basicAck(tag,false);
  }

你会看到,一切正常。我认为,您的情况的原始问题是“停止”应用程序的方式。我假设您在 Intellij Idea 中单击了“停止”按钮。 但由于未知原因,Intellij Idea 不会立即停止进程和 channel.basicAck(tag,false); 命令 在您按下“停止”按钮后运行。

Example

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...