SpringBoot + Rabbitmq-DLQ队列不起作用

问题描述

我已经设置了dlq和dlx,但是失败的消息未重定向到dlq。 我正在尝试从Java应用程序以及Rabbitmq服务器向MESSAGES.EXCHANGE发送消息,在两种情况下,我都收到消息,但是抛出异常消息后应重定向dlx.MESSAGES.EXCHANGE,但它正在发生。

下面是Rabbitmq serer的Java代码和屏幕截图。一切对我来说都不错。在代码或rabbitmq服务器中找不到任何问题。

队列设置代码-

public class DLQAmqpConfiguration {
    public static final String dlx_MESSAGES_EXCHANGE = "dlx.MESSAGES.EXCHANGE";
    public static final String DLQ_MESSAGES_QUEUE = "DLQ.MESSAGES.QUEUE";

    public static final String MESSAGES_QUEUE = "MESSAGES.QUEUE";
    public static final String MESSAGES_EXCHANGE = "MESSAGES.EXCHANGE";
    public static final String ROUTING_KEY_MESSAGES_QUEUE = "ROUTING_KEY_MESSAGES_QUEUE";

    @Bean
    Queue messagesQueue() {
        return QueueBuilder.durable(MESSAGES_QUEUE)
                .withArgument("x-dead-letter-exchange",dlx_MESSAGES_EXCHANGE)
                .build();
    }

    @Bean
    DirectExchange messagesExchange() {
        return new DirectExchange(MESSAGES_EXCHANGE);
    }

    @Bean
    Binding bindingMessages() {
        return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(ROUTING_KEY_MESSAGES_QUEUE);
    }

    @Bean
    FanoutExchange deadLetterExchange() {
        return new FanoutExchange(dlx_MESSAGES_EXCHANGE);
    }

    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(DLQ_MESSAGES_QUEUE).build();
    }

    @Bean
    Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
    }
}

制作人-

    this.template.convertAndSend(DLQAmqpConfiguration.MESSAGES_EXCHANGE,DLQAmqpConfiguration.ROUTING_KEY_MESSAGES_QUEUE,message);

消费者-

    @RabbitListener(queues = DLQAmqpConfiguration.MESSAGES_QUEUE)
    public void receiveMessage(Message message) throws BusinessException {
        System.out.println("Received Failed message,re-queueing: " + message.toString());
        System.out.println("Received Failed message,re-queueing: " + message.getMessageProperties().getReceivedRoutingKey());
        throw new BusinessException();
    }

    // this code never running 
    @RabbitListener(queues = DLQAmqpConfiguration.DLQ_MESSAGES_QUEUE)
    public void processFailedMessages(Message message) {
        System.out.println("Received Failed message: " + message.toString());
    }

交换-

enter image description here

队列-

enter image description here

enter image description here

日志-

Received Failed message,re-queueing: (Body:'[B@55c36bc9(byte[26])' MessageProperties [headers={},contentLength=0,receivedDeliveryMode=PERSISTENT,redelivered=true,receivedExchange=MESSAGES.EXCHANGE,receivedRoutingKey=ROUTING_KEY_MESSAGES_QUEUE,deliveryTag=5444,consumerTag=amq.ctag-Krxkdplc_uoqHOx_bbnvnA,consumerQueue=MESSAGES.QUEUE])
Received Failed message,re-queueing: ROUTING_KEY_MESSAGES_QUEUE
2020-08-27 21:36:33.460  WARN 13192 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener Failed.

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.example.rabbitmq.RabbitmqApplication.receiveMessage(org.springframework.amqp.core.Message) throws com.example.rabbitmq.errorhandler.BusinessException' threw exception
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:228) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:148) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:133) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:970) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:916) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncmessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1291) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncmessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1197) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Caused by: com.example.rabbitmq.errorhandler.BusinessException: null

解决方法

您必须设置function getMedia(imgMsgUrl,auth) { const getImage = { url: imgMsgUrl,oauth: auth,}; var getto = get(getImage); getto.then(result => { doSomething(result) }); }; (如果使用直接容器而不是简单容器,则必须设置spring.rabbitmq.listener.simple.default-requeue-rejected=false)或抛出...direct...

否则,失败的消息将重新排队并重新发送。

AmqpRejectAndDontRequeueException
@SpringBootApplication
public class So63620066Application {

    public static void main(String[] args) {
        SpringApplication.run(So63620066Application.class,args);
    }

    public static final String DLX_MESSAGES_EXCHANGE = "DLX.MESSAGES.EXCHANGE";

    public static final String DLQ_MESSAGES_QUEUE = "DLQ.MESSAGES.QUEUE";

    public static final String MESSAGES_QUEUE = "MESSAGES.QUEUE";

    public static final String MESSAGES_EXCHANGE = "MESSAGES.EXCHANGE";

    public static final String ROUTING_KEY_MESSAGES_QUEUE = "ROUTING_KEY_MESSAGES_QUEUE";

    @Bean
    Queue messagesQueue() {
        return QueueBuilder.durable(MESSAGES_QUEUE)
                .withArgument("x-dead-letter-exchange",DLX_MESSAGES_EXCHANGE)
                .build();
    }

    @Bean
    DirectExchange messagesExchange() {
        return new DirectExchange(MESSAGES_EXCHANGE);
    }

    @Bean
    Binding bindingMessages() {
        return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(ROUTING_KEY_MESSAGES_QUEUE);
    }

    @Bean
    FanoutExchange deadLetterExchange() {
        return new FanoutExchange(DLX_MESSAGES_EXCHANGE);
    }

    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(DLQ_MESSAGES_QUEUE).build();
    }

    @Bean
    Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
    }

    @RabbitListener(queues = MESSAGES_QUEUE)
    public void receiveMessage(Message message) {
        System.out.println("Received failed message,re-queueing: " + message.toString());
        System.out.println(
                "Received failed message,re-queueing: " + message.getMessageProperties().getReceivedRoutingKey());
        throw new RuntimeException("fail");
    }

    @RabbitListener(queues = DLQ_MESSAGES_QUEUE)
    public void processFailedMessages(Message message) {
        System.out.println("Received failed message: " + message.toString());
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            template.convertAndSend(MESSAGES_EXCHANGE,ROUTING_KEY_MESSAGES_QUEUE,"foo");
        };
    }

}
spring.rabbitmq.listener.simple.default-requeue-rejected=false