Problem description
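I have set up a DLQ and a DLX, but failed messages are not redirected to the DLQ. I tried sending messages to MESSAGES.EXCHANGE both from the Java application and from the RabbitMQ server. In both cases the message is received, but after the exception is thrown the message should be redirected to dlx.MESSAGES.EXCHANGE, and that is not happening.
Below are the Java code and a screenshot of the RabbitMQ server. Everything looks fine to me; I cannot find any problem in the code or on the RabbitMQ server.
Queue setup code -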
public class DLQAmqpConfiguration {

    public static final String dlx_MESSAGES_EXCHANGE = "dlx.MESSAGES.EXCHANGE";
    public static final String DLQ_MESSAGES_QUEUE = "DLQ.MESSAGES.QUEUE";
    public static final String MESSAGES_QUEUE = "MESSAGES.QUEUE";
    public static final String MESSAGES_EXCHANGE = "MESSAGES.EXCHANGE";
    public static final String ROUTING_KEY_MESSAGES_QUEUE = "ROUTING_KEY_MESSAGES_QUEUE";

    @Bean
    Queue messagesQueue() {
        return QueueBuilder.durable(MESSAGES_QUEUE)
                .withArgument("x-dead-letter-exchange", dlx_MESSAGES_EXCHANGE)
                .build();
    }

    @Bean
    DirectExchange messagesExchange() {
        return new DirectExchange(MESSAGES_EXCHANGE);
    }

    @Bean
    Binding bindingMessages() {
        return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(ROUTING_KEY_MESSAGES_QUEUE);
    }

    @Bean
    FanoutExchange deadLetterExchange() {
        return new FanoutExchange(dlx_MESSAGES_EXCHANGE);
    }

    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(DLQ_MESSAGES_QUEUE).build();
    }

    @Bean
    Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
    }
}
Producer -
this.template.convertAndSend(DLQAmqpConfiguration.MESSAGES_EXCHANGE,DLQAmqpConfiguration.ROUTING_KEY_MESSAGES_QUEUE,message);
Consumer -
@RabbitListener(queues = DLQAmqpConfiguration.MESSAGES_QUEUE)
public void receiveMessage(Message message) throws BusinessException {
    System.out.println("Received Failed message,re-queueing: " + message.toString());
    System.out.println("Received Failed message,re-queueing: " + message.getMessageProperties().getReceivedRoutingKey());
    throw new BusinessException();
}

// this listener is never invoked
@RabbitListener(queues = DLQAmqpConfiguration.DLQ_MESSAGES_QUEUE)
public void processFailedMessages(Message message) {
    System.out.println("Received Failed message: " + message.toString());
}
Queues - (screenshot omitted)
Logs -
Received Failed message,re-queueing: (Body:'[B@55c36bc9(byte[26])' MessageProperties [headers={},contentLength=0,receivedDeliveryMode=PERSISTENT,redelivered=true,receivedExchange=MESSAGES.EXCHANGE,receivedRoutingKey=ROUTING_KEY_MESSAGES_QUEUE,deliveryTag=5444,consumerTag=amq.ctag-Krxkdplc_uoqHOx_bbnvnA,consumerQueue=MESSAGES.QUEUE])
Received Failed message,re-queueing: ROUTING_KEY_MESSAGES_QUEUE
2020-08-27 21:36:33.460 WARN 13192 --- [ntContainer#0-1] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void com.example.rabbitmq.RabbitmqApplication.receiveMessage(org.springframework.amqp.core.Message) throws com.example.rabbitmq.errorhandler.BusinessException' threw exception
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:228) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:148) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:133) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1591) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1510) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1489) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1433) ~[spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:970) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:916) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1291) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1197) [spring-rabbit-2.2.10.RELEASE.jar:2.2.10.RELEASE]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
Caused by: com.example.rabbitmq.errorhandler.BusinessException: null
Solution
You must set spring.rabbitmq.listener.simple.default-requeue-rejected=false (or ...direct... if you are using the direct container instead of the simple container), or throw an AmqpRejectAndDontRequeueException.
Otherwise, failed messages are requeued and redelivered.
@SpringBootApplication
public class So63620066Application {

    public static void main(String[] args) {
        SpringApplication.run(So63620066Application.class, args);
    }

    public static final String DLX_MESSAGES_EXCHANGE = "DLX.MESSAGES.EXCHANGE";
    public static final String DLQ_MESSAGES_QUEUE = "DLQ.MESSAGES.QUEUE";
    public static final String MESSAGES_QUEUE = "MESSAGES.QUEUE";
    public static final String MESSAGES_EXCHANGE = "MESSAGES.EXCHANGE";
    public static final String ROUTING_KEY_MESSAGES_QUEUE = "ROUTING_KEY_MESSAGES_QUEUE";

    @Bean
    Queue messagesQueue() {
        return QueueBuilder.durable(MESSAGES_QUEUE)
                .withArgument("x-dead-letter-exchange", DLX_MESSAGES_EXCHANGE)
                .build();
    }

    @Bean
    DirectExchange messagesExchange() {
        return new DirectExchange(MESSAGES_EXCHANGE);
    }

    @Bean
    Binding bindingMessages() {
        return BindingBuilder.bind(messagesQueue()).to(messagesExchange()).with(ROUTING_KEY_MESSAGES_QUEUE);
    }

    @Bean
    FanoutExchange deadLetterExchange() {
        return new FanoutExchange(DLX_MESSAGES_EXCHANGE);
    }

    @Bean
    Queue deadLetterQueue() {
        return QueueBuilder.durable(DLQ_MESSAGES_QUEUE).build();
    }

    @Bean
    Binding deadLetterBinding() {
        return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
    }

    @RabbitListener(queues = MESSAGES_QUEUE)
    public void receiveMessage(Message message) {
        System.out.println("Received failed message,re-queueing: " + message.toString());
        System.out.println(
                "Received failed message,re-queueing: " + message.getMessageProperties().getReceivedRoutingKey());
        throw new RuntimeException("fail");
    }

    @RabbitListener(queues = DLQ_MESSAGES_QUEUE)
    public void processFailedMessages(Message message) {
        System.out.println("Received failed message: " + message.toString());
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            template.convertAndSend(MESSAGES_EXCHANGE, ROUTING_KEY_MESSAGES_QUEUE, "foo");
        };
    }
}
spring.rabbitmq.listener.simple.default-requeue-rejected=false
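To see why the property matters, here is a minimal sketch in plain Java of the two outcomes for a rejected delivery. It is a toy model only, not RabbitMQ or Spring AMQP code: the class RequeueModel, its fields, and the method deliverToFailingListener are hypothetical names invented for illustration, and the broker's real requeue ordering is simplified. The assumption it encodes is the one stated in the answer: with requeue enabled a failed message goes back to the original queue and is redelivered, and only a reject without requeue is routed to the dead-letter exchange.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model (hypothetical, not RabbitMQ or Spring code) of a rejected delivery. */
public class RequeueModel {

    // Stands in for MESSAGES.QUEUE
    public final Deque<String> workQueue = new ArrayDeque<>();
    // Stands in for DLQ.MESSAGES.QUEUE, reached through the dead-letter exchange
    public final Deque<String> deadLetterQueue = new ArrayDeque<>();

    /**
     * Delivers messages to a listener that always throws.
     * Stops after maxDeliveries so the requeue case cannot loop forever.
     * Returns the number of deliveries attempted.
     */
    public int deliverToFailingListener(boolean requeueRejected, int maxDeliveries) {
        int deliveries = 0;
        while (!workQueue.isEmpty() && deliveries < maxDeliveries) {
            String message = workQueue.pollFirst();
            deliveries++;
            // The listener throws here, so the delivery is rejected.
            if (requeueRejected) {
                // Requeued: the message returns to the same queue and is redelivered.
                workQueue.addFirst(message);
            } else {
                // Not requeued: the message is dead-lettered instead.
                deadLetterQueue.addLast(message);
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        RequeueModel requeue = new RequeueModel();
        requeue.workQueue.add("foo");
        int n = requeue.deliverToFailingListener(true, 5);
        System.out.println("requeue=true: deliveries=" + n + ", DLQ=" + requeue.deadLetterQueue);

        RequeueModel reject = new RequeueModel();
        reject.workQueue.add("foo");
        int m = reject.deliverToFailingListener(false, 5);
        System.out.println("requeue=false: deliveries=" + m + ", DLQ=" + reject.deadLetterQueue);
    }
}
```

In the first run the message is delivered again and again and the dead-letter queue stays empty, which matches the redelivered=true flag and the high deliveryTag in the question's log. In the second run the message is delivered once and lands in the dead-letter queue, which is the effect of setting default-requeue-rejected=false or throwing AmqpRejectAndDontRequeueException from the listener.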