问题描述
我正在使用 spring-amqp 的(最新版本)rabbitTemplate.sendAndReceive(exchange,routingKey,message)
方法发送消息。 RabbitTemplate 配置了 Jackson2JsonMessageConverter。
如果我发送格式错误的 json,我可以看到消息转换失败并显示 org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
,正如预期的那样。
但是,sendAndReceive 方法不会中止并继续执行,直到出现 replyTimeout。 相比之下,如果我的 @RabbitListener 注释方法中存在错误,则 sendAndReceive 会立即中止并返回异常。
有没有办法告诉 spring 在转换异常时中止 sendAndReceive?
2021-05-18 17:07:38.188 WARN [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#8-1] [org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler] :: Execution of Rabbit message listener Failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1746)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1636)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncmessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncmessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:294)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:271)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:251)
at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:342)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverteradapter.extractPayload(MessagingMessageListenerAdapter.java:325)
at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:207)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:134)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
... 10 common frames omitted
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('s' (code 115)): was expecting comma to separate Object entries
at [Source: (String)"{sd}"; line: 1,column: 81]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:707)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:632)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipComma(ReaderBasedJsonParser.java:2324)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextFieldName(ReaderBasedJsonParser.java:921)
at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:525)
at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:377)
at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:29)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4526)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3468)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertBytesToObject(AbstractJackson2MessageConverter.java:351)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertContent(AbstractJackson2MessageConverter.java:321)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:291)
... 18 common frames omitted
2021-05-18 17:07:38.188 WARN [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#8-1] [org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy] :: Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange,if so configured: (Body:'{sd}' MessageProperties [headers={centreId=0,deliveryTypeId=null,path=/somePath,privileges=[],salespointId=0,insuranceId=null,promoter=null,userName=null,parameters={},userId=null,superCentreId=0},correlationId=1,replyTo=amq.rabbitmq.reply-to.g1h2AA5yZXBseUA4NzA3MTY3MAAAaScAAAABYJ9TZA==.YoDsH6nkpNiLYvA4h8716g==,contentType=application/json,contentLength=0,receivedDeliveryMode=PERSISTENT,priority=0,redelivered=false,receivedExchange=someExchange,receivedRoutingKey=someKey,deliveryTag=1,consumerTag=amq.ctag-lHGwtgImO4ohGFCAHuKkLg,consumerQueue=someQueue])
2021-05-18 17:07:38.188 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#8-1] [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] :: Execution of Rabbit message listener Failed,and the error handler threw an exception
org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1436)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1720)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1495)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncmessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncmessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1746)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1636)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
... 6 common frames omitted
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:294)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:271)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:251)
at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:342)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverteradapter.extractPayload(MessagingMessageListenerAdapter.java:325)
at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:207)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:134)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
... 10 common frames omitted
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('s' (code 115)): was expecting comma to separate Object entries
at [Source: (String)"{sd}"; line: 1,column: 81]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:707)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:632)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipComma(ReaderBasedJsonParser.java:2324)
at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextFieldName(ReaderBasedJsonParser.java:921)
at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:525)
at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:377)
at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:29)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4526)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3468)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertBytesToObject(AbstractJackson2MessageConverter.java:351)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertContent(AbstractJackson2MessageConverter.java:321)
at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:291)
... 18 common frames omitted
编辑 1: 这似乎是 spring-amqp
中的错误。您可以关注此问题的状态 here。
编辑 2: 已在 spring-amqp 2.3.9
中修复。
解决方法
好的。我明白发生了什么。我们在 AbstractMessageListenerContainer
上失败了,它还不知道我们的 MessageListener
是关于请求-回复行为的。因此,它通过其默认的 ConditionalRejectingErrorHandler
静默处理异常,而后者又会抛出一个 AmqpRejectAndDontRequeueException
,仅此而已。侦听器容器返回到下一条消息的主循环。
您可能必须实现自己的 ConditionalRejectingErrorHandler
覆盖其:
/**
* Called when a message with a fatal exception has an {@code x-death} header,prior
* to discarding the message. Subclasses can override this method to perform some
* action,such as sending the message to a parking queue.
* @param failed the failed message.
* @since 2.3
*/
protected void handleDiscarded(Message failed) {
发送带有错误消息的回复。
欢迎在 GitHub 上提 issue,大家共同思考如何处理这种情况。