如何在 MessageConversionException 上中止 sendAndReceive

问题描述

我正在使用 spring-amqp 的(最新版本)rabbitTemplate.sendAndReceive(exchange,routingKey,message) 方法发送消息。 RabbitTemplate 配置了 Jackson2JsonMessageConverter

如果我发送格式错误的 json,我可以看到消息转换失败并显示 org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content,正如预期的那样。

但是,sendAndReceive 方法不会中止并继续执行,直到出现 replyTimeout。 相比之下,如果我的 @RabbitListener 注释方法中存在错误,则 sendAndReceive 会立即中止并返回异常。

有没有办法告诉 spring 在转换异常时中止 sendAndReceive

2021-05-18 17:07:38.188 WARN  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#8-1] [org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler] :: Execution of Rabbit message listener Failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1746)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1636)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncmessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncmessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:294)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:271)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:251)
    at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:342)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverteradapter.extractPayload(MessagingMessageListenerAdapter.java:325)
    at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:207)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:134)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
    ... 10 common frames omitted
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('s' (code 115)): was expecting comma to separate Object entries
 at [Source: (String)"{sd}"; line: 1,column: 81]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:707)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:632)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipComma(ReaderBasedJsonParser.java:2324)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextFieldName(ReaderBasedJsonParser.java:921)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:525)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:377)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:29)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4526)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3468)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertBytesToObject(AbstractJackson2MessageConverter.java:351)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertContent(AbstractJackson2MessageConverter.java:321)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:291)
    ... 18 common frames omitted
2021-05-18 17:07:38.188 WARN  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#8-1] [org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy] :: Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange,if so configured: (Body:'{sd}' MessageProperties [headers={centreId=0,deliveryTypeId=null,path=/somePath,privileges=[],salespointId=0,insuranceId=null,promoter=null,userName=null,parameters={},userId=null,superCentreId=0},correlationId=1,replyTo=amq.rabbitmq.reply-to.g1h2AA5yZXBseUA4NzA3MTY3MAAAaScAAAABYJ9TZA==.YoDsH6nkpNiLYvA4h8716g==,contentType=application/json,contentLength=0,receivedDeliveryMode=PERSISTENT,priority=0,redelivered=false,receivedExchange=someExchange,receivedRoutingKey=someKey,deliveryTag=1,consumerTag=amq.ctag-lHGwtgImO4ohGFCAHuKkLg,consumerQueue=someQueue])
2021-05-18 17:07:38.188 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#8-1] [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] :: Execution of Rabbit message listener Failed,and the error handler threw an exception
org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1436)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1720)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1495)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncmessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncmessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1746)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1636)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
    ... 6 common frames omitted
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:294)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:271)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:251)
    at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:342)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverteradapter.extractPayload(MessagingMessageListenerAdapter.java:325)
    at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:207)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:134)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
    ... 10 common frames omitted
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('s' (code 115)): was expecting comma to separate Object entries
 at [Source: (String)"{sd}"; line: 1,column: 81]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:707)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:632)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipComma(ReaderBasedJsonParser.java:2324)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextFieldName(ReaderBasedJsonParser.java:921)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:525)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:377)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:29)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4526)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3468)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertBytesToObject(AbstractJackson2MessageConverter.java:351)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertContent(AbstractJackson2MessageConverter.java:321)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:291)
    ... 18 common frames omitted

编辑 1: 这似乎是 spring-amqp 中的错误。您可以关注此问题的状态 here

编辑 2: 已在 spring-amqp 2.3.9 中修复。

解决方法

好的。我明白发生了什么。我们在 AbstractMessageListenerContainer 上失败了,它还不知道我们的 MessageListener 是关于请求-回复行为的。因此,它通过其默认的 ConditionalRejectingErrorHandler 静默处理异常,而后者又会抛出一个 AmqpRejectAndDontRequeueException,仅此而已。侦听器容器返回到下一条消息的主循环。

您可能必须实现自己的 ConditionalRejectingErrorHandler 覆盖其:

/**
 * Called when a message with a fatal exception has an {@code x-death} header,prior
 * to discarding the message. Subclasses can override this method to perform some
 * action,such as sending the message to a parking queue.
 * @param failed the failed message.
 * @since 2.3
 */
protected void handleDiscarded(Message failed) {

发送带有错误消息的回复。

欢迎在 GitHub 上提 issue,大家共同思考如何处理这种情况。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...